Diffstat (limited to '.venv/lib/python3.12/site-packages/h2/connection.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/h2/connection.py  2112
1 file changed, 2112 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/h2/connection.py b/.venv/lib/python3.12/site-packages/h2/connection.py
new file mode 100644
index 00000000..28be9fca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/h2/connection.py
@@ -0,0 +1,2112 @@
+"""
+h2/connection
+~~~~~~~~~~~~~
+
+An implementation of an HTTP/2 connection.
+"""
+from __future__ import annotations
+
+import base64
+from enum import Enum, IntEnum
+from typing import TYPE_CHECKING, Any, Callable
+
+from hpack.exceptions import HPACKError, OversizedHeaderListError
+from hpack.hpack import Decoder, Encoder
+from hyperframe.exceptions import InvalidPaddingError
+from hyperframe.frame import (
+    AltSvcFrame,
+    ContinuationFrame,
+    DataFrame,
+    ExtensionFrame,
+    Frame,
+    GoAwayFrame,
+    HeadersFrame,
+    PingFrame,
+    PriorityFrame,
+    PushPromiseFrame,
+    RstStreamFrame,
+    SettingsFrame,
+    WindowUpdateFrame,
+)
+
+from .config import H2Configuration
+from .errors import ErrorCodes, _error_code_from_int
+from .events import (
+    AlternativeServiceAvailable,
+    ConnectionTerminated,
+    Event,
+    InformationalResponseReceived,
+    PingAckReceived,
+    PingReceived,
+    PriorityUpdated,
+    RemoteSettingsChanged,
+    RequestReceived,
+    ResponseReceived,
+    SettingsAcknowledged,
+    TrailersReceived,
+    UnknownFrameReceived,
+    WindowUpdated,
+)
+from .exceptions import (
+    DenialOfServiceError,
+    FlowControlError,
+    FrameTooLargeError,
+    NoAvailableStreamIDError,
+    NoSuchStreamError,
+    ProtocolError,
+    RFC1122Error,
+    StreamClosedError,
+    StreamIDTooLowError,
+    TooManyStreamsError,
+)
+from .frame_buffer import FrameBuffer
+from .settings import ChangedSetting, SettingCodes, Settings
+from .stream import H2Stream, StreamClosedBy
+from .utilities import SizeLimitDict, guard_increment_window
+from .windows import WindowManager
+
+if TYPE_CHECKING:  # pragma: no cover
+    from collections.abc import Iterable
+
+    from hpack.struct import Header, HeaderWeaklyTyped
+
+
+class ConnectionState(Enum):
+    IDLE = 0
+    CLIENT_OPEN = 1
+    SERVER_OPEN = 2
+    CLOSED = 3
+
+
+class ConnectionInputs(Enum):
+    SEND_HEADERS = 0
+    SEND_PUSH_PROMISE = 1
+    SEND_DATA = 2
+    SEND_GOAWAY = 3
+    SEND_WINDOW_UPDATE = 4
+    SEND_PING = 5
+    SEND_SETTINGS = 6
+    SEND_RST_STREAM = 7
+    SEND_PRIORITY = 8
+    RECV_HEADERS = 9
+    RECV_PUSH_PROMISE = 10
+    RECV_DATA = 11
+    RECV_GOAWAY = 12
+    RECV_WINDOW_UPDATE = 13
+    RECV_PING = 14
+    RECV_SETTINGS = 15
+    RECV_RST_STREAM = 16
+    RECV_PRIORITY = 17
+    SEND_ALTERNATIVE_SERVICE = 18  # Added in 2.3.0
+    RECV_ALTERNATIVE_SERVICE = 19  # Added in 2.3.0
+
+
+class AllowedStreamIDs(IntEnum):
+    EVEN = 0
+    ODD = 1
+
+
+class H2ConnectionStateMachine:
+    """
+    A single HTTP/2 connection state machine.
+
+    This state machine, while defined in its own class, is logically part of
+    the H2Connection class also defined in this file. The state machine itself
+    maintains very little state directly, instead focusing entirely on managing
+    state transitions.
+    """
+
+    # For the purposes of this state machine we treat HEADERS and their
+    # associated CONTINUATION frames as a single jumbo frame. The protocol
+    # allows/requires this by preventing other frames from being interleaved
+    # between HEADERS/CONTINUATION frames.
+    #
+    # The _transitions dictionary contains a mapping of tuples of
+    # (state, input) to tuples of (side_effect_function, end_state). This map
+    # contains all allowed transitions: anything not in this map is invalid
+    # and immediately causes a transition to ``closed``.
+
+    _transitions = {
+        # State: idle
+        (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_PING):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_PING):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY):
+            (None, ConnectionState.IDLE),
+        (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
+            (None, ConnectionState.CLIENT_OPEN),
+
+        # State: open, client side.
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY):
+            (None, ConnectionState.CLIENT_OPEN),
+        (ConnectionState.CLIENT_OPEN,
+            ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
+                (None, ConnectionState.CLIENT_OPEN),
+
+        # State: open, server side.
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM):
+            (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN,
+            ConnectionInputs.SEND_ALTERNATIVE_SERVICE):
+                (None, ConnectionState.SERVER_OPEN),
+        (ConnectionState.SERVER_OPEN,
+            ConnectionInputs.RECV_ALTERNATIVE_SERVICE):
+                (None, ConnectionState.SERVER_OPEN),
+
+        # State: closed
+        (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY):
+            (None, ConnectionState.CLOSED),
+        (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY):
+            (None, ConnectionState.CLOSED),
+    }
+
+    def __init__(self) -> None:
+        self.state = ConnectionState.IDLE
+
+    def process_input(self, input_: ConnectionInputs) -> list[Event]:
+        """
+        Process a specific input in the state machine.
+        """
+        if not isinstance(input_, ConnectionInputs):
+            msg = "Input must be an instance of ConnectionInputs"
+            raise ValueError(msg)  # noqa: TRY004
+
+        try:
+            func, target_state = self._transitions[(self.state, input_)]
+        except KeyError as e:
+            old_state = self.state
+            self.state = ConnectionState.CLOSED
+            msg = f"Invalid input {input_} in state {old_state}"
+            raise ProtocolError(msg) from e
+        else:
+            self.state = target_state
+            if func is not None:  # pragma: no cover
+                return func()
+
+            return []
+
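+# A minimal sketch of how this transition table is consumed (illustrative
+# only, not executed by the library; ``fsm`` is a hypothetical local name):
+#
+#     fsm = H2ConnectionStateMachine()
+#     fsm.process_input(ConnectionInputs.SEND_SETTINGS)   # IDLE -> IDLE
+#     fsm.process_input(ConnectionInputs.SEND_HEADERS)    # IDLE -> CLIENT_OPEN
+#     fsm.process_input(ConnectionInputs.SEND_PUSH_PROMISE)
+#     # The last input has no entry for CLIENT_OPEN in ``_transitions``, so
+#     # the state machine moves to CLOSED and raises ProtocolError.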
+
+class H2Connection:
+    """
+    A low-level HTTP/2 connection object. This handles building and receiving
+    frames and maintains both connection and per-stream state for all streams
+    on this connection.
+
+    This wraps an HTTP/2 Connection state machine implementation, ensuring that
+    frames can only be sent/received when the connection is in a valid state.
+    It also builds stream state machines on demand to ensure that the
+    constraints of those state machines are met as well. Attempts to create
+    frames that cannot be sent will raise a ``ProtocolError``.
+
+    .. versionchanged:: 2.3.0
+       Added the ``header_encoding`` keyword argument.
+
+    .. versionchanged:: 2.5.0
+       Added the ``config`` keyword argument. Deprecated the ``client_side``
+       and ``header_encoding`` parameters.
+
+    .. versionchanged:: 3.0.0
+       Removed deprecated parameters and properties.
+
+    :param config: The configuration for the HTTP/2 connection.
+
+        .. versionadded:: 2.5.0
+
+    :type config: :class:`H2Configuration <h2.config.H2Configuration>`
+    """
+
+    # The initial maximum outbound frame size. This can be changed by receiving
+    # a settings frame.
+    DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535
+
+    # The initial maximum inbound frame size. This is somewhat arbitrarily
+    # chosen.
+    DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24
+
+    # The highest acceptable stream ID.
+    HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1
+
+    # The largest acceptable window increment.
+    MAX_WINDOW_INCREMENT = 2**31 - 1
+
+    # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE.
+    DEFAULT_MAX_HEADER_LIST_SIZE = 2**16
+
+    # Keep a limited number of closed-stream results in memory.
+    MAX_CLOSED_STREAMS = 2**16
+
+    def __init__(self, config: H2Configuration | None = None) -> None:
+        self.state_machine = H2ConnectionStateMachine()
+        self.streams: dict[int, H2Stream] = {}
+        self.highest_inbound_stream_id = 0
+        self.highest_outbound_stream_id = 0
+        self.encoder = Encoder()
+        self.decoder = Decoder()
+
+        # This won't always actually do anything: for versions of HPACK older
+        # than 2.3.0 it does nothing. However, we have to try!
+        self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE
+
+        #: The configuration for this HTTP/2 connection object.
+        #:
+        #: .. versionadded:: 2.5.0
+        self.config = config or H2Configuration(client_side=True)
+
+        # Objects that store settings, including defaults.
+        #
+        # We set the MAX_CONCURRENT_STREAMS value to 100 because its default is
+        # unbounded, and that's a dangerous default because it allows
+        # essentially unbounded resources to be allocated regardless of how
+        # they will be used. 100 should be suitable for the average
+        # application. This default obviously does not apply to the remote
+        # peer's settings: the remote peer controls them!
+        #
+        # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to
+        # advertise our defence against CVE-2016-6581. However, not all
+        # versions of HPACK will let us do it. That's ok: we should at least
+        # suggest that we're not vulnerable.
+        self.local_settings = Settings(
+            client=self.config.client_side,
+            initial_values={
+                SettingCodes.MAX_CONCURRENT_STREAMS: 100,
+                SettingCodes.MAX_HEADER_LIST_SIZE:
+                    self.DEFAULT_MAX_HEADER_LIST_SIZE,
+            },
+        )
+        self.remote_settings = Settings(client=not self.config.client_side)
+
+        # The current value of the connection flow control windows on the
+        # connection.
+        self.outbound_flow_control_window = (
+            self.remote_settings.initial_window_size
+        )
+
+        #: The maximum size of a frame that can be emitted by this peer, in
+        #: bytes.
+        self.max_outbound_frame_size = self.remote_settings.max_frame_size
+
+        #: The maximum size of a frame that can be received by this peer, in
+        #: bytes.
+        self.max_inbound_frame_size = self.local_settings.max_frame_size
+
+        # Buffer for incoming data.
+        self.incoming_buffer = FrameBuffer(server=not self.config.client_side)
+
+        # A private variable to store a sequence of received header frames
+        # until completion.
+        self._header_frames: list[Frame] = []
+
+        # Data that needs to be sent.
+        self._data_to_send = bytearray()
+
+        # Keeps track of how streams are closed.
+        # Used to ensure that we don't blow up in the face of frames that were
+        # in flight when a RST_STREAM was sent.
+        # Also used to determine whether we should consider a frame received
+        # while a stream is closed as either a stream error or a connection
+        # error.
+        self._closed_streams: dict[int, StreamClosedBy | None] = SizeLimitDict(
+            size_limit=self.MAX_CLOSED_STREAMS,
+        )
+
+        # The flow control window manager for the connection.
+        self._inbound_flow_control_window_manager = WindowManager(
+            max_window_size=self.local_settings.initial_window_size,
+        )
+
+        # When in doubt use dict-dispatch.
+        self._frame_dispatch_table: dict[type[Frame], Callable] = {  # type: ignore
+            HeadersFrame: self._receive_headers_frame,
+            PushPromiseFrame: self._receive_push_promise_frame,
+            SettingsFrame: self._receive_settings_frame,
+            DataFrame: self._receive_data_frame,
+            WindowUpdateFrame: self._receive_window_update_frame,
+            PingFrame: self._receive_ping_frame,
+            RstStreamFrame: self._receive_rst_stream_frame,
+            PriorityFrame: self._receive_priority_frame,
+            GoAwayFrame: self._receive_goaway_frame,
+            ContinuationFrame: self._receive_naked_continuation,
+            AltSvcFrame: self._receive_alt_svc_frame,
+            ExtensionFrame: self._receive_unknown_frame,
+        }
+
+    def _prepare_for_sending(self, frames: list[Frame]) -> None:
+        if not frames:
+            return
+        self._data_to_send += b"".join(f.serialize() for f in frames)
+        assert all(f.body_len <= self.max_outbound_frame_size for f in frames)
+
+    def _open_streams(self, remainder: int) -> int:
+        """
+        A common method of counting the number of open streams. Returns the
+        number of streams that are open *and* that have (stream ID % 2) ==
+        remainder. While it iterates, it also deletes any closed streams.
+        """
+        count = 0
+        to_delete = []
+
+        for stream_id, stream in self.streams.items():
+            if stream.open and (stream_id % 2 == remainder):
+                count += 1
+            elif stream.closed:
+                to_delete.append(stream_id)
+
+        for stream_id in to_delete:
+            stream = self.streams.pop(stream_id)
+            self._closed_streams[stream_id] = stream.closed_by
+
+        return count
+
+    @property
+    def open_outbound_streams(self) -> int:
+        """
+        The current number of open outbound streams.
+        """
+        outbound_numbers = int(self.config.client_side)
+        return self._open_streams(outbound_numbers)
+
+    @property
+    def open_inbound_streams(self) -> int:
+        """
+        The current number of open inbound streams.
+        """
+        inbound_numbers = int(not self.config.client_side)
+        return self._open_streams(inbound_numbers)
+
+    @property
+    def inbound_flow_control_window(self) -> int:
+        """
+        The size of the inbound flow control window for the connection. This is
+        rarely publicly useful: instead, use :meth:`remote_flow_control_window
+        <h2.connection.H2Connection.remote_flow_control_window>`. This
+        property exists mainly to provide direct access to this data.
+        """
+        return self._inbound_flow_control_window_manager.current_window_size
+
+    def _begin_new_stream(self, stream_id: int, allowed_ids: AllowedStreamIDs) -> H2Stream:
+        """
+        Initiate a new stream.
+
+        .. versionchanged:: 2.0.0
+           Removed this function from the public API.
+
+        :param stream_id: The ID of the stream to open.
+        :param allowed_ids: What kind of stream ID is allowed.
+        """
+        self.config.logger.debug(
+            "Attempting to initiate stream ID %d", stream_id,
+        )
+        outbound = self._stream_id_is_outbound(stream_id)
+        highest_stream_id = (
+            self.highest_outbound_stream_id if outbound else
+            self.highest_inbound_stream_id
+        )
+
+        if stream_id <= highest_stream_id:
+            raise StreamIDTooLowError(stream_id, highest_stream_id)
+
+        if (stream_id % 2) != int(allowed_ids):
+            msg = "Invalid stream ID for peer."
+            raise ProtocolError(msg)
+
+        s = H2Stream(
+            stream_id,
+            config=self.config,
+            inbound_window_size=self.local_settings.initial_window_size,
+            outbound_window_size=self.remote_settings.initial_window_size,
+        )
+        self.config.logger.debug("Stream ID %d created", stream_id)
+        s.max_outbound_frame_size = self.max_outbound_frame_size
+
+        self.streams[stream_id] = s
+        self.config.logger.debug("Current streams: %s", self.streams.keys())
+
+        if outbound:
+            self.highest_outbound_stream_id = stream_id
+        else:
+            self.highest_inbound_stream_id = stream_id
+
+        return s
+
+    def initiate_connection(self) -> None:
+        """
+        Provides any data that needs to be sent at the start of the connection.
+        Must be called for both clients and servers.
+        """
+        self.config.logger.debug("Initializing connection")
+        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+        if self.config.client_side:
+            preamble = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
+        else:
+            preamble = b""
+
+        f = SettingsFrame(0)
+        for setting, value in self.local_settings.items():
+            f.settings[setting] = value
+        self.config.logger.debug(
+            "Send Settings frame: %s", self.local_settings,
+        )
+
+        self._data_to_send += preamble + f.serialize()
+
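+    # A minimal client-side setup sketch (illustrative only; ``sock`` is a
+    # hypothetical connected socket):
+    #
+    #     config = H2Configuration(client_side=True)
+    #     conn = H2Connection(config=config)
+    #     conn.initiate_connection()
+    #     sock.sendall(conn.data_to_send())
+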
+    def initiate_upgrade_connection(self, settings_header: bytes | None = None) -> bytes | None:
+        """
+        Call to initialise the connection object for use with an upgraded
+        HTTP/2 connection (i.e. a connection negotiated using the
+        ``Upgrade: h2c`` HTTP header).
+
+        This method differs from :meth:`initiate_connection
+        <h2.connection.H2Connection.initiate_connection>` in several ways.
+        Firstly, it handles the additional SETTINGS frame that is sent in the
+        ``HTTP2-Settings`` header field. When called on a client connection,
+        this method will return a bytestring that the caller can put in the
+        ``HTTP2-Settings`` field they send on their initial request. When
+        called on a server connection, the user **must** provide the value they
+        received from the client in the ``HTTP2-Settings`` header field to the
+        ``settings_header`` argument, which will be used appropriately.
+
+        Additionally, this method sets up stream 1 in a half-closed state
+        appropriate for this side of the connection, to reflect the fact that
+        the request is already complete.
+
+        Finally, this method also prepares the appropriate preamble to be sent
+        after the upgrade.
+
+        .. versionadded:: 2.3.0
+
+        :param settings_header: (optional, server-only): The value of the
+             ``HTTP2-Settings`` header field received from the client.
+        :type settings_header: ``bytes``
+
+        :returns: For clients, a bytestring to put in the ``HTTP2-Settings``
+            header field. For servers, returns nothing.
+        :rtype: ``bytes`` or ``None``
+        """
+        self.config.logger.debug(
+            "Upgrade connection. Current settings: %s", self.local_settings,
+        )
+
+        frame_data = None
+        # Begin by getting the preamble in place.
+        self.initiate_connection()
+
+        if self.config.client_side:
+            f = SettingsFrame(0)
+            for setting, value in self.local_settings.items():
+                f.settings[setting] = value
+
+            frame_data = f.serialize_body()
+            frame_data = base64.urlsafe_b64encode(frame_data)
+        elif settings_header:
+            # We have a settings header from the client. This needs to be
+            # applied, but we want to throw away the ACK. We do this by
+            # inserting the data into a Settings frame and then passing it to
+            # the state machine, but ignoring the return value.
+            settings_header = base64.urlsafe_b64decode(settings_header)
+            f = SettingsFrame(0)
+            f.parse_body(memoryview(settings_header))
+            self._receive_settings_frame(f)
+
+        # Set up appropriate state. Stream 1 in a half-closed state:
+        # half-closed(local) for clients, half-closed(remote) for servers.
+        # Additionally, we need to set up the Connection state machine.
+        connection_input = (
+            ConnectionInputs.SEND_HEADERS if self.config.client_side
+            else ConnectionInputs.RECV_HEADERS
+        )
+        self.config.logger.debug("Process input %s", connection_input)
+        self.state_machine.process_input(connection_input)
+
+        # Set up stream 1.
+        self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD)
+        self.streams[1].upgrade(self.config.client_side)
+        return frame_data
+
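+    # A hedged sketch of the client side of an h2c upgrade (illustrative only;
+    # ``send_http1_upgrade_request`` is a hypothetical HTTP/1.1 helper):
+    #
+    #     conn = H2Connection(H2Configuration(client_side=True))
+    #     settings_value = conn.initiate_upgrade_connection()
+    #     send_http1_upgrade_request(
+    #         upgrade="h2c", http2_settings=settings_value,
+    #     )
+    #     # After the 101 response, stream 1 already carries the upgraded
+    #     # request and the server's response arrives on it.
+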
+    def _get_or_create_stream(self, stream_id: int, allowed_ids: AllowedStreamIDs) -> H2Stream:
+        """
+        Gets a stream by its stream ID. Will create one if one does not already
+        exist. Use allowed_ids to circumvent the usual stream ID rules for
+        clients and servers.
+
+        .. versionchanged:: 2.0.0
+           Removed this function from the public API.
+        """
+        try:
+            return self.streams[stream_id]
+        except KeyError:
+            return self._begin_new_stream(stream_id, allowed_ids)
+
+    def _get_stream_by_id(self, stream_id: int | None) -> H2Stream:
+        """
+        Gets a stream by its stream ID. Raises NoSuchStreamError if the stream
+        ID does not correspond to a known stream and is higher than the current
+        maximum; raises StreamClosedError if it is lower than the current
+        maximum.
+
+        .. versionchanged:: 2.0.0
+           Removed this function from the public API.
+        """
+        if not stream_id:
+            raise NoSuchStreamError(-1)  # pragma: no cover
+        try:
+            return self.streams[stream_id]
+        except KeyError as e:
+            outbound = self._stream_id_is_outbound(stream_id)
+            highest_stream_id = (
+                self.highest_outbound_stream_id if outbound else
+                self.highest_inbound_stream_id
+            )
+
+            if stream_id > highest_stream_id:
+                raise NoSuchStreamError(stream_id) from e
+            raise StreamClosedError(stream_id) from e
+
+    def get_next_available_stream_id(self) -> int:
+        """
+        Returns an integer suitable for use as the stream ID for the next
+        stream created by this endpoint. For server endpoints, this stream ID
+        will be even. For client endpoints, this stream ID will be odd. If no
+        stream IDs are available, raises :class:`NoAvailableStreamIDError
+        <h2.exceptions.NoAvailableStreamIDError>`.
+
+        .. warning:: The return value from this function does not change until
+                     the stream ID has actually been used by sending or pushing
+                     headers on that stream. For that reason, it should be
+                     called as close as possible to the actual use of the
+                     stream ID.
+
+        .. versionadded:: 2.0.0
+
+        :raises: :class:`NoAvailableStreamIDError
+            <h2.exceptions.NoAvailableStreamIDError>`
+        :returns: The next free stream ID this peer can use to initiate a
+            stream.
+        :rtype: ``int``
+        """
+        # No streams have been opened yet, so return the lowest allowed stream
+        # ID.
+        if not self.highest_outbound_stream_id:
+            next_stream_id = 1 if self.config.client_side else 2
+        else:
+            next_stream_id = self.highest_outbound_stream_id + 2
+        self.config.logger.debug(
+            "Next available stream ID %d", next_stream_id,
+        )
+        if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID:
+            msg = "Exhausted allowed stream IDs"
+            raise NoAvailableStreamIDError(msg)
+
+        return next_stream_id
+
+    def send_headers(self,
+                     stream_id: int,
+                     headers: Iterable[HeaderWeaklyTyped],
+                     end_stream: bool = False,
+                     priority_weight: int | None = None,
+                     priority_depends_on: int | None = None,
+                     priority_exclusive: bool | None = None) -> None:
+        """
+        Send headers on a given stream.
+
+        This function can be used to send request or response headers: the kind
+        that is sent depends on whether this connection has been opened as a
+        client or server connection, and whether the stream was opened by the
+        remote peer or not.
+
+        If this is a client connection, calling ``send_headers`` will send the
+        headers as a request. It will also implicitly open the stream being
+        used. If this is a client connection and ``send_headers`` has *already*
+        been called, this will send trailers instead.
+
+        If this is a server connection, calling ``send_headers`` will send the
+        headers as a response. It is a protocol error for a server to open a
+        stream by sending headers. If this is a server connection and
+        ``send_headers`` has *already* been called, this will send trailers
+        instead.
+
+        When acting as a server, you may call ``send_headers`` any number of
+        times allowed by the following rules, in this order:
+
+        - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a
+          placeholder for any 100-level status code).
+        - once with any other status header.
+        - zero or one time for trailers.
+
+        That is, you are allowed to send as many informational responses as you
+        like, followed by one complete response and zero or one HTTP trailer
+        blocks.
+
+        Clients may send one or two header blocks: one request block, and
+        optionally one trailer block.
+
+        If it is important to send HPACK "never indexed" header fields (as
+        defined in `RFC 7541 Section 7.1.3
+        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
+        instead provide headers using the HPACK library's :class:`HeaderTuple
+        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
+        <hpack:hpack.NeverIndexedHeaderTuple>` objects.
+
+        This method also allows users to prioritize the stream immediately,
+        by sending priority information on the HEADERS frame directly. To do
+        this, any one of ``priority_weight``, ``priority_depends_on``, or
+        ``priority_exclusive`` must be set to a value that is not ``None``. For
+        more information on the priority fields, see :meth:`prioritize
+        <h2.connection.H2Connection.prioritize>`.
+
+        .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special
+            headers (that is, ones whose header keys begin with ``:``) appear
+            at the start of the header block, before any normal headers.
+
+        .. versionchanged:: 2.3.0
+           Added support for using :class:`HeaderTuple
+           <hpack:hpack.HeaderTuple>` objects to store headers.
+
+        .. versionchanged:: 2.4.0
+           Added the ability to provide priority keyword arguments:
+           ``priority_weight``, ``priority_depends_on``, and
+           ``priority_exclusive``.
+
+        :param stream_id: The stream ID to send the headers on. If this stream
+            does not currently exist, it will be created.
+        :type stream_id: ``int``
+
+        :param headers: The request/response headers to send.
+        :type headers: An iterable of two tuples of bytestrings or
+            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.
+
+        :param end_stream: Whether this headers frame should end the stream
+            immediately (that is, whether no more data will be sent after this
+            frame). Defaults to ``False``.
+        :type end_stream: ``bool``
+
+        :param priority_weight: Sets the priority weight of the stream. See
+            :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more
+            about how this field works. Defaults to ``None``, which means that
+            no priority information will be sent.
+        :type priority_weight: ``int`` or ``None``
+
+        :param priority_depends_on: Sets which stream this one depends on for
+            priority purposes. See :meth:`prioritize
+            <h2.connection.H2Connection.prioritize>` for more about how this
+            field works. Defaults to ``None``, which means that no priority
+            information will be sent.
+        :type priority_depends_on: ``int`` or ``None``
+
+        :param priority_exclusive: Sets whether this stream exclusively depends
+            on the stream given in ``priority_depends_on`` for priority
+            purposes. See :meth:`prioritize
+            <h2.connection.H2Connection.prioritize>` for more about how this
+            field works. Defaults to ``None``, which means that no priority
+            information will be sent.
+        :type priority_exclusive: ``bool`` or ``None``
+
+        :returns: Nothing
+        """
+        self.config.logger.debug(
+            "Send headers on stream ID %d", stream_id,
+        )
+
+        # Check we can open the stream.
+        if stream_id not in self.streams:
+            max_open_streams = self.remote_settings.max_concurrent_streams
+            if (self.open_outbound_streams + 1) > max_open_streams:
+                msg = f"Max outbound streams is {max_open_streams}, {self.open_outbound_streams} open"
+                raise TooManyStreamsError(msg)
+
+        self.state_machine.process_input(ConnectionInputs.SEND_HEADERS)
+        stream = self._get_or_create_stream(
+            stream_id, AllowedStreamIDs(self.config.client_side),
+        )
+
+        frames: list[Frame] = []
+        frames.extend(stream.send_headers(
+            headers, self.encoder, end_stream,
+        ))
+
+        # We may need to send priority information.
+        priority_present = (
+            (priority_weight is not None) or
+            (priority_depends_on is not None) or
+            (priority_exclusive is not None)
+        )
+
+        if priority_present:
+            if not self.config.client_side:
+                msg = "Servers SHOULD NOT prioritize streams."
+                raise RFC1122Error(msg)
+
+            headers_frame = frames[0]
+            assert isinstance(headers_frame, HeadersFrame)
+
+            headers_frame.flags.add("PRIORITY")
+            frames[0] = _add_frame_priority(
+                headers_frame,
+                priority_weight,
+                priority_depends_on,
+                priority_exclusive,
+            )
+
+        self._prepare_for_sending(frames)
+
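+    # A hedged sketch of sending a simple GET request (illustrative only;
+    # ``sock`` is a hypothetical connected socket):
+    #
+    #     stream_id = conn.get_next_available_stream_id()
+    #     conn.send_headers(stream_id, [
+    #         (":method", "GET"),
+    #         (":path", "/"),
+    #         (":scheme", "https"),
+    #         (":authority", "example.com"),
+    #     ], end_stream=True)
+    #     sock.sendall(conn.data_to_send())
+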
+    def send_data(self,
+                  stream_id: int,
+                  data: bytes | memoryview,
+                  end_stream: bool = False,
+                  pad_length: Any = None) -> None:
+        """
+        Send data on a given stream.
+
+        This method does no breaking up of data: if the data is larger than the
+        value returned by :meth:`local_flow_control_window
+        <h2.connection.H2Connection.local_flow_control_window>` for this stream
+        then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will
+        be raised. If the data is larger than :data:`max_outbound_frame_size
+        <h2.connection.H2Connection.max_outbound_frame_size>` then a
+        :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be
+        raised.
+
+        h2 does this to avoid buffering the data internally. If the user
+        has more data to send than h2 will allow, consider breaking it up
+        and buffering it externally.
+
+        :param stream_id: The ID of the stream on which to send the data.
+        :type stream_id: ``int``
+        :param data: The data to send on the stream.
+        :type data: ``bytes``
+        :param end_stream: (optional) Whether this is the last data to be sent
+            on the stream. Defaults to ``False``.
+        :type end_stream: ``bool``
+        :param pad_length: (optional) Length of the padding to apply to the
+            data frame. Defaults to ``None`` for no use of padding. Note that
+            a value of ``0`` results in padding of length ``0``
+            (with the "padding" flag set on the frame).
+
+            .. versionadded:: 2.6.0
+
+        :type pad_length: ``int``
+        :returns: Nothing
+        """
+        self.config.logger.debug(
+            "Send data on stream ID %d with len %d", stream_id, len(data),
+        )
+        frame_size = len(data)
+        if pad_length is not None:
+            if not isinstance(pad_length, int):
+                msg = "pad_length must be an int"
+                raise TypeError(msg)
+            if pad_length < 0 or pad_length > 255:
+                msg = "pad_length must be within range: [0, 255]"
+                raise ValueError(msg)
+            # Account for padding bytes plus the 1-byte padding length field.
+            frame_size += pad_length + 1
+        self.config.logger.debug(
+            "Frame size on stream ID %d is %d", stream_id, frame_size,
+        )
+
+        if frame_size > self.local_flow_control_window(stream_id):
+            msg = f"Cannot send {frame_size} bytes, flow control window is {self.local_flow_control_window(stream_id)}"
+            raise FlowControlError(msg)
+        if frame_size > self.max_outbound_frame_size:
+            msg = f"Cannot send frame size {frame_size}, max frame size is {self.max_outbound_frame_size}"
+            raise FrameTooLargeError(msg)
+
+        self.state_machine.process_input(ConnectionInputs.SEND_DATA)
+        frames = self.streams[stream_id].send_data(
+            data, end_stream, pad_length=pad_length,
+        )
+
+        self._prepare_for_sending(frames)
+
+        self.outbound_flow_control_window -= frame_size
+        self.config.logger.debug(
+            "Outbound flow control window size is %d",
+            self.outbound_flow_control_window,
+        )
+        assert self.outbound_flow_control_window >= 0
+
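+    # A hedged sketch of sending a large body while respecting flow control
+    # and the frame-size limit (illustrative only; ``body`` is hypothetical):
+    #
+    #     while body:
+    #         chunk_size = min(len(body),
+    #                          conn.local_flow_control_window(stream_id),
+    #                          conn.max_outbound_frame_size)
+    #         if chunk_size == 0:
+    #             break  # wait for a WindowUpdated event before continuing
+    #         chunk, body = body[:chunk_size], body[chunk_size:]
+    #         conn.send_data(stream_id, chunk, end_stream=not body)
+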
+    def end_stream(self, stream_id: int) -> None:
+        """
+        Cleanly end a given stream.
+
+        This method ends a stream by sending an empty DATA frame on that stream
+        with the ``END_STREAM`` flag set.
+
+        :param stream_id: The ID of the stream to end.
+        :type stream_id: ``int``
+        :returns: Nothing
+        """
+        self.config.logger.debug("End stream ID %d", stream_id)
+        self.state_machine.process_input(ConnectionInputs.SEND_DATA)
+        frames = self.streams[stream_id].end_stream()
+        self._prepare_for_sending(frames)
+
+    def increment_flow_control_window(self, increment: int, stream_id: int | None = None) -> None:
+        """
+        Increment a flow control window, optionally for a single stream. Allows
+        the remote peer to send more data.
+
+        .. versionchanged:: 2.0.0
+           Rejects attempts to increment the flow control window by out of
+           range values with a ``ValueError``.
+
+        :param increment: The amount to increment the flow control window by.
+        :type increment: ``int``
+        :param stream_id: (optional) The ID of the stream that should have its
+            flow control window opened. If not present or ``None``, the
+            connection flow control window will be opened instead.
+        :type stream_id: ``int`` or ``None``
+        :returns: Nothing
+        :raises: ``ValueError``
+        """
+        if not (1 <= increment <= self.MAX_WINDOW_INCREMENT):
+            msg = f"Flow control increment must be between 1 and {self.MAX_WINDOW_INCREMENT}"
+            raise ValueError(msg)
+
+        self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE)
+
+        if stream_id is not None:
+            stream = self.streams[stream_id]
+            frames = stream.increase_flow_control_window(
+                increment,
+            )
+
+            self.config.logger.debug(
+                "Increase stream ID %d flow control window by %d",
+                stream_id, increment,
+            )
+        else:
+            self._inbound_flow_control_window_manager.window_opened(increment)
+            f = WindowUpdateFrame(0)
+            f.window_increment = increment
+            frames = [f]
+
+            self.config.logger.debug(
+                "Increase connection flow control window by %d", increment,
+            )
+
+        self._prepare_for_sending(frames)
+
+    def push_stream(self,
+                    stream_id: int,
+                    promised_stream_id: int,
+                    request_headers: Iterable[HeaderWeaklyTyped]) -> None:
+        """
+        Push a response to the client by sending a PUSH_PROMISE frame.
+
+        If it is important to send HPACK "never indexed" header fields (as
+        defined in `RFC 7541 Section 7.1.3
+        <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may
+        instead provide headers using the HPACK library's :class:`HeaderTuple
+        <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple
+        <hpack:hpack.NeverIndexedHeaderTuple>` objects.
+
+        :param stream_id: The ID of the stream that this push is a response to.
+        :type stream_id: ``int``
+        :param promised_stream_id: The ID of the stream that the pushed
+            response will be sent on.
+        :type promised_stream_id: ``int``
+        :param request_headers: The headers of the request that the pushed
+            response will be responding to.
+        :type request_headers: An iterable of two tuples of bytestrings or
+            :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects.
+        :returns: Nothing
+        """
+        self.config.logger.debug(
+            "Send Push Promise frame on stream ID %d", stream_id,
+        )
+
+        if not self.remote_settings.enable_push:
+            msg = "Remote peer has disabled stream push"
+            raise ProtocolError(msg)
+
+        self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE)
+        stream = self._get_stream_by_id(stream_id)
+
+        # We need to prevent users pushing streams in response to streams that
+        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
+        # easiest way to do that is to assert that the stream_id is not even:
+        # this shortcut works because only servers can push and the state
+        # machine will enforce this.
+        if (stream_id % 2) == 0:
+            msg = "Cannot recursively push streams."
+            raise ProtocolError(msg)
+
+        new_stream = self._begin_new_stream(
+            promised_stream_id, AllowedStreamIDs.EVEN,
+        )
+        self.streams[promised_stream_id] = new_stream
+
+        frames = stream.push_stream_in_band(
+            promised_stream_id, request_headers, self.encoder,
+        )
+        new_frames = new_stream.locally_pushed()
+        self._prepare_for_sending(frames + new_frames)
+
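+    # A hedged server-side sketch of pushing a resource in response to a
+    # request received on ``stream_id`` (illustrative only):
+    #
+    #     push_id = conn.get_next_available_stream_id()
+    #     conn.push_stream(stream_id, push_id, [
+    #         (":method", "GET"),
+    #         (":path", "/style.css"),
+    #         (":scheme", "https"),
+    #         (":authority", "example.com"),
+    #     ])
+    #     conn.send_headers(push_id, [(":status", "200")], end_stream=True)
+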
+    def ping(self, opaque_data: bytes | str) -> None:
+        """
+        Send a PING frame.
+
+        :param opaque_data: A bytestring of length 8 that will be sent in the
+                            PING frame.
+        :returns: Nothing
+        """
+        self.config.logger.debug("Send Ping frame")
+
+        if not isinstance(opaque_data, bytes) or len(opaque_data) != 8:
+            msg = f"Invalid value for ping data: {opaque_data!r}"
+            raise ValueError(msg)
+
+        self.state_machine.process_input(ConnectionInputs.SEND_PING)
+        f = PingFrame(0)
+        f.opaque_data = opaque_data
+        self._prepare_for_sending([f])
+
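+    # A hedged liveness-check sketch (illustrative only): the opaque data must
+    # be exactly 8 bytes, and the peer echoes it back, surfacing as a
+    # PingAckReceived event.
+    #
+    #     conn.ping(b"\x00" * 8)
+    #     sock.sendall(conn.data_to_send())
+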
+    def reset_stream(self, stream_id: int, error_code: ErrorCodes | int = 0) -> None:
+        """
+        Reset a stream.
+
+        This method forcibly closes a stream by sending a RST_STREAM frame for
+        a given stream. This is not a graceful closure. To gracefully end a
+        stream, try the :meth:`end_stream
+        <h2.connection.H2Connection.end_stream>` method.
+
+        :param stream_id: The ID of the stream to reset.
+        :type stream_id: ``int``
+        :param error_code: (optional) The error code to use to reset the
+            stream. Defaults to :data:`ErrorCodes.NO_ERROR
+            <h2.errors.ErrorCodes.NO_ERROR>`.
+        :type error_code: ``int``
+        :returns: Nothing
+        """
+        self.config.logger.debug("Reset stream ID %d", stream_id)
+        self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM)
+        stream = self._get_stream_by_id(stream_id)
+        frames = stream.reset_stream(error_code)
+        self._prepare_for_sending(frames)
+
+    def close_connection(self,
+                         error_code: ErrorCodes | int = 0,
+                         additional_data: bytes | None = None,
+                         last_stream_id: int | None = None) -> None:
+        """
+        Close a connection, emitting a GOAWAY frame.
+
+        .. versionchanged:: 2.4.0
+           Added ``additional_data`` and ``last_stream_id`` arguments.
+
+        :param error_code: (optional) The error code to send in the GOAWAY
+            frame.
+        :param additional_data: (optional) Additional debug data indicating
+            a reason for closing the connection. Must be a bytestring.
+        :param last_stream_id: (optional) The last stream which was processed
+            by the sender. Defaults to ``highest_inbound_stream_id``.
+        :returns: Nothing
+        """
+        self.config.logger.debug("Close connection")
+        self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
+
+        # Additional_data must be bytes
+        if additional_data is not None:
+            assert isinstance(additional_data, bytes)
+
+        if last_stream_id is None:
+            last_stream_id = self.highest_inbound_stream_id
+
+        f = GoAwayFrame(
+            stream_id=0,
+            last_stream_id=last_stream_id,
+            error_code=error_code,
+            additional_data=(additional_data or b""),
+        )
+        self._prepare_for_sending([f])
+
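+    # A hedged shutdown sketch (illustrative only):
+    #
+    #     conn.close_connection(error_code=ErrorCodes.NO_ERROR,
+    #                           additional_data=b"shutting down")
+    #     sock.sendall(conn.data_to_send())
+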
+    def update_settings(self, new_settings: dict[SettingCodes | int, int]) -> None:
+        """
+        Update the local settings. This will prepare and emit the appropriate
+        SETTINGS frame.
+
+        :param new_settings: A dictionary of {setting: new value}
+        """
+        self.config.logger.debug(
+            "Update connection settings to %s", new_settings,
+        )
+        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+        self.local_settings.update(new_settings)
+        s = SettingsFrame(0)
+        s.settings = new_settings
+        self._prepare_for_sending([s])
+
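+    # A hedged sketch of raising the advertised concurrency limit
+    # (illustrative only):
+    #
+    #     conn.update_settings({SettingCodes.MAX_CONCURRENT_STREAMS: 250})
+    #     sock.sendall(conn.data_to_send())
+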
+    def advertise_alternative_service(self,
+                                      field_value: bytes | str,
+                                      origin: bytes | None = None,
+                                      stream_id: int | None = None) -> None:
+        """
+        Notify a client about an available Alternative Service.
+
+        An Alternative Service is defined in `RFC 7838
+        <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service
+        notification informs a client that a given origin is also available
+        elsewhere.
+
+        Alternative Services can be advertised in two ways. Firstly, they can
+        be advertised explicitly: that is, a server can say "origin X is also
+        available at Y". To advertise like this, set the ``origin`` argument
+        and not the ``stream_id`` argument. Alternatively, they can be
+        advertised implicitly: that is, a server can say "the origin you're
+        contacting on stream X is also available at Y". To advertise like this,
+        set the ``stream_id`` argument and not the ``origin`` argument.
+
+        The explicit method of advertising can be done as long as the
+        connection is active. The implicit method can only be done after the
+        client has sent the request headers and before the server has sent the
+        response headers: outside of those points, h2 will forbid sending
+        the Alternative Service advertisement by raising a ProtocolError.
+
+        The ``field_value`` parameter is specified in RFC 7838. h2 does
+        not validate or introspect this argument: the user is required to
+        ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's
+        "Alternative Service Field Value".
+
+        .. note:: It is strongly preferred to use the explicit method of
+                  advertising Alternative Services. The implicit method of
+                  advertising Alternative Services has a number of subtleties
+                  and can lead to inconsistencies between the server and
+                  client. h2 allows both mechanisms, but caution is
+                  strongly advised.
+
+        .. versionadded:: 2.3.0
+
+        :param field_value: The RFC 7838 Alternative Service Field Value. This
+            argument is not introspected by h2: the user is responsible
+            for ensuring that it is well-formed.
+        :type field_value: ``bytes``
+
+        :param origin: The origin/authority to which the Alternative Service
+            being advertised applies. Must not be provided at the same time as
+            ``stream_id``.
+        :type origin: ``bytes`` or ``None``
+
+        :param stream_id: The ID of the stream which was sent to the authority
+            for which this Alternative Service advertisement applies. Must not
+            be provided at the same time as ``origin``.
+        :type stream_id: ``int`` or ``None``
+
+        :returns: Nothing.
+        """
+        if not isinstance(field_value, bytes):
+            msg = "Field must be bytestring."
+            raise ValueError(msg)  # noqa: TRY004
+
+        if origin is not None and stream_id is not None:
+            msg = "Must not provide both origin and stream_id"
+            raise ValueError(msg)
+
+        self.state_machine.process_input(
+            ConnectionInputs.SEND_ALTERNATIVE_SERVICE,
+        )
+
+        if origin is not None:
+            # This ALTSVC is sent on stream zero.
+            f = AltSvcFrame(stream_id=0)
+            f.origin = origin
+            f.field = field_value
+            frames: list[Frame] = [f]
+        else:
+            stream = self._get_stream_by_id(stream_id)
+            frames = stream.advertise_alternative_service(field_value)
+
+        self._prepare_for_sending(frames)
+
+    def prioritize(self,
+                   stream_id: int,
+                   weight: int | None = None,
+                   depends_on: int | None = None,
+                   exclusive: bool | None = None) -> None:
+        """
+        Notify a server about the priority of a stream.
+
+        Stream priorities are a form of guidance to a remote server: they
+        inform the server about how important a given response is, so that the
+        server may allocate its resources (e.g. bandwidth, CPU time, etc.)
+        accordingly. This exists to allow clients to ensure that the most
+        important data arrives earlier, while less important data does not
+        starve out the more important data.
+
+        Stream priorities are explained in depth in `RFC 7540 Section 5.3
+        <https://tools.ietf.org/html/rfc7540#section-5.3>`_.
+
+        This method updates the priority information of a single stream. It may
+        be called well before a stream is actively in use, or well after a
+        stream is closed.
+
+        .. warning:: RFC 7540 allows for servers to change the priority of
+                     streams. However, h2 **does not** allow server
+                     stacks to do this. This is because most clients do not
+                     adequately know how to respond when provided conflicting
+                     priority information, and relatively little utility is
+                     provided by making that functionality available.
+
+        .. note:: h2 **does not** maintain any information about the
+                  RFC 7540 priority tree. That means that h2 does not
+                  prevent incautious users from creating invalid priority
+                  trees, particularly by creating priority loops. While some
+                  basic error checking is provided by h2, users are
+                  strongly recommended to understand their prioritisation
+                  strategies before using the priority tools here.
+
+        .. note:: Priority information is strictly advisory. Servers are
+                  allowed to disregard it entirely. Avoid relying on the idea
+                  that your priority signaling will definitely be obeyed.
+
+        .. versionadded:: 2.4.0
+
+        :param stream_id: The ID of the stream to prioritize.
+        :type stream_id: ``int``
+
+        :param weight: The weight to give the stream. Defaults to ``16``, the
+             default weight of any stream. May be any value between ``1`` and
+             ``256`` inclusive. The relative weight of a stream indicates what
+             proportion of available resources will be allocated to that
+             stream.
+        :type weight: ``int``
+
+        :param depends_on: The ID of the stream on which this stream depends.
+             This stream will only be progressed if it is impossible to
+             progress the parent stream (the one on which this one depends).
+             Passing the value ``0`` means that this stream does not depend on
+             any other. Defaults to ``0``.
+        :type depends_on: ``int``
+
+        :param exclusive: Whether this stream is an exclusive dependency of its
+            "parent" stream (i.e. the stream given by ``depends_on``). If a
+            stream is an exclusive dependency of another, that means that all
+            previously-set children of the parent are moved to become children
+            of the new exclusively-dependent stream. Defaults to ``False``.
+        :type exclusive: ``bool``
+        """
+        if not self.config.client_side:
+            msg = "Servers SHOULD NOT prioritize streams."
+            raise RFC1122Error(msg)
+
+        self.state_machine.process_input(
+            ConnectionInputs.SEND_PRIORITY,
+        )
+
+        frame = PriorityFrame(stream_id)
+        frame_prio = _add_frame_priority(frame, weight, depends_on, exclusive)
+
+        self._prepare_for_sending([frame_prio])
+
+    def local_flow_control_window(self, stream_id: int) -> int:
+        """
+        Returns the maximum amount of data that can be sent on stream
+        ``stream_id``.
+
+        This value will never be larger than the total data that can be sent on
+        the connection: even if the given stream allows more data, the
+        connection window provides a logical maximum to the amount of data that
+        can be sent.
+
+        The maximum data that can be sent in a single data frame on a stream
+        is either this value, or the maximum frame size, whichever is
+        *smaller*.
+
+        :param stream_id: The ID of the stream whose flow control window is
+            being queried.
+        :type stream_id: ``int``
+        :returns: The amount of data in bytes that can be sent on the stream
+            before the flow control window is exhausted.
+        :rtype: ``int``
+        """
+        stream = self._get_stream_by_id(stream_id)
+        return min(
+            self.outbound_flow_control_window,
+            stream.outbound_flow_control_window,
+        )
+
+    def remote_flow_control_window(self, stream_id: int) -> int:
+        """
+        Returns the maximum amount of data the remote peer can send on stream
+        ``stream_id``.
+
+        This value will never be larger than the total data that the remote
+        peer can send on the connection: even if the given stream allows more
+        data, the connection window provides a logical maximum to the amount
+        of data that can be received.
+
+        The maximum data that can be received in a single data frame on a
+        stream is either this value, or the maximum frame size, whichever is
+        *smaller*.
+
+        :param stream_id: The ID of the stream whose flow control window is
+            being queried.
+        :type stream_id: ``int``
+        :returns: The amount of data in bytes that can be received on the
+            stream before the flow control window is exhausted.
+        :rtype: ``int``
+        """
+        stream = self._get_stream_by_id(stream_id)
+        return min(
+            self.inbound_flow_control_window,
+            stream.inbound_flow_control_window,
+        )
+
+    def acknowledge_received_data(self, acknowledged_size: int, stream_id: int) -> None:
+        """
+        Inform the :class:`H2Connection <h2.connection.H2Connection>` that a
+        certain number of flow-controlled bytes have been processed, and that
+        the space should be handed back to the remote peer at an opportune
+        time.
+
+        .. versionadded:: 2.5.0
+
+        :param acknowledged_size: The total *flow-controlled size* of the data
+            that has been processed. Note that this must include the amount of
+            padding that was sent with that data.
+        :type acknowledged_size: ``int``
+        :param stream_id: The ID of the stream on which this data was received.
+        :type stream_id: ``int``
+        :returns: Nothing
+        :rtype: ``None``
+        """
+        self.config.logger.debug(
+            "Ack received data on stream ID %d with size %d",
+            stream_id, acknowledged_size,
+        )
+        if stream_id <= 0:
+            msg = f"Stream ID {stream_id} is not valid for acknowledge_received_data"
+            raise ValueError(msg)
+        if acknowledged_size < 0:
+            msg = "Cannot acknowledge negative data"
+            raise ValueError(msg)
+
+        frames: list[Frame] = []
+
+        conn_manager = self._inbound_flow_control_window_manager
+        conn_increment = conn_manager.process_bytes(acknowledged_size)
+        if conn_increment:
+            f = WindowUpdateFrame(0)
+            f.window_increment = conn_increment
+            frames.append(f)
+
+        try:
+            stream = self._get_stream_by_id(stream_id)
+        except StreamClosedError:
+            # The stream is already gone. We're not worried about incrementing
+            # the window in this case.
+            pass
+        else:
+            # No point incrementing the windows of closed streams.
+            if stream.open:
+                frames.extend(
+                    stream.acknowledge_received_data(acknowledged_size),
+                )
+
+        self._prepare_for_sending(frames)
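+
+    # An illustrative calling pattern for the method above, assuming the
+    # caller is iterating over ``h2.events.DataReceived`` events; ``event``
+    # and ``sock`` are not part of this module.
+    #
+    #   conn.acknowledge_received_data(
+    #       event.flow_controlled_length, event.stream_id,
+    #   )
+    #   sock.sendall(conn.data_to_send())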
+
+    def data_to_send(self, amount: int | None = None) -> bytes:
+        """
+        Returns some data for sending out of the internal data buffer.
+
+        This method is analogous to ``read`` on a file-like object, but it
+        doesn't block. Instead, it returns as much data as the user asks for,
+        or less if that much data is not available. It does not perform any
+        I/O, and so uses a different name.
+
+        :param amount: (optional) The maximum amount of data to return. If not
+            set, or set to ``None``, will return as much data as possible.
+        :type amount: ``int``
+        :returns: A bytestring containing the data to send on the wire.
+        :rtype: ``bytes``
+        """
+        if amount is None:
+            data = bytes(self._data_to_send)
+            self._data_to_send = bytearray()
+            return data
+        data = bytes(self._data_to_send[:amount])
+        self._data_to_send = self._data_to_send[amount:]
+        return data
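+
+    # A minimal sketch of pairing this method with a socket; ``sock`` is an
+    # assumed connected socket and is not part of this module.
+    #
+    #   outbound = conn.data_to_send()
+    #   if outbound:
+    #       sock.sendall(outbound)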
+
+    def clear_outbound_data_buffer(self) -> None:
+        """
+        Clears the outbound data buffer, such that if this call was immediately
+        followed by a call to
+        :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that
+        call would return no data.
+
+        This method should not normally be used, but is made available to avoid
+        exposing implementation details.
+        """
+        self._data_to_send = bytearray()
+
+    def _acknowledge_settings(self) -> list[Frame]:
+        """
+        Acknowledge settings that have been received.
+
+        .. versionchanged:: 2.0.0
+           Removed from public API, removed useless ``event`` parameter, made
+           automatic.
+
+        :returns: Nothing
+        """
+        self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS)
+
+        changes = self.remote_settings.acknowledge()
+
+        if SettingCodes.INITIAL_WINDOW_SIZE in changes:
+            setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
+            self._flow_control_change_from_settings(
+                setting.original_value,
+                setting.new_value,
+            )
+
+        # HEADER_TABLE_SIZE changes made by the remote peer affect our
+        # encoder: cf. RFC 7540 Section 6.5.2.
+        if SettingCodes.HEADER_TABLE_SIZE in changes:
+            setting = changes[SettingCodes.HEADER_TABLE_SIZE]
+            self.encoder.header_table_size = setting.new_value
+
+        if SettingCodes.MAX_FRAME_SIZE in changes:
+            setting = changes[SettingCodes.MAX_FRAME_SIZE]
+            self.max_outbound_frame_size = setting.new_value
+            for stream in self.streams.values():
+                stream.max_outbound_frame_size = setting.new_value
+
+        f = SettingsFrame(0)
+        f.flags.add("ACK")
+        return [f]
+
+    def _flow_control_change_from_settings(self, old_value: int | None, new_value: int) -> None:
+        """
+        Update flow control windows in response to a change in the value of
+        SETTINGS_INITIAL_WINDOW_SIZE.
+
+        When this setting is changed, it automatically updates all flow control
+        windows by the delta in the settings values. Note that it does not
+        increment the *connection* flow control window, per section 6.9.2 of
+        RFC 7540.
+        """
+        delta = new_value - (old_value or 0)
+
+        for stream in self.streams.values():
+            stream.outbound_flow_control_window = guard_increment_window(
+                stream.outbound_flow_control_window,
+                delta,
+            )
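+
+    # Worked example (illustrative): if the remote peer lowers
+    # SETTINGS_INITIAL_WINDOW_SIZE from 65535 to 16384, the delta is -49151
+    # and every stream's outbound flow control window shrinks by 49151 bytes,
+    # while the connection-level window is left untouched (RFC 7540 § 6.9.2).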
+
+    def _inbound_flow_control_change_from_settings(self, old_value: int | None, new_value: int) -> None:
+        """
+        Update remote flow control windows in response to a change in the value
+        of SETTINGS_INITIAL_WINDOW_SIZE.
+
+        When this setting is changed, it automatically updates all remote flow
+        control windows by the delta in the settings values.
+        """
+        delta = new_value - (old_value or 0)
+
+        for stream in self.streams.values():
+            stream._inbound_flow_control_change_from_settings(delta)
+
+    def receive_data(self, data: bytes) -> list[Event]:
+        """
+        Pass some received HTTP/2 data to the connection for handling.
+
+        :param data: The data received from the remote peer on the network.
+        :type data: ``bytes``
+        :returns: A list of events that the remote peer triggered by sending
+            this data.
+        """
+        self.config.logger.trace(
+            "Process received data on connection. Received data: %r", data,
+        )
+
+        events: list[Event] = []
+        self.incoming_buffer.add_data(data)
+        self.incoming_buffer.max_frame_size = self.max_inbound_frame_size
+
+        try:
+            for frame in self.incoming_buffer:
+                events.extend(self._receive_frame(frame))
+        except InvalidPaddingError as e:
+            self._terminate_connection(ErrorCodes.PROTOCOL_ERROR)
+            msg = "Received frame with invalid padding."
+            raise ProtocolError(msg) from e
+        except ProtocolError as e:
+            # For whatever reason, receiving the frame caused a protocol error.
+            # We should prepare to emit a GoAway frame before throwing the
+            # exception up further. No need for an event: the exception will
+            # do fine.
+            self._terminate_connection(e.error_code)
+            raise
+
+        return events
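+
+    # A minimal sketch of the usual loop around this method; ``sock`` and
+    # ``handle_event`` are application-side assumptions, not part of this
+    # module.
+    #
+    #   data = sock.recv(65535)
+    #   if data:
+    #       for event in conn.receive_data(data):
+    #           handle_event(event)
+    #       sock.sendall(conn.data_to_send())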
+
+    def _receive_frame(self, frame: Frame) -> list[Event]:
+        """
+        Handle a frame received on the connection.
+
+        .. versionchanged:: 2.0.0
+           Removed from the public API.
+        """
+        events: list[Event]
+        self.config.logger.trace("Received frame: %s", repr(frame))
+        try:
+            # I don't love using __class__ here, maybe reconsider it.
+            frames, events = self._frame_dispatch_table[frame.__class__](frame)
+        except StreamClosedError as e:
+            # If the stream was closed by RST_STREAM, we just send a RST_STREAM
+            # to the remote peer. Otherwise, this is a connection error, and so
+            # we will re-raise to trigger one.
+            if self._stream_is_closed_by_reset(e.stream_id):
+                f = RstStreamFrame(e.stream_id)
+                f.error_code = e.error_code
+                self._prepare_for_sending([f])
+                events = e._events
+            else:
+                raise
+        except StreamIDTooLowError as e:
+            # The stream ID seems invalid. This may happen when the closed
+            # stream has been cleaned up, or when the remote peer has opened a
+            # new stream with a higher stream ID than this one, forcing it
+            # closed implicitly.
+            #
+            # Check how the stream was closed: depending on the mechanism, it
+            # is either a stream error or a connection error.
+            if self._stream_is_closed_by_reset(e.stream_id):
+                # Closed by RST_STREAM is a stream error.
+                f = RstStreamFrame(e.stream_id)
+                f.error_code = ErrorCodes.STREAM_CLOSED
+                self._prepare_for_sending([f])
+                events = []
+            elif self._stream_is_closed_by_end(e.stream_id):
+                # Closed by END_STREAM is a connection error.
+                raise StreamClosedError(e.stream_id) from e
+            else:
+                # Closed implicitly, also a connection error, but of type
+                # PROTOCOL_ERROR.
+                raise
+        else:
+            self._prepare_for_sending(frames)
+
+        return events
+
+    def _terminate_connection(self, error_code: ErrorCodes) -> None:
+        """
+        Terminate the connection early. Used in error handling blocks to send
+        GOAWAY frames.
+        """
+        f = GoAwayFrame(0)
+        f.last_stream_id = self.highest_inbound_stream_id
+        f.error_code = error_code
+        self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY)
+        self._prepare_for_sending([f])
+
+    def _receive_headers_frame(self, frame: HeadersFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a headers frame on the connection.
+        """
+        # If necessary, check we can open the stream. Also validate that the
+        # stream ID is valid.
+        if frame.stream_id not in self.streams:
+            max_open_streams = self.local_settings.max_concurrent_streams
+            if (self.open_inbound_streams + 1) > max_open_streams:
+                msg = f"Max outbound streams is {max_open_streams}, {self.open_outbound_streams} open"
+                raise TooManyStreamsError(msg)
+
+        # Let's decode the headers. We handle headers as bytes internally up
+        # until we hang them off the event, at which point we may optionally
+        # convert them to unicode.
+        headers = _decode_headers(self.decoder, frame.data)
+
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_HEADERS,
+        )
+        stream = self._get_or_create_stream(
+            frame.stream_id, AllowedStreamIDs(not self.config.client_side),
+        )
+        frames, stream_events = stream.receive_headers(
+            headers,
+            "END_STREAM" in frame.flags,
+            self.config.header_encoding,
+        )
+
+        if "PRIORITY" in frame.flags:
+            p_frames, p_events = self._receive_priority_frame(frame)
+            expected_event_types = (RequestReceived, ResponseReceived, TrailersReceived, InformationalResponseReceived)
+            assert isinstance(stream_events[0], expected_event_types)
+            assert isinstance(p_events[0], PriorityUpdated)
+            stream_events[0].priority_updated = p_events[0]
+            stream_events.extend(p_events)
+            assert not p_frames
+
+        return frames, events + stream_events
+
+    def _receive_push_promise_frame(self, frame: PushPromiseFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a push-promise frame on the connection.
+        """
+        if not self.local_settings.enable_push:
+            msg = "Received pushed stream"
+            raise ProtocolError(msg)
+
+        pushed_headers = _decode_headers(self.decoder, frame.data)
+
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_PUSH_PROMISE,
+        )
+
+        try:
+            stream = self._get_stream_by_id(frame.stream_id)
+        except NoSuchStreamError as e:
+            # We need to check if the parent stream was reset by us. If it was
+            # then we presume that the PUSH_PROMISE was in flight when we reset
+            # the parent stream. Rather than accept the new stream, just reset
+            # it.
+            #
+            # If this was closed naturally, however, we should call this a
+            # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is
+            # a real problem because it creates a brand new stream that the
+            # remote peer now believes exists.
+            if (self._stream_closed_by(frame.stream_id) ==
+                    StreamClosedBy.SEND_RST_STREAM):
+                f = RstStreamFrame(frame.promised_stream_id)
+                f.error_code = ErrorCodes.REFUSED_STREAM
+                return [f], events
+
+            msg = "Attempted to push on closed stream."
+            raise ProtocolError(msg) from e
+
+        # We need to prevent peers pushing streams in response to streams that
+        # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The
+        # easiest way to do that is to assert that the stream_id is not even:
+        # this shortcut works because only servers can push and the state
+        # machine will enforce this.
+        if (frame.stream_id % 2) == 0:
+            msg = "Cannot recursively push streams."
+            raise ProtocolError(msg)
+
+        try:
+            frames, stream_events = stream.receive_push_promise_in_band(
+                frame.promised_stream_id,
+                pushed_headers,
+                self.config.header_encoding,
+            )
+        except StreamClosedError:
+            # The parent stream was reset by us, so we presume that
+            # PUSH_PROMISE was in flight when we reset the parent stream.
+            # So we just reset the new stream.
+            f = RstStreamFrame(frame.promised_stream_id)
+            f.error_code = ErrorCodes.REFUSED_STREAM
+            return [f], events
+
+        new_stream = self._begin_new_stream(
+            frame.promised_stream_id, AllowedStreamIDs.EVEN,
+        )
+        self.streams[frame.promised_stream_id] = new_stream
+        new_stream.remotely_pushed(pushed_headers)
+
+        return frames, events + stream_events
+
+    def _handle_data_on_closed_stream(self,
+                                      events: list[Event],
+                                      exc: StreamClosedError,
+                                      frame: DataFrame) -> tuple[list[Frame], list[Event]]:
+        # This stream is already closed - and yet we received a DATA frame.
+        # The received DATA frame counts towards the connection flow control
+        # window. We need to manually acknowledge the DATA frame to update the
+        # flow control window of the connection. Otherwise the whole
+        # connection stalls due to the inbound flow control window being 0.
+        frames: list[Frame] = []
+        conn_manager = self._inbound_flow_control_window_manager
+        conn_increment = conn_manager.process_bytes(
+            frame.flow_controlled_length,
+        )
+
+        if conn_increment:
+            window_update_frame = WindowUpdateFrame(0)
+            window_update_frame.window_increment = conn_increment
+            frames.append(window_update_frame)
+            self.config.logger.debug(
+                "Received DATA frame on closed stream %d - "
+                "auto-emitted a WINDOW_UPDATE by %d",
+                frame.stream_id, conn_increment,
+            )
+
+        rst_stream_frame = RstStreamFrame(exc.stream_id)
+        rst_stream_frame.error_code = exc.error_code
+        frames.append(rst_stream_frame)
+        self.config.logger.debug(
+            "Stream %s already CLOSED or cleaned up - auto-emitted a RST_FRAME",
+            frame.stream_id,
+        )
+        return frames, events + exc._events
+
+    def _receive_data_frame(self, frame: DataFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a data frame on the connection.
+        """
+        flow_controlled_length = frame.flow_controlled_length
+
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_DATA,
+        )
+        self._inbound_flow_control_window_manager.window_consumed(
+            flow_controlled_length,
+        )
+
+        try:
+            stream = self._get_stream_by_id(frame.stream_id)
+            frames, stream_events = stream.receive_data(
+                frame.data,
+                "END_STREAM" in frame.flags,
+                flow_controlled_length,
+            )
+        except StreamClosedError as e:
+            # This stream is either marked as CLOSED or already gone from our
+            # internal state.
+            return self._handle_data_on_closed_stream(events, e, frame)
+
+        return frames, events + stream_events
+
+    def _receive_settings_frame(self, frame: SettingsFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a SETTINGS frame on the connection.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_SETTINGS,
+        )
+
+        # This is an ack of the local settings.
+        if "ACK" in frame.flags:
+            changed_settings = self._local_settings_acked()
+            ack_event = SettingsAcknowledged()
+            ack_event.changed_settings = changed_settings
+            events.append(ack_event)
+            return [], events
+
+        # Add the new settings.
+        self.remote_settings.update(frame.settings)
+        events.append(
+            RemoteSettingsChanged.from_settings(
+                self.remote_settings, frame.settings,
+            ),
+        )
+        frames = self._acknowledge_settings()
+
+        return frames, events
+
+    def _receive_window_update_frame(self, frame: WindowUpdateFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a WINDOW_UPDATE frame on the connection.
+        """
+        # hyperframe will take care of validating the window_increment.
+        # If we reach in here, we can assume a valid value.
+
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_WINDOW_UPDATE,
+        )
+
+        if frame.stream_id:
+            try:
+                stream = self._get_stream_by_id(frame.stream_id)
+                frames, stream_events = stream.receive_window_update(
+                    frame.window_increment,
+                )
+            except StreamClosedError:
+                return [], events
+        else:
+            # Increment our local flow control window.
+            self.outbound_flow_control_window = guard_increment_window(
+                self.outbound_flow_control_window,
+                frame.window_increment,
+            )
+
+            # FIXME: Should we split this into one event per active stream?
+            window_updated_event = WindowUpdated()
+            window_updated_event.stream_id = 0
+            window_updated_event.delta = frame.window_increment
+            stream_events = [window_updated_event]
+            frames = []
+
+        return frames, events + stream_events
+
+    def _receive_ping_frame(self, frame: PingFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a PING frame on the connection.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_PING,
+        )
+        frames: list[Frame] = []
+
+        evt: PingReceived | PingAckReceived
+        if "ACK" in frame.flags:
+            evt = PingAckReceived()
+        else:
+            evt = PingReceived()
+
+            # automatically ACK the PING with the same 'opaque data'
+            f = PingFrame(0)
+            f.flags.add("ACK")
+            f.opaque_data = frame.opaque_data
+            frames.append(f)
+
+        evt.ping_data = frame.opaque_data
+        events.append(evt)
+
+        return frames, events
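+
+    # Illustrative round trip for the ACK logic above, assuming the
+    # connection's ``ping`` helper (not shown in this excerpt) and an assumed
+    # ``sock`` object: the local side sends a PING, and when the peer's ACK
+    # arrives this handler emits a PingAckReceived event carrying the same
+    # opaque data.
+    #
+    #   conn.ping(b"\x00" * 8)  # opaque data is exactly 8 bytes
+    #   sock.sendall(conn.data_to_send())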
+
+    def _receive_rst_stream_frame(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a RST_STREAM frame on the connection.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_RST_STREAM,
+        )
+        try:
+            stream = self._get_stream_by_id(frame.stream_id)
+        except NoSuchStreamError:
+            # The stream is missing. That's ok, we just do nothing here.
+            stream_frames: list[Frame] = []
+            stream_events: list[Event] = []
+        else:
+            stream_frames, stream_events = stream.stream_reset(frame)
+
+        return stream_frames, events + stream_events
+
+    def _receive_priority_frame(self, frame: HeadersFrame | PriorityFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a PRIORITY frame on the connection.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_PRIORITY,
+        )
+
+        event = PriorityUpdated()
+        event.stream_id = frame.stream_id
+        event.depends_on = frame.depends_on
+        event.exclusive = frame.exclusive
+
+        # Weight is an integer between 1 and 256, but the byte only allows
+        # 0 to 255: add one.
+        event.weight = frame.stream_weight + 1
+
+        # A stream may not depend on itself.
+        if event.depends_on == frame.stream_id:
+            msg = f"Stream {frame.stream_id} may not depend on itself"
+            raise ProtocolError(msg)
+        events.append(event)
+
+        return [], events
+
+    def _receive_goaway_frame(self, frame: GoAwayFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        Receive a GOAWAY frame on the connection.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_GOAWAY,
+        )
+
+        # Clear the outbound data buffer: we cannot send further data now.
+        self.clear_outbound_data_buffer()
+
+        # Fire an appropriate ConnectionTerminated event.
+        new_event = ConnectionTerminated()
+        new_event.error_code = _error_code_from_int(frame.error_code)
+        new_event.last_stream_id = frame.last_stream_id
+        new_event.additional_data = (frame.additional_data
+                                     if frame.additional_data else None)
+        events.append(new_event)
+
+        return [], events
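+
+    # An illustrative way a caller might react to the event emitted above;
+    # ``sock``, ``data`` and the surrounding loop are assumptions, not part
+    # of this module.
+    #
+    #   for event in conn.receive_data(data):
+    #       if isinstance(event, ConnectionTerminated):
+    #           sock.close()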
+
+    def _receive_naked_continuation(self, frame: ContinuationFrame) -> None:
+        """
+        A naked CONTINUATION frame has been received. This is always an error,
+        but the kind of error depends on the state of the stream, and handling
+        it must also transition the stream's state, so we pass the frame to
+        the appropriate stream.
+        """
+        stream = self._get_stream_by_id(frame.stream_id)
+        stream.receive_continuation()
+        msg = "Should not be reachable"  # pragma: no cover
+        raise AssertionError(msg)  # pragma: no cover
+
+    def _receive_alt_svc_frame(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        An ALTSVC frame has been received. This frame, specified in RFC 7838,
+        is used to advertise alternative places where the same service can be
+        reached.
+
+        This frame can optionally be received either on a stream or on stream
+        0, and its semantics are different in each case.
+        """
+        events = self.state_machine.process_input(
+            ConnectionInputs.RECV_ALTERNATIVE_SERVICE,
+        )
+        frames = []
+
+        if frame.stream_id:
+            # Given that it makes no sense to receive ALTSVC on a stream
+            # before that stream has been opened with a HEADERS frame, the
+            # ALTSVC frame cannot create a stream. If the stream is not
+            # present, we simply ignore the frame.
+            try:
+                stream = self._get_stream_by_id(frame.stream_id)
+            except (NoSuchStreamError, StreamClosedError):
+                pass
+            else:
+                stream_frames, stream_events = stream.receive_alt_svc(frame)
+                frames.extend(stream_frames)
+                events.extend(stream_events)
+        else:
+            # This frame is sent on stream 0. The origin field on the frame
+            # must be present; annoyingly, if it is missing that is not a
+            # ProtocolError, we just need to ignore the frame.
+            if not frame.origin:
+                return frames, events
+
+            # If we're a server, we want to ignore this (RFC 7838 says so).
+            if not self.config.client_side:
+                return frames, events
+
+            event = AlternativeServiceAvailable()
+            event.origin = frame.origin
+            event.field_value = frame.field
+            events.append(event)
+
+        return frames, events
+
+    def _receive_unknown_frame(self, frame: ExtensionFrame) -> tuple[list[Frame], list[Event]]:
+        """
+        We have received a frame that we do not understand. This is almost
+        certainly an extension frame, though it's impossible to be entirely
+        sure.
+
+        RFC 7540 § 5.5 says that we MUST ignore unknown frame types: so we
+        do. We do notify the user that we received one, however.
+        """
+        # All we do here is log.
+        self.config.logger.debug(
+            "Received unknown extension frame (ID %d)", frame.stream_id,
+        )
+        event = UnknownFrameReceived()
+        event.frame = frame
+        return [], [event]
+
+    def _local_settings_acked(self) -> dict[SettingCodes | int, ChangedSetting]:
+        """
+        Handle the local settings being ACKed, update internal state.
+        """
+        changes = self.local_settings.acknowledge()
+
+        if SettingCodes.INITIAL_WINDOW_SIZE in changes:
+            setting = changes[SettingCodes.INITIAL_WINDOW_SIZE]
+            self._inbound_flow_control_change_from_settings(
+                setting.original_value,
+                setting.new_value,
+            )
+
+        if SettingCodes.MAX_HEADER_LIST_SIZE in changes:
+            setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE]
+            self.decoder.max_header_list_size = setting.new_value
+
+        if SettingCodes.MAX_FRAME_SIZE in changes:
+            setting = changes[SettingCodes.MAX_FRAME_SIZE]
+            self.max_inbound_frame_size = setting.new_value
+
+        if SettingCodes.HEADER_TABLE_SIZE in changes:
+            setting = changes[SettingCodes.HEADER_TABLE_SIZE]
+            # This is safe across all hpack versions: some versions just won't
+            # respect it.
+            self.decoder.max_allowed_table_size = setting.new_value
+
+        return changes
+
+    def _stream_id_is_outbound(self, stream_id: int) -> bool:
+        """
+        Returns ``True`` if the stream ID corresponds to an outbound stream
+        (one initiated by this peer), returns ``False`` otherwise.
+        """
+        return (stream_id % 2 == int(self.config.client_side))
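+
+    # Example: for a client-side connection (``client_side=True``) the
+    # outbound streams are the odd-numbered ones it opened (1, 3, 5, ...);
+    # for a server, the outbound streams are the even-numbered ones it
+    # pushed (2, 4, 6, ...), per RFC 7540 § 5.1.1.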
+
+    def _stream_closed_by(self, stream_id: int) -> StreamClosedBy | None:
+        """
+        Returns how the stream was closed.
+
+        The return value will be either a member of
+        ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was
+        closed implicitly by the peer opening a stream with a higher stream ID
+        before opening this one.
+        """
+        if stream_id in self.streams:
+            return self.streams[stream_id].closed_by
+        if stream_id in self._closed_streams:
+            return self._closed_streams[stream_id]
+        return None
+
+    def _stream_is_closed_by_reset(self, stream_id: int) -> bool:
+        """
+        Returns ``True`` if the stream was closed by sending or receiving a
+        RST_STREAM frame. Returns ``False`` otherwise.
+        """
+        return self._stream_closed_by(stream_id) in (
+            StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM,
+        )
+
+    def _stream_is_closed_by_end(self, stream_id: int) -> bool:
+        """
+        Returns ``True`` if the stream was closed by sending or receiving an
+        END_STREAM flag in a HEADERS or DATA frame. Returns ``False``
+        otherwise.
+        """
+        return self._stream_closed_by(stream_id) in (
+            StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM,
+        )
+
+
+def _add_frame_priority(frame: PriorityFrame | HeadersFrame,
+                        weight: int | None = None,
+                        depends_on: int | None = None,
+                        exclusive: bool | None = None) -> PriorityFrame | HeadersFrame:
+    """
+    Adds priority data to a given frame. Does not change any flags set on that
+    frame: if the caller is adding priority information to a HEADERS frame,
+    they must set the PRIORITY flag themselves.
+
+    This method also deliberately sets defaults for anything missing.
+
+    This method validates the input values.
+    """
+    # A stream may not depend on itself.
+    if depends_on == frame.stream_id:
+        msg = f"Stream {frame.stream_id} may not depend on itself"
+        raise ProtocolError(msg)
+
+    # Weight must be between 1 and 256.
+    if weight is not None:
+        if weight > 256 or weight < 1:
+            msg = f"Weight must be between 1 and 256, not {weight}"
+            raise ProtocolError(msg)
+        # Weight is an integer between 1 and 256, but the byte only allows
+        # 0 to 255: subtract one.
+        weight -= 1
+
+    # Set defaults for anything not provided.
+    weight = weight if weight is not None else 15
+    depends_on = depends_on if depends_on is not None else 0
+    exclusive = exclusive if exclusive is not None else False
+
+    frame.stream_weight = weight
+    frame.depends_on = depends_on
+    frame.exclusive = exclusive
+
+    return frame
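+
+# Worked example (illustrative): a caller passing ``weight=16`` (the default
+# stream weight) ends up with ``frame.stream_weight == 15`` on the wire, and
+# ``_receive_priority_frame`` above adds one back so the PriorityUpdated
+# event reports a weight of 16 again.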
+
+
+def _decode_headers(decoder: Decoder, encoded_header_block: bytes) -> Iterable[Header]:
+    """
+    Decode a HPACK-encoded header block, translating HPACK exceptions into
+    sensible h2 errors.
+
+    This only ever returns bytestring headers: h2 may emit them as
+    unicode later, but internally it processes them as bytestrings only.
+    """
+    try:
+        return decoder.decode(encoded_header_block, raw=True)
+    except OversizedHeaderListError as e:
+        # This is a symptom of a HPACK bomb attack: the remote peer has
+        # disregarded our advertised limit on how large a header block we'll
+        # accept.
+        msg = f"Oversized header block: {e}"
+        raise DenialOfServiceError(msg) from e
+    except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as e:
+        # We should only need HPACKError here, but versions of HPACK older
+        # than 2.1.0 throw all three others as well. For maximum
+        # compatibility, catch all of them.
+        msg = f"Error decoding header block: {e}"
+        raise ProtocolError(msg) from e