diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/h2')
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/__init__.py | 6 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/config.py | 212 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/connection.py | 2112 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/errors.py | 77 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/events.py | 639 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/exceptions.py | 194 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/frame_buffer.py | 161 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/py.typed | 0 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/settings.py | 331 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/stream.py | 1417 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/utilities.py | 696 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/h2/windows.py | 133 |
12 files changed, 5978 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/h2/__init__.py b/.venv/lib/python3.12/site-packages/h2/__init__.py new file mode 100644 index 00000000..0764daad --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/__init__.py @@ -0,0 +1,6 @@ +""" +HTTP/2 protocol implementation for Python. +""" +from __future__ import annotations + +__version__ = "4.2.0" diff --git a/.venv/lib/python3.12/site-packages/h2/config.py b/.venv/lib/python3.12/site-packages/h2/config.py new file mode 100644 index 00000000..cbc3b1ea --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/config.py @@ -0,0 +1,212 @@ +""" +h2/config +~~~~~~~~~ + +Objects for controlling the configuration of the HTTP/2 stack. +""" +from __future__ import annotations + +import sys +from typing import Any + + +class _BooleanConfigOption: + """ + Descriptor for handling a boolean config option. This will block + attempts to set boolean config options to non-bools. + """ + + def __init__(self, name: str) -> None: + self.name = name + self.attr_name = f"_{self.name}" + + def __get__(self, instance: Any, owner: Any) -> bool: + return getattr(instance, self.attr_name) # type: ignore + + def __set__(self, instance: Any, value: bool) -> None: + if not isinstance(value, bool): + msg = f"{self.name} must be a bool" + raise ValueError(msg) # noqa: TRY004 + setattr(instance, self.attr_name, value) + + +class DummyLogger: + """ + A Logger object that does not actual logging, hence a DummyLogger. + + For the class the log operation is merely a no-op. The intent is to avoid + conditionals being sprinkled throughout the h2 code for calls to + logging functions when no logger is passed into the corresponding object. + """ + + def __init__(self, *vargs) -> None: # type: ignore + pass + + def debug(self, *vargs, **kwargs) -> None: # type: ignore + """ + No-op logging. Only level needed for now. + """ + + def trace(self, *vargs, **kwargs) -> None: # type: ignore + """ + No-op logging. Only level needed for now. + """ + + +class OutputLogger: + """ + A Logger object that prints to stderr or any other file-like object. + + This class is provided for convenience and not part of the stable API. + + :param file: A file-like object passed to the print function. + Defaults to ``sys.stderr``. + :param trace: Enables trace-level output. Defaults to ``False``. + """ + + def __init__(self, file=None, trace_level=False) -> None: # type: ignore + super().__init__() + self.file = file or sys.stderr + self.trace_level = trace_level + + def debug(self, fmtstr, *args) -> None: # type: ignore + print(f"h2 (debug): {fmtstr % args}", file=self.file) + + def trace(self, fmtstr, *args) -> None: # type: ignore + if self.trace_level: + print(f"h2 (trace): {fmtstr % args}", file=self.file) + + +class H2Configuration: + """ + An object that controls the way a single HTTP/2 connection behaves. + + This object allows the users to customize behaviour. In particular, it + allows users to enable or disable optional features, or to otherwise handle + various unusual behaviours. + + This object has very little behaviour of its own: it mostly just ensures + that configuration is self-consistent. + + :param client_side: Whether this object is to be used on the client side of + a connection, or on the server side. Affects the logic used by the + state machine, the default settings values, the allowable stream IDs, + and several other properties. Defaults to ``True``. + :type client_side: ``bool`` + + :param header_encoding: Controls whether the headers emitted by this object + in events are transparently decoded to ``unicode`` strings, and what + encoding is used to do that decoding. This defaults to ``None``, + meaning that headers will be returned as bytes. To automatically + decode headers (that is, to return them as unicode strings), this can + be set to the string name of any encoding, e.g. ``'utf-8'``. + + .. versionchanged:: 3.0.0 + Changed default value from ``'utf-8'`` to ``None`` + + :type header_encoding: ``str``, ``False``, or ``None`` + + :param validate_outbound_headers: Controls whether the headers emitted + by this object are validated against the rules in RFC 7540. + Disabling this setting will cause outbound header validation to + be skipped, and allow the object to emit headers that may be illegal + according to RFC 7540. Defaults to ``True``. + :type validate_outbound_headers: ``bool`` + + :param normalize_outbound_headers: Controls whether the headers emitted + by this object are normalized before sending. Disabling this setting + will cause outbound header normalization to be skipped, and allow + the object to emit headers that may be illegal according to + RFC 7540. Defaults to ``True``. + :type normalize_outbound_headers: ``bool`` + + :param split_outbound_cookies: Controls whether the outbound cookie + headers are split before sending or not. According to RFC 7540 + - 8.1.2.5 the outbound header cookie headers may be split to improve + headers compression. Default is ``False``. + :type split_outbound_cookies: ``bool`` + + :param validate_inbound_headers: Controls whether the headers received + by this object are validated against the rules in RFC 7540. + Disabling this setting will cause inbound header validation to + be skipped, and allow the object to receive headers that may be illegal + according to RFC 7540. Defaults to ``True``. + :type validate_inbound_headers: ``bool`` + + :param normalize_inbound_headers: Controls whether the headers received by + this object are normalized according to the rules of RFC 7540. + Disabling this setting may lead to h2 emitting header blocks that + some RFCs forbid, e.g. with multiple cookie fields. + + .. versionadded:: 3.0.0 + + :type normalize_inbound_headers: ``bool`` + + :param logger: A logger that conforms to the requirements for this module, + those being no I/O and no context switches, which is needed in order + to run in asynchronous operation. + + .. versionadded:: 2.6.0 + + :type logger: ``logging.Logger`` + """ + + client_side = _BooleanConfigOption("client_side") + validate_outbound_headers = _BooleanConfigOption( + "validate_outbound_headers", + ) + normalize_outbound_headers = _BooleanConfigOption( + "normalize_outbound_headers", + ) + split_outbound_cookies = _BooleanConfigOption( + "split_outbound_cookies", + ) + validate_inbound_headers = _BooleanConfigOption( + "validate_inbound_headers", + ) + normalize_inbound_headers = _BooleanConfigOption( + "normalize_inbound_headers", + ) + + def __init__(self, + client_side: bool = True, + header_encoding: bool | str | None = None, + validate_outbound_headers: bool = True, + normalize_outbound_headers: bool = True, + split_outbound_cookies: bool = False, + validate_inbound_headers: bool = True, + normalize_inbound_headers: bool = True, + logger: DummyLogger | OutputLogger | None = None) -> None: + self.client_side = client_side + self.header_encoding = header_encoding + self.validate_outbound_headers = validate_outbound_headers + self.normalize_outbound_headers = normalize_outbound_headers + self.split_outbound_cookies = split_outbound_cookies + self.validate_inbound_headers = validate_inbound_headers + self.normalize_inbound_headers = normalize_inbound_headers + self.logger = logger or DummyLogger(__name__) + + @property + def header_encoding(self) -> bool | str | None: + """ + Controls whether the headers emitted by this object in events are + transparently decoded to ``unicode`` strings, and what encoding is used + to do that decoding. This defaults to ``None``, meaning that headers + will be returned as bytes. To automatically decode headers (that is, to + return them as unicode strings), this can be set to the string name of + any encoding, e.g. ``'utf-8'``. + """ + return self._header_encoding + + @header_encoding.setter + def header_encoding(self, value: bool | str | None) -> None: + """ + Enforces constraints on the value of header encoding. + """ + if not isinstance(value, (bool, str, type(None))): + msg = "header_encoding must be bool, string, or None" + raise ValueError(msg) # noqa: TRY004 + if value is True: + msg = "header_encoding cannot be True" + raise ValueError(msg) + self._header_encoding = value diff --git a/.venv/lib/python3.12/site-packages/h2/connection.py b/.venv/lib/python3.12/site-packages/h2/connection.py new file mode 100644 index 00000000..28be9fca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/connection.py @@ -0,0 +1,2112 @@ +""" +h2/connection +~~~~~~~~~~~~~ + +An implementation of a HTTP/2 connection. +""" +from __future__ import annotations + +import base64 +from enum import Enum, IntEnum +from typing import TYPE_CHECKING, Any, Callable + +from hpack.exceptions import HPACKError, OversizedHeaderListError +from hpack.hpack import Decoder, Encoder +from hyperframe.exceptions import InvalidPaddingError +from hyperframe.frame import ( + AltSvcFrame, + ContinuationFrame, + DataFrame, + ExtensionFrame, + Frame, + GoAwayFrame, + HeadersFrame, + PingFrame, + PriorityFrame, + PushPromiseFrame, + RstStreamFrame, + SettingsFrame, + WindowUpdateFrame, +) + +from .config import H2Configuration +from .errors import ErrorCodes, _error_code_from_int +from .events import ( + AlternativeServiceAvailable, + ConnectionTerminated, + Event, + InformationalResponseReceived, + PingAckReceived, + PingReceived, + PriorityUpdated, + RemoteSettingsChanged, + RequestReceived, + ResponseReceived, + SettingsAcknowledged, + TrailersReceived, + UnknownFrameReceived, + WindowUpdated, +) +from .exceptions import ( + DenialOfServiceError, + FlowControlError, + FrameTooLargeError, + NoAvailableStreamIDError, + NoSuchStreamError, + ProtocolError, + RFC1122Error, + StreamClosedError, + StreamIDTooLowError, + TooManyStreamsError, +) +from .frame_buffer import FrameBuffer +from .settings import ChangedSetting, SettingCodes, Settings +from .stream import H2Stream, StreamClosedBy +from .utilities import SizeLimitDict, guard_increment_window +from .windows import WindowManager + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Iterable + + from hpack.struct import Header, HeaderWeaklyTyped + + +class ConnectionState(Enum): + IDLE = 0 + CLIENT_OPEN = 1 + SERVER_OPEN = 2 + CLOSED = 3 + + +class ConnectionInputs(Enum): + SEND_HEADERS = 0 + SEND_PUSH_PROMISE = 1 + SEND_DATA = 2 + SEND_GOAWAY = 3 + SEND_WINDOW_UPDATE = 4 + SEND_PING = 5 + SEND_SETTINGS = 6 + SEND_RST_STREAM = 7 + SEND_PRIORITY = 8 + RECV_HEADERS = 9 + RECV_PUSH_PROMISE = 10 + RECV_DATA = 11 + RECV_GOAWAY = 12 + RECV_WINDOW_UPDATE = 13 + RECV_PING = 14 + RECV_SETTINGS = 15 + RECV_RST_STREAM = 16 + RECV_PRIORITY = 17 + SEND_ALTERNATIVE_SERVICE = 18 # Added in 2.3.0 + RECV_ALTERNATIVE_SERVICE = 19 # Added in 2.3.0 + + +class AllowedStreamIDs(IntEnum): + EVEN = 0 + ODD = 1 + + +class H2ConnectionStateMachine: + """ + A single HTTP/2 connection state machine. + + This state machine, while defined in its own class, is logically part of + the H2Connection class also defined in this file. The state machine itself + maintains very little state directly, instead focusing entirely on managing + state transitions. + """ + + # For the purposes of this state machine we treat HEADERS and their + # associated CONTINUATION frames as a single jumbo frame. The protocol + # allows/requires this by preventing other frames from being interleved in + # between HEADERS/CONTINUATION frames. + # + # The _transitions dictionary contains a mapping of tuples of + # (state, input) to tuples of (side_effect_function, end_state). This map + # contains all allowed transitions: anything not in this map is invalid + # and immediately causes a transition to ``closed``. + + _transitions = { + # State: idle + (ConnectionState.IDLE, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.IDLE, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.IDLE, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_PING): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_PING): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.IDLE, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.IDLE, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.IDLE), + (ConnectionState.IDLE, ConnectionInputs.SEND_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.IDLE, ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.CLIENT_OPEN), + + # State: open, client side. + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_DATA): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PING): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PUSH_PROMISE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_DATA): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PING): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.SEND_RST_STREAM): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_RST_STREAM): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.CLIENT_OPEN), + (ConnectionState.CLIENT_OPEN, + ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.CLIENT_OPEN), + + # State: open, server side. + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PUSH_PROMISE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_DATA): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_WINDOW_UPDATE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PING): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_SETTINGS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_PRIORITY): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_HEADERS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_DATA): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_WINDOW_UPDATE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PING): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_SETTINGS): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_PRIORITY): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.SEND_RST_STREAM): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, ConnectionInputs.RECV_RST_STREAM): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, + ConnectionInputs.SEND_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + (ConnectionState.SERVER_OPEN, + ConnectionInputs.RECV_ALTERNATIVE_SERVICE): + (None, ConnectionState.SERVER_OPEN), + + # State: closed + (ConnectionState.CLOSED, ConnectionInputs.SEND_GOAWAY): + (None, ConnectionState.CLOSED), + (ConnectionState.CLOSED, ConnectionInputs.RECV_GOAWAY): + (None, ConnectionState.CLOSED), + } + + def __init__(self) -> None: + self.state = ConnectionState.IDLE + + def process_input(self, input_: ConnectionInputs) -> list[Event]: + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, ConnectionInputs): + msg = "Input must be an instance of ConnectionInputs" + raise ValueError(msg) # noqa: TRY004 + + try: + func, target_state = self._transitions[(self.state, input_)] + except KeyError as e: + old_state = self.state + self.state = ConnectionState.CLOSED + msg = f"Invalid input {input_} in state {old_state}" + raise ProtocolError(msg) from e + else: + self.state = target_state + if func is not None: # pragma: no cover + return func() + + return [] + + +class H2Connection: + """ + A low-level HTTP/2 connection object. This handles building and receiving + frames and maintains both connection and per-stream state for all streams + on this connection. + + This wraps a HTTP/2 Connection state machine implementation, ensuring that + frames can only be sent/received when the connection is in a valid state. + It also builds stream state machines on demand to ensure that the + constraints of those state machines are met as well. Attempts to create + frames that cannot be sent will raise a ``ProtocolError``. + + .. versionchanged:: 2.3.0 + Added the ``header_encoding`` keyword argument. + + .. versionchanged:: 2.5.0 + Added the ``config`` keyword argument. Deprecated the ``client_side`` + and ``header_encoding`` parameters. + + .. versionchanged:: 3.0.0 + Removed deprecated parameters and properties. + + :param config: The configuration for the HTTP/2 connection. + + .. versionadded:: 2.5.0 + + :type config: :class:`H2Configuration <h2.config.H2Configuration>` + """ + + # The initial maximum outbound frame size. This can be changed by receiving + # a settings frame. + DEFAULT_MAX_OUTBOUND_FRAME_SIZE = 65535 + + # The initial maximum inbound frame size. This is somewhat arbitrarily + # chosen. + DEFAULT_MAX_INBOUND_FRAME_SIZE = 2**24 + + # The highest acceptable stream ID. + HIGHEST_ALLOWED_STREAM_ID = 2**31 - 1 + + # The largest acceptable window increment. + MAX_WINDOW_INCREMENT = 2**31 - 1 + + # The initial default value of SETTINGS_MAX_HEADER_LIST_SIZE. + DEFAULT_MAX_HEADER_LIST_SIZE = 2**16 + + # Keep in memory limited amount of results for streams closes + MAX_CLOSED_STREAMS = 2**16 + + def __init__(self, config: H2Configuration | None = None) -> None: + self.state_machine = H2ConnectionStateMachine() + self.streams: dict[int, H2Stream] = {} + self.highest_inbound_stream_id = 0 + self.highest_outbound_stream_id = 0 + self.encoder = Encoder() + self.decoder = Decoder() + + # This won't always actually do anything: for versions of HPACK older + # than 2.3.0 it does nothing. However, we have to try! + self.decoder.max_header_list_size = self.DEFAULT_MAX_HEADER_LIST_SIZE + + #: The configuration for this HTTP/2 connection object. + #: + #: .. versionadded:: 2.5.0 + self.config = config or H2Configuration(client_side=True) + + # Objects that store settings, including defaults. + # + # We set the MAX_CONCURRENT_STREAMS value to 100 because its default is + # unbounded, and that's a dangerous default because it allows + # essentially unbounded resources to be allocated regardless of how + # they will be used. 100 should be suitable for the average + # application. This default obviously does not apply to the remote + # peer's settings: the remote peer controls them! + # + # We also set MAX_HEADER_LIST_SIZE to a reasonable value. This is to + # advertise our defence against CVE-2016-6581. However, not all + # versions of HPACK will let us do it. That's ok: we should at least + # suggest that we're not vulnerable. + self.local_settings = Settings( + client=self.config.client_side, + initial_values={ + SettingCodes.MAX_CONCURRENT_STREAMS: 100, + SettingCodes.MAX_HEADER_LIST_SIZE: + self.DEFAULT_MAX_HEADER_LIST_SIZE, + }, + ) + self.remote_settings = Settings(client=not self.config.client_side) + + # The current value of the connection flow control windows on the + # connection. + self.outbound_flow_control_window = ( + self.remote_settings.initial_window_size + ) + + #: The maximum size of a frame that can be emitted by this peer, in + #: bytes. + self.max_outbound_frame_size = self.remote_settings.max_frame_size + + #: The maximum size of a frame that can be received by this peer, in + #: bytes. + self.max_inbound_frame_size = self.local_settings.max_frame_size + + # Buffer for incoming data. + self.incoming_buffer = FrameBuffer(server=not self.config.client_side) + + # A private variable to store a sequence of received header frames + # until completion. + self._header_frames: list[Frame] = [] + + # Data that needs to be sent. + self._data_to_send = bytearray() + + # Keeps track of how streams are closed. + # Used to ensure that we don't blow up in the face of frames that were + # in flight when a RST_STREAM was sent. + # Also used to determine whether we should consider a frame received + # while a stream is closed as either a stream error or a connection + # error. + self._closed_streams: dict[int, StreamClosedBy | None] = SizeLimitDict( + size_limit=self.MAX_CLOSED_STREAMS, + ) + + # The flow control window manager for the connection. + self._inbound_flow_control_window_manager = WindowManager( + max_window_size=self.local_settings.initial_window_size, + ) + + # When in doubt use dict-dispatch. + self._frame_dispatch_table: dict[type[Frame], Callable] = { # type: ignore + HeadersFrame: self._receive_headers_frame, + PushPromiseFrame: self._receive_push_promise_frame, + SettingsFrame: self._receive_settings_frame, + DataFrame: self._receive_data_frame, + WindowUpdateFrame: self._receive_window_update_frame, + PingFrame: self._receive_ping_frame, + RstStreamFrame: self._receive_rst_stream_frame, + PriorityFrame: self._receive_priority_frame, + GoAwayFrame: self._receive_goaway_frame, + ContinuationFrame: self._receive_naked_continuation, + AltSvcFrame: self._receive_alt_svc_frame, + ExtensionFrame: self._receive_unknown_frame, + } + + def _prepare_for_sending(self, frames: list[Frame]) -> None: + if not frames: + return + self._data_to_send += b"".join(f.serialize() for f in frames) + assert all(f.body_len <= self.max_outbound_frame_size for f in frames) + + def _open_streams(self, remainder: int) -> int: + """ + A common method of counting number of open streams. Returns the number + of streams that are open *and* that have (stream ID % 2) == remainder. + While it iterates, also deletes any closed streams. + """ + count = 0 + to_delete = [] + + for stream_id, stream in self.streams.items(): + if stream.open and (stream_id % 2 == remainder): + count += 1 + elif stream.closed: + to_delete.append(stream_id) + + for stream_id in to_delete: + stream = self.streams.pop(stream_id) + self._closed_streams[stream_id] = stream.closed_by + + return count + + @property + def open_outbound_streams(self) -> int: + """ + The current number of open outbound streams. + """ + outbound_numbers = int(self.config.client_side) + return self._open_streams(outbound_numbers) + + @property + def open_inbound_streams(self) -> int: + """ + The current number of open inbound streams. + """ + inbound_numbers = int(not self.config.client_side) + return self._open_streams(inbound_numbers) + + @property + def inbound_flow_control_window(self) -> int: + """ + The size of the inbound flow control window for the connection. This is + rarely publicly useful: instead, use :meth:`remote_flow_control_window + <h2.connection.H2Connection.remote_flow_control_window>`. This + shortcut is largely present to provide a shortcut to this data. + """ + return self._inbound_flow_control_window_manager.current_window_size + + def _begin_new_stream(self, stream_id: int, allowed_ids: AllowedStreamIDs) -> H2Stream: + """ + Initiate a new stream. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + + :param stream_id: The ID of the stream to open. + :param allowed_ids: What kind of stream ID is allowed. + """ + self.config.logger.debug( + "Attempting to initiate stream ID %d", stream_id, + ) + outbound = self._stream_id_is_outbound(stream_id) + highest_stream_id = ( + self.highest_outbound_stream_id if outbound else + self.highest_inbound_stream_id + ) + + if stream_id <= highest_stream_id: + raise StreamIDTooLowError(stream_id, highest_stream_id) + + if (stream_id % 2) != int(allowed_ids): + msg = "Invalid stream ID for peer." + raise ProtocolError(msg) + + s = H2Stream( + stream_id, + config=self.config, + inbound_window_size=self.local_settings.initial_window_size, + outbound_window_size=self.remote_settings.initial_window_size, + ) + self.config.logger.debug("Stream ID %d created", stream_id) + s.max_outbound_frame_size = self.max_outbound_frame_size + + self.streams[stream_id] = s + self.config.logger.debug("Current streams: %s", self.streams.keys()) + + if outbound: + self.highest_outbound_stream_id = stream_id + else: + self.highest_inbound_stream_id = stream_id + + return s + + def initiate_connection(self) -> None: + """ + Provides any data that needs to be sent at the start of the connection. + Must be called for both clients and servers. + """ + self.config.logger.debug("Initializing connection") + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + if self.config.client_side: + preamble = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" + else: + preamble = b"" + + f = SettingsFrame(0) + for setting, value in self.local_settings.items(): + f.settings[setting] = value + self.config.logger.debug( + "Send Settings frame: %s", self.local_settings, + ) + + self._data_to_send += preamble + f.serialize() + + def initiate_upgrade_connection(self, settings_header: bytes | None = None) -> bytes | None: + """ + Call to initialise the connection object for use with an upgraded + HTTP/2 connection (i.e. a connection negotiated using the + ``Upgrade: h2c`` HTTP header). + + This method differs from :meth:`initiate_connection + <h2.connection.H2Connection.initiate_connection>` in several ways. + Firstly, it handles the additional SETTINGS frame that is sent in the + ``HTTP2-Settings`` header field. When called on a client connection, + this method will return a bytestring that the caller can put in the + ``HTTP2-Settings`` field they send on their initial request. When + called on a server connection, the user **must** provide the value they + received from the client in the ``HTTP2-Settings`` header field to the + ``settings_header`` argument, which will be used appropriately. + + Additionally, this method sets up stream 1 in a half-closed state + appropriate for this side of the connection, to reflect the fact that + the request is already complete. + + Finally, this method also prepares the appropriate preamble to be sent + after the upgrade. + + .. versionadded:: 2.3.0 + + :param settings_header: (optional, server-only): The value of the + ``HTTP2-Settings`` header field received from the client. + :type settings_header: ``bytes`` + + :returns: For clients, a bytestring to put in the ``HTTP2-Settings``. + For servers, returns nothing. + :rtype: ``bytes`` or ``None`` + """ + self.config.logger.debug( + "Upgrade connection. Current settings: %s", self.local_settings, + ) + + frame_data = None + # Begin by getting the preamble in place. + self.initiate_connection() + + if self.config.client_side: + f = SettingsFrame(0) + for setting, value in self.local_settings.items(): + f.settings[setting] = value + + frame_data = f.serialize_body() + frame_data = base64.urlsafe_b64encode(frame_data) + elif settings_header: + # We have a settings header from the client. This needs to be + # applied, but we want to throw away the ACK. We do this by + # inserting the data into a Settings frame and then passing it to + # the state machine, but ignoring the return value. + settings_header = base64.urlsafe_b64decode(settings_header) + f = SettingsFrame(0) + f.parse_body(memoryview(settings_header)) + self._receive_settings_frame(f) + + # Set up appropriate state. Stream 1 in a half-closed state: + # half-closed(local) for clients, half-closed(remote) for servers. + # Additionally, we need to set up the Connection state machine. + connection_input = ( + ConnectionInputs.SEND_HEADERS if self.config.client_side + else ConnectionInputs.RECV_HEADERS + ) + self.config.logger.debug("Process input %s", connection_input) + self.state_machine.process_input(connection_input) + + # Set up stream 1. + self._begin_new_stream(stream_id=1, allowed_ids=AllowedStreamIDs.ODD) + self.streams[1].upgrade(self.config.client_side) + return frame_data + + def _get_or_create_stream(self, stream_id: int, allowed_ids: AllowedStreamIDs) -> H2Stream: + """ + Gets a stream by its stream ID. Will create one if one does not already + exist. Use allowed_ids to circumvent the usual stream ID rules for + clients and servers. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + """ + try: + return self.streams[stream_id] + except KeyError: + return self._begin_new_stream(stream_id, allowed_ids) + + def _get_stream_by_id(self, stream_id: int | None) -> H2Stream: + """ + Gets a stream by its stream ID. Raises NoSuchStreamError if the stream + ID does not correspond to a known stream and is higher than the current + maximum: raises if it is lower than the current maximum. + + .. versionchanged:: 2.0.0 + Removed this function from the public API. + """ + if not stream_id: + raise NoSuchStreamError(-1) # pragma: no cover + try: + return self.streams[stream_id] + except KeyError as e: + outbound = self._stream_id_is_outbound(stream_id) + highest_stream_id = ( + self.highest_outbound_stream_id if outbound else + self.highest_inbound_stream_id + ) + + if stream_id > highest_stream_id: + raise NoSuchStreamError(stream_id) from e + raise StreamClosedError(stream_id) from e + + def get_next_available_stream_id(self) -> int: + """ + Returns an integer suitable for use as the stream ID for the next + stream created by this endpoint. For server endpoints, this stream ID + will be even. For client endpoints, this stream ID will be odd. If no + stream IDs are available, raises :class:`NoAvailableStreamIDError + <h2.exceptions.NoAvailableStreamIDError>`. + + .. warning:: The return value from this function does not change until + the stream ID has actually been used by sending or pushing + headers on that stream. For that reason, it should be + called as close as possible to the actual use of the + stream ID. + + .. versionadded:: 2.0.0 + + :raises: :class:`NoAvailableStreamIDError + <h2.exceptions.NoAvailableStreamIDError>` + :returns: The next free stream ID this peer can use to initiate a + stream. + :rtype: ``int`` + """ + # No streams have been opened yet, so return the lowest allowed stream + # ID. + if not self.highest_outbound_stream_id: + next_stream_id = 1 if self.config.client_side else 2 + else: + next_stream_id = self.highest_outbound_stream_id + 2 + self.config.logger.debug( + "Next available stream ID %d", next_stream_id, + ) + if next_stream_id > self.HIGHEST_ALLOWED_STREAM_ID: + msg = "Exhausted allowed stream IDs" + raise NoAvailableStreamIDError(msg) + + return next_stream_id + + def send_headers(self, + stream_id: int, + headers: Iterable[HeaderWeaklyTyped], + end_stream: bool = False, + priority_weight: int | None = None, + priority_depends_on: int | None = None, + priority_exclusive: bool | None = None) -> None: + """ + Send headers on a given stream. + + This function can be used to send request or response headers: the kind + that are sent depends on whether this connection has been opened as a + client or server connection, and whether the stream was opened by the + remote peer or not. + + If this is a client connection, calling ``send_headers`` will send the + headers as a request. It will also implicitly open the stream being + used. If this is a client connection and ``send_headers`` has *already* + been called, this will send trailers instead. + + If this is a server connection, calling ``send_headers`` will send the + headers as a response. It is a protocol error for a server to open a + stream by sending headers. If this is a server connection and + ``send_headers`` has *already* been called, this will send trailers + instead. + + When acting as a server, you may call ``send_headers`` any number of + times allowed by the following rules, in this order: + + - zero or more times with ``(':status', '1XX')`` (where ``1XX`` is a + placeholder for any 100-level status code). + - once with any other status header. + - zero or one time for trailers. + + That is, you are allowed to send as many informational responses as you + like, followed by one complete response and zero or one HTTP trailer + blocks. + + Clients may send one or two header blocks: one request block, and + optionally one trailer block. + + If it is important to send HPACK "never indexed" header fields (as + defined in `RFC 7451 Section 7.1.3 + <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may + instead provide headers using the HPACK library's :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple + <hpack:hpack.NeverIndexedHeaderTuple>` objects. + + This method also allows users to prioritize the stream immediately, + by sending priority information on the HEADERS frame directly. To do + this, any one of ``priority_weight``, ``priority_depends_on``, or + ``priority_exclusive`` must be set to a value that is not ``None``. For + more information on the priority fields, see :meth:`prioritize + <h2.connection.H2Connection.prioritize>`. + + .. warning:: In HTTP/2, it is mandatory that all the HTTP/2 special + headers (that is, ones whose header keys begin with ``:``) appear + at the start of the header block, before any normal headers. + + .. versionchanged:: 2.3.0 + Added support for using :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` objects to store headers. + + .. versionchanged:: 2.4.0 + Added the ability to provide priority keyword arguments: + ``priority_weight``, ``priority_depends_on``, and + ``priority_exclusive``. + + :param stream_id: The stream ID to send the headers on. If this stream + does not currently exist, it will be created. + :type stream_id: ``int`` + + :param headers: The request/response headers to send. + :type headers: An iterable of two tuples of bytestrings or + :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. + + :param end_stream: Whether this headers frame should end the stream + immediately (that is, whether no more data will be sent after this + frame). Defaults to ``False``. + :type end_stream: ``bool`` + + :param priority_weight: Sets the priority weight of the stream. See + :meth:`prioritize <h2.connection.H2Connection.prioritize>` for more + about how this field works. Defaults to ``None``, which means that + no priority information will be sent. + :type priority_weight: ``int`` or ``None`` + + :param priority_depends_on: Sets which stream this one depends on for + priority purposes. See :meth:`prioritize + <h2.connection.H2Connection.prioritize>` for more about how this + field works. Defaults to ``None``, which means that no priority + information will be sent. + :type priority_depends_on: ``int`` or ``None`` + + :param priority_exclusive: Sets whether this stream exclusively depends + on the stream given in ``priority_depends_on`` for priority + purposes. See :meth:`prioritize + <h2.connection.H2Connection.prioritize>` for more about how this + field workds. Defaults to ``None``, which means that no priority + information will be sent. + :type priority_depends_on: ``bool`` or ``None`` + + :returns: Nothing + """ + self.config.logger.debug( + "Send headers on stream ID %d", stream_id, + ) + + # Check we can open the stream. + if stream_id not in self.streams: + max_open_streams = self.remote_settings.max_concurrent_streams + if (self.open_outbound_streams + 1) > max_open_streams: + msg = f"Max outbound streams is {max_open_streams}, {self.open_outbound_streams} open" + raise TooManyStreamsError(msg) + + self.state_machine.process_input(ConnectionInputs.SEND_HEADERS) + stream = self._get_or_create_stream( + stream_id, AllowedStreamIDs(self.config.client_side), + ) + + frames: list[Frame] = [] + frames.extend(stream.send_headers( + headers, self.encoder, end_stream, + )) + + # We may need to send priority information. + priority_present = ( + (priority_weight is not None) or + (priority_depends_on is not None) or + (priority_exclusive is not None) + ) + + if priority_present: + if not self.config.client_side: + msg = "Servers SHOULD NOT prioritize streams." + raise RFC1122Error(msg) + + headers_frame = frames[0] + assert isinstance(headers_frame, HeadersFrame) + + headers_frame.flags.add("PRIORITY") + frames[0] = _add_frame_priority( + headers_frame, + priority_weight, + priority_depends_on, + priority_exclusive, + ) + + self._prepare_for_sending(frames) + + def send_data(self, + stream_id: int, + data: bytes | memoryview, + end_stream: bool = False, + pad_length: Any = None) -> None: + """ + Send data on a given stream. + + This method does no breaking up of data: if the data is larger than the + value returned by :meth:`local_flow_control_window + <h2.connection.H2Connection.local_flow_control_window>` for this stream + then a :class:`FlowControlError <h2.exceptions.FlowControlError>` will + be raised. If the data is larger than :data:`max_outbound_frame_size + <h2.connection.H2Connection.max_outbound_frame_size>` then a + :class:`FrameTooLargeError <h2.exceptions.FrameTooLargeError>` will be + raised. + + h2 does this to avoid buffering the data internally. If the user + has more data to send than h2 will allow, consider breaking it up + and buffering it externally. + + :param stream_id: The ID of the stream on which to send the data. + :type stream_id: ``int`` + :param data: The data to send on the stream. + :type data: ``bytes`` + :param end_stream: (optional) Whether this is the last data to be sent + on the stream. Defaults to ``False``. + :type end_stream: ``bool`` + :param pad_length: (optional) Length of the padding to apply to the + data frame. Defaults to ``None`` for no use of padding. Note that + a value of ``0`` results in padding of length ``0`` + (with the "padding" flag set on the frame). + + .. versionadded:: 2.6.0 + + :type pad_length: ``int`` + :returns: Nothing + """ + self.config.logger.debug( + "Send data on stream ID %d with len %d", stream_id, len(data), + ) + frame_size = len(data) + if pad_length is not None: + if not isinstance(pad_length, int): + msg = "pad_length must be an int" + raise TypeError(msg) + if pad_length < 0 or pad_length > 255: + msg = "pad_length must be within range: [0, 255]" + raise ValueError(msg) + # Account for padding bytes plus the 1-byte padding length field. + frame_size += pad_length + 1 + self.config.logger.debug( + "Frame size on stream ID %d is %d", stream_id, frame_size, + ) + + if frame_size > self.local_flow_control_window(stream_id): + msg = f"Cannot send {frame_size} bytes, flow control window is {self.local_flow_control_window(stream_id)}" + raise FlowControlError(msg) + if frame_size > self.max_outbound_frame_size: + msg = f"Cannot send frame size {frame_size}, max frame size is {self.max_outbound_frame_size}" + raise FrameTooLargeError(msg) + + self.state_machine.process_input(ConnectionInputs.SEND_DATA) + frames = self.streams[stream_id].send_data( + data, end_stream, pad_length=pad_length, + ) + + self._prepare_for_sending(frames) + + self.outbound_flow_control_window -= frame_size + self.config.logger.debug( + "Outbound flow control window size is %d", + self.outbound_flow_control_window, + ) + assert self.outbound_flow_control_window >= 0 + + def end_stream(self, stream_id: int) -> None: + """ + Cleanly end a given stream. + + This method ends a stream by sending an empty DATA frame on that stream + with the ``END_STREAM`` flag set. + + :param stream_id: The ID of the stream to end. + :type stream_id: ``int`` + :returns: Nothing + """ + self.config.logger.debug("End stream ID %d", stream_id) + self.state_machine.process_input(ConnectionInputs.SEND_DATA) + frames = self.streams[stream_id].end_stream() + self._prepare_for_sending(frames) + + def increment_flow_control_window(self, increment: int, stream_id: int | None = None) -> None: + """ + Increment a flow control window, optionally for a single stream. Allows + the remote peer to send more data. + + .. versionchanged:: 2.0.0 + Rejects attempts to increment the flow control window by out of + range values with a ``ValueError``. + + :param increment: The amount to increment the flow control window by. + :type increment: ``int`` + :param stream_id: (optional) The ID of the stream that should have its + flow control window opened. If not present or ``None``, the + connection flow control window will be opened instead. + :type stream_id: ``int`` or ``None`` + :returns: Nothing + :raises: ``ValueError`` + """ + if not (1 <= increment <= self.MAX_WINDOW_INCREMENT): + msg = f"Flow control increment must be between 1 and {self.MAX_WINDOW_INCREMENT}" + raise ValueError(msg) + + self.state_machine.process_input(ConnectionInputs.SEND_WINDOW_UPDATE) + + if stream_id is not None: + stream = self.streams[stream_id] + frames = stream.increase_flow_control_window( + increment, + ) + + self.config.logger.debug( + "Increase stream ID %d flow control window by %d", + stream_id, increment, + ) + else: + self._inbound_flow_control_window_manager.window_opened(increment) + f = WindowUpdateFrame(0) + f.window_increment = increment + frames = [f] + + self.config.logger.debug( + "Increase connection flow control window by %d", increment, + ) + + self._prepare_for_sending(frames) + + def push_stream(self, + stream_id: int, + promised_stream_id: int, + request_headers: Iterable[HeaderWeaklyTyped]) -> None: + """ + Push a response to the client by sending a PUSH_PROMISE frame. + + If it is important to send HPACK "never indexed" header fields (as + defined in `RFC 7451 Section 7.1.3 + <https://tools.ietf.org/html/rfc7541#section-7.1.3>`_), the user may + instead provide headers using the HPACK library's :class:`HeaderTuple + <hpack:hpack.HeaderTuple>` and :class:`NeverIndexedHeaderTuple + <hpack:hpack.NeverIndexedHeaderTuple>` objects. + + :param stream_id: The ID of the stream that this push is a response to. + :type stream_id: ``int`` + :param promised_stream_id: The ID of the stream that the pushed + response will be sent on. + :type promised_stream_id: ``int`` + :param request_headers: The headers of the request that the pushed + response will be responding to. + :type request_headers: An iterable of two tuples of bytestrings or + :class:`HeaderTuple <hpack:hpack.HeaderTuple>` objects. + :returns: Nothing + """ + self.config.logger.debug( + "Send Push Promise frame on stream ID %d", stream_id, + ) + + if not self.remote_settings.enable_push: + msg = "Remote peer has disabled stream push" + raise ProtocolError(msg) + + self.state_machine.process_input(ConnectionInputs.SEND_PUSH_PROMISE) + stream = self._get_stream_by_id(stream_id) + + # We need to prevent users pushing streams in response to streams that + # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The + # easiest way to do that is to assert that the stream_id is not even: + # this shortcut works because only servers can push and the state + # machine will enforce this. + if (stream_id % 2) == 0: + msg = "Cannot recursively push streams." + raise ProtocolError(msg) + + new_stream = self._begin_new_stream( + promised_stream_id, AllowedStreamIDs.EVEN, + ) + self.streams[promised_stream_id] = new_stream + + frames = stream.push_stream_in_band( + promised_stream_id, request_headers, self.encoder, + ) + new_frames = new_stream.locally_pushed() + self._prepare_for_sending(frames + new_frames) + + def ping(self, opaque_data: bytes | str) -> None: + """ + Send a PING frame. + + :param opaque_data: A bytestring of length 8 that will be sent in the + PING frame. + :returns: Nothing + """ + self.config.logger.debug("Send Ping frame") + + if not isinstance(opaque_data, bytes) or len(opaque_data) != 8: + msg = f"Invalid value for ping data: {opaque_data!r}" + raise ValueError(msg) + + self.state_machine.process_input(ConnectionInputs.SEND_PING) + f = PingFrame(0) + f.opaque_data = opaque_data + self._prepare_for_sending([f]) + + def reset_stream(self, stream_id: int, error_code: ErrorCodes | int = 0) -> None: + """ + Reset a stream. + + This method forcibly closes a stream by sending a RST_STREAM frame for + a given stream. This is not a graceful closure. To gracefully end a + stream, try the :meth:`end_stream + <h2.connection.H2Connection.end_stream>` method. + + :param stream_id: The ID of the stream to reset. + :type stream_id: ``int`` + :param error_code: (optional) The error code to use to reset the + stream. Defaults to :data:`ErrorCodes.NO_ERROR + <h2.errors.ErrorCodes.NO_ERROR>`. + :type error_code: ``int`` + :returns: Nothing + """ + self.config.logger.debug("Reset stream ID %d", stream_id) + self.state_machine.process_input(ConnectionInputs.SEND_RST_STREAM) + stream = self._get_stream_by_id(stream_id) + frames = stream.reset_stream(error_code) + self._prepare_for_sending(frames) + + def close_connection(self, + error_code: ErrorCodes | int = 0, + additional_data: bytes | None = None, + last_stream_id: int | None = None) -> None: + """ + Close a connection, emitting a GOAWAY frame. + + .. versionchanged:: 2.4.0 + Added ``additional_data`` and ``last_stream_id`` arguments. + + :param error_code: (optional) The error code to send in the GOAWAY + frame. + :param additional_data: (optional) Additional debug data indicating + a reason for closing the connection. Must be a bytestring. + :param last_stream_id: (optional) The last stream which was processed + by the sender. Defaults to ``highest_inbound_stream_id``. + :returns: Nothing + """ + self.config.logger.debug("Close connection") + self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) + + # Additional_data must be bytes + if additional_data is not None: + assert isinstance(additional_data, bytes) + + if last_stream_id is None: + last_stream_id = self.highest_inbound_stream_id + + f = GoAwayFrame( + stream_id=0, + last_stream_id=last_stream_id, + error_code=error_code, + additional_data=(additional_data or b""), + ) + self._prepare_for_sending([f]) + + def update_settings(self, new_settings: dict[SettingCodes | int, int]) -> None: + """ + Update the local settings. This will prepare and emit the appropriate + SETTINGS frame. + + :param new_settings: A dictionary of {setting: new value} + """ + self.config.logger.debug( + "Update connection settings to %s", new_settings, + ) + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + self.local_settings.update(new_settings) + s = SettingsFrame(0) + s.settings = new_settings + self._prepare_for_sending([s]) + + def advertise_alternative_service(self, + field_value: bytes | str, + origin: bytes | None = None, + stream_id: int | None = None) -> None: + """ + Notify a client about an available Alternative Service. + + An Alternative Service is defined in `RFC 7838 + <https://tools.ietf.org/html/rfc7838>`_. An Alternative Service + notification informs a client that a given origin is also available + elsewhere. + + Alternative Services can be advertised in two ways. Firstly, they can + be advertised explicitly: that is, a server can say "origin X is also + available at Y". To advertise like this, set the ``origin`` argument + and not the ``stream_id`` argument. Alternatively, they can be + advertised implicitly: that is, a server can say "the origin you're + contacting on stream X is also available at Y". To advertise like this, + set the ``stream_id`` argument and not the ``origin`` argument. + + The explicit method of advertising can be done as long as the + connection is active. The implicit method can only be done after the + client has sent the request headers and before the server has sent the + response headers: outside of those points, h2 will forbid sending + the Alternative Service advertisement by raising a ProtocolError. + + The ``field_value`` parameter is specified in RFC 7838. h2 does + not validate or introspect this argument: the user is required to + ensure that it's well-formed. ``field_value`` corresponds to RFC 7838's + "Alternative Service Field Value". + + .. note:: It is strongly preferred to use the explicit method of + advertising Alternative Services. The implicit method of + advertising Alternative Services has a number of subtleties + and can lead to inconsistencies between the server and + client. h2 allows both mechanisms, but caution is + strongly advised. + + .. versionadded:: 2.3.0 + + :param field_value: The RFC 7838 Alternative Service Field Value. This + argument is not introspected by h2: the user is responsible + for ensuring that it is well-formed. + :type field_value: ``bytes`` + + :param origin: The origin/authority to which the Alternative Service + being advertised applies. Must not be provided at the same time as + ``stream_id``. + :type origin: ``bytes`` or ``None`` + + :param stream_id: The ID of the stream which was sent to the authority + for which this Alternative Service advertisement applies. Must not + be provided at the same time as ``origin``. + :type stream_id: ``int`` or ``None`` + + :returns: Nothing. + """ + if not isinstance(field_value, bytes): + msg = "Field must be bytestring." + raise ValueError(msg) # noqa: TRY004 + + if origin is not None and stream_id is not None: + msg = "Must not provide both origin and stream_id" + raise ValueError(msg) + + self.state_machine.process_input( + ConnectionInputs.SEND_ALTERNATIVE_SERVICE, + ) + + if origin is not None: + # This ALTSVC is sent on stream zero. + f = AltSvcFrame(stream_id=0) + f.origin = origin + f.field = field_value + frames: list[Frame] = [f] + else: + stream = self._get_stream_by_id(stream_id) + frames = stream.advertise_alternative_service(field_value) + + self._prepare_for_sending(frames) + + def prioritize(self, + stream_id: int, + weight: int | None = None, + depends_on: int | None = None, + exclusive: bool | None = None) -> None: + """ + Notify a server about the priority of a stream. + + Stream priorities are a form of guidance to a remote server: they + inform the server about how important a given response is, so that the + server may allocate its resources (e.g. bandwidth, CPU time, etc.) + accordingly. This exists to allow clients to ensure that the most + important data arrives earlier, while less important data does not + starve out the more important data. + + Stream priorities are explained in depth in `RFC 7540 Section 5.3 + <https://tools.ietf.org/html/rfc7540#section-5.3>`_. + + This method updates the priority information of a single stream. It may + be called well before a stream is actively in use, or well after a + stream is closed. + + .. warning:: RFC 7540 allows for servers to change the priority of + streams. However, h2 **does not** allow server + stacks to do this. This is because most clients do not + adequately know how to respond when provided conflicting + priority information, and relatively little utility is + provided by making that functionality available. + + .. note:: h2 **does not** maintain any information about the + RFC 7540 priority tree. That means that h2 does not + prevent incautious users from creating invalid priority + trees, particularly by creating priority loops. While some + basic error checking is provided by h2, users are + strongly recommended to understand their prioritisation + strategies before using the priority tools here. + + .. note:: Priority information is strictly advisory. Servers are + allowed to disregard it entirely. Avoid relying on the idea + that your priority signaling will definitely be obeyed. + + .. versionadded:: 2.4.0 + + :param stream_id: The ID of the stream to prioritize. + :type stream_id: ``int`` + + :param weight: The weight to give the stream. Defaults to ``16``, the + default weight of any stream. May be any value between ``1`` and + ``256`` inclusive. The relative weight of a stream indicates what + proportion of available resources will be allocated to that + stream. + :type weight: ``int`` + + :param depends_on: The ID of the stream on which this stream depends. + This stream will only be progressed if it is impossible to + progress the parent stream (the one on which this one depends). + Passing the value ``0`` means that this stream does not depend on + any other. Defaults to ``0``. + :type depends_on: ``int`` + + :param exclusive: Whether this stream is an exclusive dependency of its + "parent" stream (i.e. the stream given by ``depends_on``). If a + stream is an exclusive dependency of another, that means that all + previously-set children of the parent are moved to become children + of the new exclusively-dependent stream. Defaults to ``False``. + :type exclusive: ``bool`` + """ + if not self.config.client_side: + msg = "Servers SHOULD NOT prioritize streams." + raise RFC1122Error(msg) + + self.state_machine.process_input( + ConnectionInputs.SEND_PRIORITY, + ) + + frame = PriorityFrame(stream_id) + frame_prio = _add_frame_priority(frame, weight, depends_on, exclusive) + + self._prepare_for_sending([frame_prio]) + + def local_flow_control_window(self, stream_id: int) -> int: + """ + Returns the maximum amount of data that can be sent on stream + ``stream_id``. + + This value will never be larger than the total data that can be sent on + the connection: even if the given stream allows more data, the + connection window provides a logical maximum to the amount of data that + can be sent. + + The maximum data that can be sent in a single data frame on a stream + is either this value, or the maximum frame size, whichever is + *smaller*. + + :param stream_id: The ID of the stream whose flow control window is + being queried. + :type stream_id: ``int`` + :returns: The amount of data in bytes that can be sent on the stream + before the flow control window is exhausted. + :rtype: ``int`` + """ + stream = self._get_stream_by_id(stream_id) + return min( + self.outbound_flow_control_window, + stream.outbound_flow_control_window, + ) + + def remote_flow_control_window(self, stream_id: int) -> int: + """ + Returns the maximum amount of data the remote peer can send on stream + ``stream_id``. + + This value will never be larger than the total data that can be sent on + the connection: even if the given stream allows more data, the + connection window provides a logical maximum to the amount of data that + can be sent. + + The maximum data that can be sent in a single data frame on a stream + is either this value, or the maximum frame size, whichever is + *smaller*. + + :param stream_id: The ID of the stream whose flow control window is + being queried. + :type stream_id: ``int`` + :returns: The amount of data in bytes that can be received on the + stream before the flow control window is exhausted. + :rtype: ``int`` + """ + stream = self._get_stream_by_id(stream_id) + return min( + self.inbound_flow_control_window, + stream.inbound_flow_control_window, + ) + + def acknowledge_received_data(self, acknowledged_size: int, stream_id: int) -> None: + """ + Inform the :class:`H2Connection <h2.connection.H2Connection>` that a + certain number of flow-controlled bytes have been processed, and that + the space should be handed back to the remote peer at an opportune + time. + + .. versionadded:: 2.5.0 + + :param acknowledged_size: The total *flow-controlled size* of the data + that has been processed. Note that this must include the amount of + padding that was sent with that data. + :type acknowledged_size: ``int`` + :param stream_id: The ID of the stream on which this data was received. + :type stream_id: ``int`` + :returns: Nothing + :rtype: ``None`` + """ + self.config.logger.debug( + "Ack received data on stream ID %d with size %d", + stream_id, acknowledged_size, + ) + if stream_id <= 0: + msg = f"Stream ID {stream_id} is not valid for acknowledge_received_data" + raise ValueError(msg) + if acknowledged_size < 0: + msg = "Cannot acknowledge negative data" + raise ValueError(msg) + + frames: list[Frame] = [] + + conn_manager = self._inbound_flow_control_window_manager + conn_increment = conn_manager.process_bytes(acknowledged_size) + if conn_increment: + f = WindowUpdateFrame(0) + f.window_increment = conn_increment + frames.append(f) + + try: + stream = self._get_stream_by_id(stream_id) + except StreamClosedError: + # The stream is already gone. We're not worried about incrementing + # the window in this case. + pass + else: + # No point incrementing the windows of closed streams. + if stream.open: + frames.extend( + stream.acknowledge_received_data(acknowledged_size), + ) + + self._prepare_for_sending(frames) + + def data_to_send(self, amount: int | None = None) -> bytes: + """ + Returns some data for sending out of the internal data buffer. + + This method is analogous to ``read`` on a file-like object, but it + doesn't block. Instead, it returns as much data as the user asks for, + or less if that much data is not available. It does not perform any + I/O, and so uses a different name. + + :param amount: (optional) The maximum amount of data to return. If not + set, or set to ``None``, will return as much data as possible. + :type amount: ``int`` + :returns: A bytestring containing the data to send on the wire. + :rtype: ``bytes`` + """ + if amount is None: + data = bytes(self._data_to_send) + self._data_to_send = bytearray() + return data + data = bytes(self._data_to_send[:amount]) + self._data_to_send = self._data_to_send[amount:] + return data + + def clear_outbound_data_buffer(self) -> None: + """ + Clears the outbound data buffer, such that if this call was immediately + followed by a call to + :meth:`data_to_send <h2.connection.H2Connection.data_to_send>`, that + call would return no data. + + This method should not normally be used, but is made available to avoid + exposing implementation details. + """ + self._data_to_send = bytearray() + + def _acknowledge_settings(self) -> list[Frame]: + """ + Acknowledge settings that have been received. + + .. versionchanged:: 2.0.0 + Removed from public API, removed useless ``event`` parameter, made + automatic. + + :returns: Nothing + """ + self.state_machine.process_input(ConnectionInputs.SEND_SETTINGS) + + changes = self.remote_settings.acknowledge() + + if SettingCodes.INITIAL_WINDOW_SIZE in changes: + setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] + self._flow_control_change_from_settings( + setting.original_value, + setting.new_value, + ) + + # HEADER_TABLE_SIZE changes by the remote part affect our encoder: cf. + # RFC 7540 Section 6.5.2. + if SettingCodes.HEADER_TABLE_SIZE in changes: + setting = changes[SettingCodes.HEADER_TABLE_SIZE] + self.encoder.header_table_size = setting.new_value + + if SettingCodes.MAX_FRAME_SIZE in changes: + setting = changes[SettingCodes.MAX_FRAME_SIZE] + self.max_outbound_frame_size = setting.new_value + for stream in self.streams.values(): + stream.max_outbound_frame_size = setting.new_value + + f = SettingsFrame(0) + f.flags.add("ACK") + return [f] + + def _flow_control_change_from_settings(self, old_value: int | None, new_value: int) -> None: + """ + Update flow control windows in response to a change in the value of + SETTINGS_INITIAL_WINDOW_SIZE. + + When this setting is changed, it automatically updates all flow control + windows by the delta in the settings values. Note that it does not + increment the *connection* flow control window, per section 6.9.2 of + RFC 7540. + """ + delta = new_value - (old_value or 0) + + for stream in self.streams.values(): + stream.outbound_flow_control_window = guard_increment_window( + stream.outbound_flow_control_window, + delta, + ) + + def _inbound_flow_control_change_from_settings(self, old_value: int | None, new_value: int) -> None: + """ + Update remote flow control windows in response to a change in the value + of SETTINGS_INITIAL_WINDOW_SIZE. + + When this setting is changed, it automatically updates all remote flow + control windows by the delta in the settings values. + """ + delta = new_value - (old_value or 0) + + for stream in self.streams.values(): + stream._inbound_flow_control_change_from_settings(delta) + + def receive_data(self, data: bytes) -> list[Event]: + """ + Pass some received HTTP/2 data to the connection for handling. + + :param data: The data received from the remote peer on the network. + :type data: ``bytes`` + :returns: A list of events that the remote peer triggered by sending + this data. + """ + self.config.logger.trace( + "Process received data on connection. Received data: %r", data, + ) + + events: list[Event] = [] + self.incoming_buffer.add_data(data) + self.incoming_buffer.max_frame_size = self.max_inbound_frame_size + + try: + for frame in self.incoming_buffer: + events.extend(self._receive_frame(frame)) + except InvalidPaddingError as e: + self._terminate_connection(ErrorCodes.PROTOCOL_ERROR) + msg = "Received frame with invalid padding." + raise ProtocolError(msg) from e + except ProtocolError as e: + # For whatever reason, receiving the frame caused a protocol error. + # We should prepare to emit a GoAway frame before throwing the + # exception up further. No need for an event: the exception will + # do fine. + self._terminate_connection(e.error_code) + raise + + return events + + def _receive_frame(self, frame: Frame) -> list[Event]: + """ + Handle a frame received on the connection. + + .. versionchanged:: 2.0.0 + Removed from the public API. + """ + events: list[Event] + self.config.logger.trace("Received frame: %s", repr(frame)) + try: + # I don't love using __class__ here, maybe reconsider it. + frames, events = self._frame_dispatch_table[frame.__class__](frame) + except StreamClosedError as e: + # If the stream was closed by RST_STREAM, we just send a RST_STREAM + # to the remote peer. Otherwise, this is a connection error, and so + # we will re-raise to trigger one. + if self._stream_is_closed_by_reset(e.stream_id): + f = RstStreamFrame(e.stream_id) + f.error_code = e.error_code + self._prepare_for_sending([f]) + events = e._events + else: + raise + except StreamIDTooLowError as e: + # The stream ID seems invalid. This may happen when the closed + # stream has been cleaned up, or when the remote peer has opened a + # new stream with a higher stream ID than this one, forcing it + # closed implicitly. + # + # Check how the stream was closed: depending on the mechanism, it + # is either a stream error or a connection error. + if self._stream_is_closed_by_reset(e.stream_id): + # Closed by RST_STREAM is a stream error. + f = RstStreamFrame(e.stream_id) + f.error_code = ErrorCodes.STREAM_CLOSED + self._prepare_for_sending([f]) + events = [] + elif self._stream_is_closed_by_end(e.stream_id): + # Closed by END_STREAM is a connection error. + raise StreamClosedError(e.stream_id) from e + else: + # Closed implicitly, also a connection error, but of type + # PROTOCOL_ERROR. + raise + else: + self._prepare_for_sending(frames) + + return events + + def _terminate_connection(self, error_code: ErrorCodes) -> None: + """ + Terminate the connection early. Used in error handling blocks to send + GOAWAY frames. + """ + f = GoAwayFrame(0) + f.last_stream_id = self.highest_inbound_stream_id + f.error_code = error_code + self.state_machine.process_input(ConnectionInputs.SEND_GOAWAY) + self._prepare_for_sending([f]) + + def _receive_headers_frame(self, frame: HeadersFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a headers frame on the connection. + """ + # If necessary, check we can open the stream. Also validate that the + # stream ID is valid. + if frame.stream_id not in self.streams: + max_open_streams = self.local_settings.max_concurrent_streams + if (self.open_inbound_streams + 1) > max_open_streams: + msg = f"Max outbound streams is {max_open_streams}, {self.open_outbound_streams} open" + raise TooManyStreamsError(msg) + + # Let's decode the headers. We handle headers as bytes internally up + # until we hang them off the event, at which point we may optionally + # convert them to unicode. + headers = _decode_headers(self.decoder, frame.data) + + events = self.state_machine.process_input( + ConnectionInputs.RECV_HEADERS, + ) + stream = self._get_or_create_stream( + frame.stream_id, AllowedStreamIDs(not self.config.client_side), + ) + frames, stream_events = stream.receive_headers( + headers, + "END_STREAM" in frame.flags, + self.config.header_encoding, + ) + + if "PRIORITY" in frame.flags: + p_frames, p_events = self._receive_priority_frame(frame) + expected_frame_types = (RequestReceived, ResponseReceived, TrailersReceived, InformationalResponseReceived) + assert isinstance(stream_events[0], expected_frame_types) + assert isinstance(p_events[0], PriorityUpdated) + stream_events[0].priority_updated = p_events[0] + stream_events.extend(p_events) + assert not p_frames + + return frames, events + stream_events + + def _receive_push_promise_frame(self, frame: PushPromiseFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a push-promise frame on the connection. + """ + if not self.local_settings.enable_push: + msg = "Received pushed stream" + raise ProtocolError(msg) + + pushed_headers = _decode_headers(self.decoder, frame.data) + + events = self.state_machine.process_input( + ConnectionInputs.RECV_PUSH_PROMISE, + ) + + try: + stream = self._get_stream_by_id(frame.stream_id) + except NoSuchStreamError as e: + # We need to check if the parent stream was reset by us. If it was + # then we presume that the PUSH_PROMISE was in flight when we reset + # the parent stream. Rather than accept the new stream, just reset + # it. + # + # If this was closed naturally, however, we should call this a + # PROTOCOL_ERROR: pushing a stream on a naturally closed stream is + # a real problem because it creates a brand new stream that the + # remote peer now believes exists. + if (self._stream_closed_by(frame.stream_id) == + StreamClosedBy.SEND_RST_STREAM): + f = RstStreamFrame(frame.promised_stream_id) + f.error_code = ErrorCodes.REFUSED_STREAM + return [f], events + + msg = "Attempted to push on closed stream." + raise ProtocolError(msg) from e + + # We need to prevent peers pushing streams in response to streams that + # they themselves have already pushed: see #163 and RFC 7540 § 6.6. The + # easiest way to do that is to assert that the stream_id is not even: + # this shortcut works because only servers can push and the state + # machine will enforce this. + if (frame.stream_id % 2) == 0: + msg = "Cannot recursively push streams." + raise ProtocolError(msg) + + try: + frames, stream_events = stream.receive_push_promise_in_band( + frame.promised_stream_id, + pushed_headers, + self.config.header_encoding, + ) + except StreamClosedError: + # The parent stream was reset by us, so we presume that + # PUSH_PROMISE was in flight when we reset the parent stream. + # So we just reset the new stream. + f = RstStreamFrame(frame.promised_stream_id) + f.error_code = ErrorCodes.REFUSED_STREAM + return [f], events + + new_stream = self._begin_new_stream( + frame.promised_stream_id, AllowedStreamIDs.EVEN, + ) + self.streams[frame.promised_stream_id] = new_stream + new_stream.remotely_pushed(pushed_headers) + + return frames, events + stream_events + + def _handle_data_on_closed_stream(self, + events: list[Event], + exc: StreamClosedError, + frame: DataFrame) -> tuple[list[Frame], list[Event]]: + # This stream is already closed - and yet we received a DATA frame. + # The received DATA frame counts towards the connection flow window. + # We need to manually to acknowledge the DATA frame to update the flow + # window of the connection. Otherwise the whole connection stalls due + # the inbound flow window being 0. + frames: list[Frame] = [] + conn_manager = self._inbound_flow_control_window_manager + conn_increment = conn_manager.process_bytes( + frame.flow_controlled_length, + ) + + if conn_increment: + window_update_frame = WindowUpdateFrame(0) + window_update_frame.window_increment = conn_increment + frames.append(window_update_frame) + self.config.logger.debug( + "Received DATA frame on closed stream %d - " + "auto-emitted a WINDOW_UPDATE by %d", + frame.stream_id, conn_increment, + ) + + rst_stream_frame = RstStreamFrame(exc.stream_id) + rst_stream_frame.error_code = exc.error_code + frames.append(rst_stream_frame) + self.config.logger.debug( + "Stream %s already CLOSED or cleaned up - auto-emitted a RST_FRAME", + frame.stream_id, + ) + return frames, events + exc._events + + def _receive_data_frame(self, frame: DataFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a data frame on the connection. + """ + flow_controlled_length = frame.flow_controlled_length + + events = self.state_machine.process_input( + ConnectionInputs.RECV_DATA, + ) + self._inbound_flow_control_window_manager.window_consumed( + flow_controlled_length, + ) + + try: + stream = self._get_stream_by_id(frame.stream_id) + frames, stream_events = stream.receive_data( + frame.data, + "END_STREAM" in frame.flags, + flow_controlled_length, + ) + except StreamClosedError as e: + # This stream is either marked as CLOSED or already gone from our + # internal state. + return self._handle_data_on_closed_stream(events, e, frame) + + return frames, events + stream_events + + def _receive_settings_frame(self, frame: SettingsFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a SETTINGS frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_SETTINGS, + ) + + # This is an ack of the local settings. + if "ACK" in frame.flags: + changed_settings = self._local_settings_acked() + ack_event = SettingsAcknowledged() + ack_event.changed_settings = changed_settings + events.append(ack_event) + return [], events + + # Add the new settings. + self.remote_settings.update(frame.settings) + events.append( + RemoteSettingsChanged.from_settings( + self.remote_settings, frame.settings, + ), + ) + frames = self._acknowledge_settings() + + return frames, events + + def _receive_window_update_frame(self, frame: WindowUpdateFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a WINDOW_UPDATE frame on the connection. + """ + # hyperframe will take care of validating the window_increment. + # If we reach in here, we can assume a valid value. + + events = self.state_machine.process_input( + ConnectionInputs.RECV_WINDOW_UPDATE, + ) + + if frame.stream_id: + try: + stream = self._get_stream_by_id(frame.stream_id) + frames, stream_events = stream.receive_window_update( + frame.window_increment, + ) + except StreamClosedError: + return [], events + else: + # Increment our local flow control window. + self.outbound_flow_control_window = guard_increment_window( + self.outbound_flow_control_window, + frame.window_increment, + ) + + # FIXME: Should we split this into one event per active stream? + window_updated_event = WindowUpdated() + window_updated_event.stream_id = 0 + window_updated_event.delta = frame.window_increment + stream_events = [window_updated_event] + frames = [] + + return frames, events + stream_events + + def _receive_ping_frame(self, frame: PingFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a PING frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_PING, + ) + frames: list[Frame] = [] + + evt: PingReceived | PingAckReceived + if "ACK" in frame.flags: + evt = PingAckReceived() + else: + evt = PingReceived() + + # automatically ACK the PING with the same 'opaque data' + f = PingFrame(0) + f.flags.add("ACK") + f.opaque_data = frame.opaque_data + frames.append(f) + + evt.ping_data = frame.opaque_data + events.append(evt) + + return frames, events + + def _receive_rst_stream_frame(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a RST_STREAM frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_RST_STREAM, + ) + try: + stream = self._get_stream_by_id(frame.stream_id) + except NoSuchStreamError: + # The stream is missing. That's ok, we just do nothing here. + stream_frames: list[Frame] = [] + stream_events: list[Event] = [] + else: + stream_frames, stream_events = stream.stream_reset(frame) + + return stream_frames, events + stream_events + + def _receive_priority_frame(self, frame: HeadersFrame | PriorityFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a PRIORITY frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_PRIORITY, + ) + + event = PriorityUpdated() + event.stream_id = frame.stream_id + event.depends_on = frame.depends_on + event.exclusive = frame.exclusive + + # Weight is an integer between 1 and 256, but the byte only allows + # 0 to 255: add one. + event.weight = frame.stream_weight + 1 + + # A stream may not depend on itself. + if event.depends_on == frame.stream_id: + msg = f"Stream {frame.stream_id} may not depend on itself" + raise ProtocolError(msg) + events.append(event) + + return [], events + + def _receive_goaway_frame(self, frame: GoAwayFrame) -> tuple[list[Frame], list[Event]]: + """ + Receive a GOAWAY frame on the connection. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_GOAWAY, + ) + + # Clear the outbound data buffer: we cannot send further data now. + self.clear_outbound_data_buffer() + + # Fire an appropriate ConnectionTerminated event. + new_event = ConnectionTerminated() + new_event.error_code = _error_code_from_int(frame.error_code) + new_event.last_stream_id = frame.last_stream_id + new_event.additional_data = (frame.additional_data + if frame.additional_data else None) + events.append(new_event) + + return [], events + + def _receive_naked_continuation(self, frame: ContinuationFrame) -> None: + """ + A naked CONTINUATION frame has been received. This is always an error, + but the type of error it is depends on the state of the stream and must + transition the state of the stream, so we need to pass it to the + appropriate stream. + """ + stream = self._get_stream_by_id(frame.stream_id) + stream.receive_continuation() + msg = "Should not be reachable" # pragma: no cover + raise AssertionError(msg) # pragma: no cover + + def _receive_alt_svc_frame(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]: + """ + An ALTSVC frame has been received. This frame, specified in RFC 7838, + is used to advertise alternative places where the same service can be + reached. + + This frame can optionally be received either on a stream or on stream + 0, and its semantics are different in each case. + """ + events = self.state_machine.process_input( + ConnectionInputs.RECV_ALTERNATIVE_SERVICE, + ) + frames = [] + + if frame.stream_id: + # Given that it makes no sense to receive ALTSVC on a stream + # before that stream has been opened with a HEADERS frame, the + # ALTSVC frame cannot create a stream. If the stream is not + # present, we simply ignore the frame. + try: + stream = self._get_stream_by_id(frame.stream_id) + except (NoSuchStreamError, StreamClosedError): + pass + else: + stream_frames, stream_events = stream.receive_alt_svc(frame) + frames.extend(stream_frames) + events.extend(stream_events) + else: + # This frame is sent on stream 0. The origin field on the frame + # must be present, though if it isn't it's not a ProtocolError + # (annoyingly), we just need to ignore it. + if not frame.origin: + return frames, events + + # If we're a server, we want to ignore this (RFC 7838 says so). + if not self.config.client_side: + return frames, events + + event = AlternativeServiceAvailable() + event.origin = frame.origin + event.field_value = frame.field + events.append(event) + + return frames, events + + def _receive_unknown_frame(self, frame: ExtensionFrame) -> tuple[list[Frame], list[Event]]: + """ + We have received a frame that we do not understand. This is almost + certainly an extension frame, though it's impossible to be entirely + sure. + + RFC 7540 § 5.5 says that we MUST ignore unknown frame types: so we + do. We do notify the user that we received one, however. + """ + # All we do here is log. + self.config.logger.debug( + "Received unknown extension frame (ID %d)", frame.stream_id, + ) + event = UnknownFrameReceived() + event.frame = frame + return [], [event] + + def _local_settings_acked(self) -> dict[SettingCodes | int, ChangedSetting]: + """ + Handle the local settings being ACKed, update internal state. + """ + changes = self.local_settings.acknowledge() + + if SettingCodes.INITIAL_WINDOW_SIZE in changes: + setting = changes[SettingCodes.INITIAL_WINDOW_SIZE] + self._inbound_flow_control_change_from_settings( + setting.original_value, + setting.new_value, + ) + + if SettingCodes.MAX_HEADER_LIST_SIZE in changes: + setting = changes[SettingCodes.MAX_HEADER_LIST_SIZE] + self.decoder.max_header_list_size = setting.new_value + + if SettingCodes.MAX_FRAME_SIZE in changes: + setting = changes[SettingCodes.MAX_FRAME_SIZE] + self.max_inbound_frame_size = setting.new_value + + if SettingCodes.HEADER_TABLE_SIZE in changes: + setting = changes[SettingCodes.HEADER_TABLE_SIZE] + # This is safe across all hpack versions: some versions just won't + # respect it. + self.decoder.max_allowed_table_size = setting.new_value + + return changes + + def _stream_id_is_outbound(self, stream_id: int) -> bool: + """ + Returns ``True`` if the stream ID corresponds to an outbound stream + (one initiated by this peer), returns ``False`` otherwise. + """ + return (stream_id % 2 == int(self.config.client_side)) + + def _stream_closed_by(self, stream_id: int) -> StreamClosedBy | None: + """ + Returns how the stream was closed. + + The return value will be either a member of + ``h2.stream.StreamClosedBy`` or ``None``. If ``None``, the stream was + closed implicitly by the peer opening a stream with a higher stream ID + before opening this one. + """ + if stream_id in self.streams: + return self.streams[stream_id].closed_by + if stream_id in self._closed_streams: + return self._closed_streams[stream_id] + return None + + def _stream_is_closed_by_reset(self, stream_id: int) -> bool: + """ + Returns ``True`` if the stream was closed by sending or receiving a + RST_STREAM frame. Returns ``False`` otherwise. + """ + return self._stream_closed_by(stream_id) in ( + StreamClosedBy.RECV_RST_STREAM, StreamClosedBy.SEND_RST_STREAM, + ) + + def _stream_is_closed_by_end(self, stream_id: int) -> bool: + """ + Returns ``True`` if the stream was closed by sending or receiving an + END_STREAM flag in a HEADERS or DATA frame. Returns ``False`` + otherwise. + """ + return self._stream_closed_by(stream_id) in ( + StreamClosedBy.RECV_END_STREAM, StreamClosedBy.SEND_END_STREAM, + ) + + +def _add_frame_priority(frame: PriorityFrame | HeadersFrame, + weight: int | None = None, + depends_on: int | None = None, + exclusive: bool | None = None) -> PriorityFrame | HeadersFrame: + """ + Adds priority data to a given frame. Does not change any flags set on that + frame: if the caller is adding priority information to a HEADERS frame they + must set that themselves. + + This method also deliberately sets defaults for anything missing. + + This method validates the input values. + """ + # A stream may not depend on itself. + if depends_on == frame.stream_id: + msg = f"Stream {frame.stream_id} may not depend on itself" + raise ProtocolError(msg) + + # Weight must be between 1 and 256. + if weight is not None: + if weight > 256 or weight < 1: + msg = f"Weight must be between 1 and 256, not {weight}" + raise ProtocolError(msg) + # Weight is an integer between 1 and 256, but the byte only allows + # 0 to 255: subtract one. + weight -= 1 + + # Set defaults for anything not provided. + weight = weight if weight is not None else 15 + depends_on = depends_on if depends_on is not None else 0 + exclusive = exclusive if exclusive is not None else False + + frame.stream_weight = weight + frame.depends_on = depends_on + frame.exclusive = exclusive + + return frame + + +def _decode_headers(decoder: Decoder, encoded_header_block: bytes) -> Iterable[Header]: + """ + Decode a HPACK-encoded header block, translating HPACK exceptions into + sensible h2 errors. + + This only ever returns bytestring headers: h2 may emit them as + unicode later, but internally it processes them as bytestrings only. + """ + try: + return decoder.decode(encoded_header_block, raw=True) + except OversizedHeaderListError as e: + # This is a symptom of a HPACK bomb attack: the user has + # disregarded our requirements on how large a header block we'll + # accept. + msg = f"Oversized header block: {e}" + raise DenialOfServiceError(msg) from e + except (HPACKError, IndexError, TypeError, UnicodeDecodeError) as e: + # We should only need HPACKError here, but versions of HPACK older + # than 2.1.0 throw all three others as well. For maximum + # compatibility, catch all of them. + msg = f"Error decoding header block: {e}" + raise ProtocolError(msg) from e diff --git a/.venv/lib/python3.12/site-packages/h2/errors.py b/.venv/lib/python3.12/site-packages/h2/errors.py new file mode 100644 index 00000000..24ebe00f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/errors.py @@ -0,0 +1,77 @@ +""" +h2/errors +~~~~~~~~~ + +Global error code registry containing the established HTTP/2 error codes. + +The current registry is available at: +https://tools.ietf.org/html/rfc7540#section-11.4 +""" +from __future__ import annotations + +import enum + + +class ErrorCodes(enum.IntEnum): + """ + All known HTTP/2 error codes. + + .. versionadded:: 2.5.0 + """ + + #: Graceful shutdown. + NO_ERROR = 0x0 + + #: Protocol error detected. + PROTOCOL_ERROR = 0x1 + + #: Implementation fault. + INTERNAL_ERROR = 0x2 + + #: Flow-control limits exceeded. + FLOW_CONTROL_ERROR = 0x3 + + #: Settings not acknowledged. + SETTINGS_TIMEOUT = 0x4 + + #: Frame received for closed stream. + STREAM_CLOSED = 0x5 + + #: Frame size incorrect. + FRAME_SIZE_ERROR = 0x6 + + #: Stream not processed. + REFUSED_STREAM = 0x7 + + #: Stream cancelled. + CANCEL = 0x8 + + #: Compression state not updated. + COMPRESSION_ERROR = 0x9 + + #: TCP connection error for CONNECT method. + CONNECT_ERROR = 0xa + + #: Processing capacity exceeded. + ENHANCE_YOUR_CALM = 0xb + + #: Negotiated TLS parameters not acceptable. + INADEQUATE_SECURITY = 0xc + + #: Use HTTP/1.1 for the request. + HTTP_1_1_REQUIRED = 0xd + + +def _error_code_from_int(code: int) -> ErrorCodes | int: + """ + Given an integer error code, returns either one of :class:`ErrorCodes + <h2.errors.ErrorCodes>` or, if not present in the known set of codes, + returns the integer directly. + """ + try: + return ErrorCodes(code) + except ValueError: + return code + + +__all__ = ["ErrorCodes"] diff --git a/.venv/lib/python3.12/site-packages/h2/events.py b/.venv/lib/python3.12/site-packages/h2/events.py new file mode 100644 index 00000000..b81fd1a6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/events.py @@ -0,0 +1,639 @@ +""" +h2/events +~~~~~~~~~ + +Defines Event types for HTTP/2. + +Events are returned by the H2 state machine to allow implementations to keep +track of events triggered by receiving data. Each time data is provided to the +H2 state machine it processes the data and returns a list of Event objects. +""" +from __future__ import annotations + +import binascii +from typing import TYPE_CHECKING + +from .settings import ChangedSetting, SettingCodes, Settings, _setting_code_from_int + +if TYPE_CHECKING: # pragma: no cover + from hpack import HeaderTuple + from hyperframe.frame import Frame + + from .errors import ErrorCodes + + +class Event: + """ + Base class for h2 events. + """ + + + +class RequestReceived(Event): + """ + The RequestReceived event is fired whenever all of a request's headers + are received. This event carries the HTTP headers for the given request + and the stream ID of the new stream. + + In HTTP/2, headers may be sent as a HEADERS frame followed by zero or more + CONTINUATION frames with the final frame setting the END_HEADERS flag. + This event is fired after the entire sequence is received. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + + def __init__(self) -> None: + #: The Stream ID for the stream this request was made on. + self.stream_id: int | None = None + + #: The request headers. + self.headers: list[HeaderTuple] | None = None + + #: If this request also ended the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended: StreamEnded | None = None + + #: If this request also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated: PriorityUpdated | None = None + + def __repr__(self) -> str: + return f"<RequestReceived stream_id:{self.stream_id}, headers:{self.headers}>" + + +class ResponseReceived(Event): + """ + The ResponseReceived event is fired whenever response headers are received. + This event carries the HTTP headers for the given response and the stream + ID of the new stream. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + + def __init__(self) -> None: + #: The Stream ID for the stream this response was made on. + self.stream_id: int | None = None + + #: The response headers. + self.headers: list[HeaderTuple] | None = None + + #: If this response also ended the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended: StreamEnded | None = None + + #: If this response also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated: PriorityUpdated | None = None + + def __repr__(self) -> str: + return f"<ResponseReceived stream_id:{self.stream_id}, headers:{self.headers}>" + + +class TrailersReceived(Event): + """ + The TrailersReceived event is fired whenever trailers are received on a + stream. Trailers are a set of headers sent after the body of the + request/response, and are used to provide information that wasn't known + ahead of time (e.g. content-length). This event carries the HTTP header + fields that form the trailers and the stream ID of the stream on which they + were received. + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` and ``priority_updated`` properties. + """ + + def __init__(self) -> None: + #: The Stream ID for the stream on which these trailers were received. + self.stream_id: int | None = None + + #: The trailers themselves. + self.headers: list[HeaderTuple] | None = None + + #: Trailers always end streams. This property has the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` in it. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended: StreamEnded | None = None + + #: If the trailers also set associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated: PriorityUpdated | None = None + + def __repr__(self) -> str: + return f"<TrailersReceived stream_id:{self.stream_id}, headers:{self.headers}>" + + +class _HeadersSent(Event): + """ + The _HeadersSent event is fired whenever headers are sent. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + + + +class _ResponseSent(_HeadersSent): + """ + The _ResponseSent event is fired whenever response headers are sent + on a stream. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + + + +class _RequestSent(_HeadersSent): + """ + The _RequestSent event is fired whenever request headers are sent + on a stream. + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + + + +class _TrailersSent(_HeadersSent): + """ + The _TrailersSent event is fired whenever trailers are sent on a + stream. Trailers are a set of headers sent after the body of the + request/response, and are used to provide information that wasn't known + ahead of time (e.g. content-length). + + This is an internal event, used to determine validation steps on + outgoing header blocks. + """ + + + +class _PushedRequestSent(_HeadersSent): + """ + The _PushedRequestSent event is fired whenever pushed request headers are + sent. + + This is an internal event, used to determine validation steps on outgoing + header blocks. + """ + + + +class InformationalResponseReceived(Event): + """ + The InformationalResponseReceived event is fired when an informational + response (that is, one whose status code is a 1XX code) is received from + the remote peer. + + The remote peer may send any number of these, from zero upwards. These + responses are most commonly sent in response to requests that have the + ``expect: 100-continue`` header field present. Most users can safely + ignore this event unless you are intending to use the + ``expect: 100-continue`` flow, or are for any reason expecting a different + 1XX status code. + + .. versionadded:: 2.2.0 + + .. versionchanged:: 2.3.0 + Changed the type of ``headers`` to :class:`HeaderTuple + <hpack:hpack.HeaderTuple>`. This has no effect on current users. + + .. versionchanged:: 2.4.0 + Added ``priority_updated`` property. + """ + + def __init__(self) -> None: + #: The Stream ID for the stream this informational response was made + #: on. + self.stream_id: int | None = None + + #: The headers for this informational response. + self.headers: list[HeaderTuple] | None = None + + #: If this response also had associated priority information, the + #: associated :class:`PriorityUpdated <h2.events.PriorityUpdated>` + #: event will be available here. + #: + #: .. versionadded:: 2.4.0 + self.priority_updated: PriorityUpdated | None = None + + def __repr__(self) -> str: + return f"<InformationalResponseReceived stream_id:{self.stream_id}, headers:{self.headers}>" + + +class DataReceived(Event): + """ + The DataReceived event is fired whenever data is received on a stream from + the remote peer. The event carries the data itself, and the stream ID on + which the data was received. + + .. versionchanged:: 2.4.0 + Added ``stream_ended`` property. + """ + + def __init__(self) -> None: + #: The Stream ID for the stream this data was received on. + self.stream_id: int | None = None + + #: The data itself. + self.data: bytes | None = None + + #: The amount of data received that counts against the flow control + #: window. Note that padding counts against the flow control window, so + #: when adjusting flow control you should always use this field rather + #: than ``len(data)``. + self.flow_controlled_length: int | None = None + + #: If this data chunk also completed the stream, the associated + #: :class:`StreamEnded <h2.events.StreamEnded>` event will be available + #: here. + #: + #: .. versionadded:: 2.4.0 + self.stream_ended: StreamEnded | None = None + + def __repr__(self) -> str: + return ( + "<DataReceived stream_id:{}, " + "flow_controlled_length:{}, " + "data:{}>".format( + self.stream_id, + self.flow_controlled_length, + _bytes_representation(self.data[:20]) if self.data else "", + ) + ) + + +class WindowUpdated(Event): + """ + The WindowUpdated event is fired whenever a flow control window changes + size. HTTP/2 defines flow control windows for connections and streams: this + event fires for both connections and streams. The event carries the ID of + the stream to which it applies (set to zero if the window update applies to + the connection), and the delta in the window size. + """ + + def __init__(self) -> None: + #: The Stream ID of the stream whose flow control window was changed. + #: May be ``0`` if the connection window was changed. + self.stream_id: int | None = None + + #: The window delta. + self.delta: int | None = None + + def __repr__(self) -> str: + return f"<WindowUpdated stream_id:{self.stream_id}, delta:{self.delta}>" + + +class RemoteSettingsChanged(Event): + """ + The RemoteSettingsChanged event is fired whenever the remote peer changes + its settings. It contains a complete inventory of changed settings, + including their previous values. + + In HTTP/2, settings changes need to be acknowledged. h2 automatically + acknowledges settings changes for efficiency. However, it is possible that + the caller may not be happy with the changed setting. + + When this event is received, the caller should confirm that the new + settings are acceptable. If they are not acceptable, the user should close + the connection with the error code :data:`PROTOCOL_ERROR + <h2.errors.ErrorCodes.PROTOCOL_ERROR>`. + + .. versionchanged:: 2.0.0 + Prior to this version the user needed to acknowledge settings changes. + This is no longer the case: h2 now automatically acknowledges + them. + """ + + def __init__(self) -> None: + #: A dictionary of setting byte to + #: :class:`ChangedSetting <h2.settings.ChangedSetting>`, representing + #: the changed settings. + self.changed_settings: dict[int, ChangedSetting] = {} + + @classmethod + def from_settings(cls, + old_settings: Settings | dict[int, int], + new_settings: dict[int, int]) -> RemoteSettingsChanged: + """ + Build a RemoteSettingsChanged event from a set of changed settings. + + :param old_settings: A complete collection of old settings, in the form + of a dictionary of ``{setting: value}``. + :param new_settings: All the changed settings and their new values, in + the form of a dictionary of ``{setting: value}``. + """ + e = cls() + for setting, new_value in new_settings.items(): + s = _setting_code_from_int(setting) + original_value = old_settings.get(s) + change = ChangedSetting(s, original_value, new_value) + e.changed_settings[s] = change + + return e + + def __repr__(self) -> str: + return "<RemoteSettingsChanged changed_settings:{{{}}}>".format( + ", ".join(repr(cs) for cs in self.changed_settings.values()), + ) + + +class PingReceived(Event): + """ + The PingReceived event is fired whenever a PING is received. It contains + the 'opaque data' of the PING frame. A ping acknowledgment with the same + 'opaque data' is automatically emitted after receiving a ping. + + .. versionadded:: 3.1.0 + """ + + def __init__(self) -> None: + #: The data included on the ping. + self.ping_data: bytes | None = None + + def __repr__(self) -> str: + return f"<PingReceived ping_data:{_bytes_representation(self.ping_data)}>" + + +class PingAckReceived(Event): + """ + The PingAckReceived event is fired whenever a PING acknowledgment is + received. It contains the 'opaque data' of the PING+ACK frame, allowing the + user to correlate PINGs and calculate RTT. + + .. versionadded:: 3.1.0 + + .. versionchanged:: 4.0.0 + Removed deprecated but equivalent ``PingAcknowledged``. + """ + + def __init__(self) -> None: + #: The data included on the ping. + self.ping_data: bytes | None = None + + def __repr__(self) -> str: + return f"<PingAckReceived ping_data:{_bytes_representation(self.ping_data)}>" + + +class StreamEnded(Event): + """ + The StreamEnded event is fired whenever a stream is ended by a remote + party. The stream may not be fully closed if it has not been closed + locally, but no further data or headers should be expected on that stream. + """ + + def __init__(self) -> None: + #: The Stream ID of the stream that was closed. + self.stream_id: int | None = None + + def __repr__(self) -> str: + return f"<StreamEnded stream_id:{self.stream_id}>" + + +class StreamReset(Event): + """ + The StreamReset event is fired in two situations. The first is when the + remote party forcefully resets the stream. The second is when the remote + party has made a protocol error which only affects a single stream. In this + case, h2 will terminate the stream early and return this event. + + .. versionchanged:: 2.0.0 + This event is now fired when h2 automatically resets a stream. + """ + + def __init__(self) -> None: + #: The Stream ID of the stream that was reset. + self.stream_id: int | None = None + + #: The error code given. Either one of :class:`ErrorCodes + #: <h2.errors.ErrorCodes>` or ``int`` + self.error_code: ErrorCodes | None = None + + #: Whether the remote peer sent a RST_STREAM or we did. + self.remote_reset = True + + def __repr__(self) -> str: + return f"<StreamReset stream_id:{self.stream_id}, error_code:{self.error_code!s}, remote_reset:{self.remote_reset}>" + + +class PushedStreamReceived(Event): + """ + The PushedStreamReceived event is fired whenever a pushed stream has been + received from a remote peer. The event carries on it the new stream ID, the + ID of the parent stream, and the request headers pushed by the remote peer. + """ + + def __init__(self) -> None: + #: The Stream ID of the stream created by the push. + self.pushed_stream_id: int | None = None + + #: The Stream ID of the stream that the push is related to. + self.parent_stream_id: int | None = None + + #: The request headers, sent by the remote party in the push. + self.headers: list[HeaderTuple] | None = None + + def __repr__(self) -> str: + return ( + f"<PushedStreamReceived pushed_stream_id:{self.pushed_stream_id}, parent_stream_id:{self.parent_stream_id}, " + f"headers:{self.headers}>" + ) + + +class SettingsAcknowledged(Event): + """ + The SettingsAcknowledged event is fired whenever a settings ACK is received + from the remote peer. The event carries on it the settings that were + acknowedged, in the same format as + :class:`h2.events.RemoteSettingsChanged`. + """ + + def __init__(self) -> None: + #: A dictionary of setting byte to + #: :class:`ChangedSetting <h2.settings.ChangedSetting>`, representing + #: the changed settings. + self.changed_settings: dict[SettingCodes | int, ChangedSetting] = {} + + def __repr__(self) -> str: + s = ", ".join(repr(cs) for cs in self.changed_settings.values()) + return f"<SettingsAcknowledged changed_settings:{{{s}}}>" + + +class PriorityUpdated(Event): + """ + The PriorityUpdated event is fired whenever a stream sends updated priority + information. This can occur when the stream is opened, or at any time + during the stream lifetime. + + This event is purely advisory, and does not need to be acted on. + + .. versionadded:: 2.0.0 + """ + + def __init__(self) -> None: + #: The ID of the stream whose priority information is being updated. + self.stream_id: int | None = None + + #: The new stream weight. May be the same as the original stream + #: weight. An integer between 1 and 256. + self.weight: int | None = None + + #: The stream ID this stream now depends on. May be ``0``. + self.depends_on: int | None = None + + #: Whether the stream *exclusively* depends on the parent stream. If it + #: does, this stream should inherit the current children of its new + #: parent. + self.exclusive: bool | None = None + + def __repr__(self) -> str: + return ( + f"<PriorityUpdated stream_id:{self.stream_id}, weight:{self.weight}, depends_on:{self.depends_on}, " + f"exclusive:{self.exclusive}>" + ) + + +class ConnectionTerminated(Event): + """ + The ConnectionTerminated event is fired when a connection is torn down by + the remote peer using a GOAWAY frame. Once received, no further action may + be taken on the connection: a new connection must be established. + """ + + def __init__(self) -> None: + #: The error code cited when tearing down the connection. Should be + #: one of :class:`ErrorCodes <h2.errors.ErrorCodes>`, but may not be if + #: unknown HTTP/2 extensions are being used. + self.error_code: ErrorCodes | int | None = None + + #: The stream ID of the last stream the remote peer saw. This can + #: provide an indication of what data, if any, never reached the remote + #: peer and so can safely be resent. + self.last_stream_id: int | None = None + + #: Additional debug data that can be appended to GOAWAY frame. + self.additional_data: bytes | None = None + + def __repr__(self) -> str: + return ( + "<ConnectionTerminated error_code:{!s}, last_stream_id:{}, " + "additional_data:{}>".format( + self.error_code, + self.last_stream_id, + _bytes_representation( + self.additional_data[:20] + if self.additional_data else None), + ) + ) + + +class AlternativeServiceAvailable(Event): + """ + The AlternativeServiceAvailable event is fired when the remote peer + advertises an `RFC 7838 <https://tools.ietf.org/html/rfc7838>`_ Alternative + Service using an ALTSVC frame. + + This event always carries the origin to which the ALTSVC information + applies. That origin is either supplied by the server directly, or inferred + by h2 from the ``:authority`` pseudo-header field that was sent by + the user when initiating a given stream. + + This event also carries what RFC 7838 calls the "Alternative Service Field + Value", which is formatted like a HTTP header field and contains the + relevant alternative service information. h2 does not parse or in any + way modify that information: the user is required to do that. + + This event can only be fired on the client end of a connection. + + .. versionadded:: 2.3.0 + """ + + def __init__(self) -> None: + #: The origin to which the alternative service field value applies. + #: This field is either supplied by the server directly, or inferred by + #: h2 from the ``:authority`` pseudo-header field that was sent + #: by the user when initiating the stream on which the frame was + #: received. + self.origin: bytes | None = None + + #: The ALTSVC field value. This contains information about the HTTP + #: alternative service being advertised by the server. h2 does + #: not parse this field: it is left exactly as sent by the server. The + #: structure of the data in this field is given by `RFC 7838 Section 3 + #: <https://tools.ietf.org/html/rfc7838#section-3>`_. + self.field_value: bytes | None = None + + def __repr__(self) -> str: + return ( + "<AlternativeServiceAvailable origin:{}, field_value:{}>".format( + (self.origin or b"").decode("utf-8", "ignore"), + (self.field_value or b"").decode("utf-8", "ignore"), + ) + ) + + +class UnknownFrameReceived(Event): + """ + The UnknownFrameReceived event is fired when the remote peer sends a frame + that h2 does not understand. This occurs primarily when the remote + peer is employing HTTP/2 extensions that h2 doesn't know anything + about. + + RFC 7540 requires that HTTP/2 implementations ignore these frames. h2 + does so. However, this event is fired to allow implementations to perform + special processing on those frames if needed (e.g. if the implementation + is capable of handling the frame itself). + + .. versionadded:: 2.7.0 + """ + + def __init__(self) -> None: + #: The hyperframe Frame object that encapsulates the received frame. + self.frame: Frame | None = None + + def __repr__(self) -> str: + return "<UnknownFrameReceived>" + + +def _bytes_representation(data: bytes | None) -> str | None: + """ + Converts a bytestring into something that is safe to print on all Python + platforms. + + This function is relatively expensive, so it should not be called on the + mainline of the code. It's safe to use in things like object repr methods + though. + """ + if data is None: + return None + + return binascii.hexlify(data).decode("ascii") diff --git a/.venv/lib/python3.12/site-packages/h2/exceptions.py b/.venv/lib/python3.12/site-packages/h2/exceptions.py new file mode 100644 index 00000000..e4776795 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/exceptions.py @@ -0,0 +1,194 @@ +""" +h2/exceptions +~~~~~~~~~~~~~ + +Exceptions for the HTTP/2 module. +""" +from __future__ import annotations + +from .errors import ErrorCodes + + +class H2Error(Exception): + """ + The base class for all exceptions for the HTTP/2 module. + """ + + +class ProtocolError(H2Error): + """ + An action was attempted in violation of the HTTP/2 protocol. + """ + + #: The error code corresponds to this kind of Protocol Error. + error_code = ErrorCodes.PROTOCOL_ERROR + + +class FrameTooLargeError(ProtocolError): + """ + The frame that we tried to send or that we received was too large. + """ + + #: The error code corresponds to this kind of Protocol Error. + error_code = ErrorCodes.FRAME_SIZE_ERROR + + +class FrameDataMissingError(ProtocolError): + """ + The frame that we received is missing some data. + + .. versionadded:: 2.0.0 + """ + + #: The error code corresponds to this kind of Protocol Error. + error_code = ErrorCodes.FRAME_SIZE_ERROR + + +class TooManyStreamsError(ProtocolError): + """ + An attempt was made to open a stream that would lead to too many concurrent + streams. + """ + + + +class FlowControlError(ProtocolError): + """ + An attempted action violates flow control constraints. + """ + + #: The error code corresponds to this kind of Protocol Error. + error_code = ErrorCodes.FLOW_CONTROL_ERROR + + +class StreamIDTooLowError(ProtocolError): + """ + An attempt was made to open a stream that had an ID that is lower than the + highest ID we have seen on this connection. + """ + + def __init__(self, stream_id: int, max_stream_id: int) -> None: + #: The ID of the stream that we attempted to open. + self.stream_id = stream_id + + #: The current highest-seen stream ID. + self.max_stream_id = max_stream_id + + def __str__(self) -> str: + return f"StreamIDTooLowError: {self.stream_id} is lower than {self.max_stream_id}" + + +class NoAvailableStreamIDError(ProtocolError): + """ + There are no available stream IDs left to the connection. All stream IDs + have been exhausted. + + .. versionadded:: 2.0.0 + """ + + + +class NoSuchStreamError(ProtocolError): + """ + A stream-specific action referenced a stream that does not exist. + + .. versionchanged:: 2.0.0 + Became a subclass of :class:`ProtocolError + <h2.exceptions.ProtocolError>` + """ + + def __init__(self, stream_id: int) -> None: + #: The stream ID corresponds to the non-existent stream. + self.stream_id = stream_id + + +class StreamClosedError(NoSuchStreamError): + """ + A more specific form of + :class:`NoSuchStreamError <h2.exceptions.NoSuchStreamError>`. Indicates + that the stream has since been closed, and that all state relating to that + stream has been removed. + """ + + def __init__(self, stream_id: int) -> None: + #: The stream ID corresponds to the nonexistent stream. + self.stream_id = stream_id + + #: The relevant HTTP/2 error code. + self.error_code = ErrorCodes.STREAM_CLOSED + + # Any events that internal code may need to fire. Not relevant to + # external users that may receive a StreamClosedError. + self._events = [] # type: ignore + + +class InvalidSettingsValueError(ProtocolError, ValueError): + """ + An attempt was made to set an invalid Settings value. + + .. versionadded:: 2.0.0 + """ + + def __init__(self, msg: str, error_code: ErrorCodes) -> None: + super().__init__(msg) + self.error_code = error_code + + +class InvalidBodyLengthError(ProtocolError): + """ + The remote peer sent more or less data that the Content-Length header + indicated. + + .. versionadded:: 2.0.0 + """ + + def __init__(self, expected: int, actual: int) -> None: + self.expected_length = expected + self.actual_length = actual + + def __str__(self) -> str: + return f"InvalidBodyLengthError: Expected {self.expected_length} bytes, received {self.actual_length}" + + +class UnsupportedFrameError(ProtocolError): + """ + The remote peer sent a frame that is unsupported in this context. + + .. versionadded:: 2.1.0 + + .. versionchanged:: 4.0.0 + Removed deprecated KeyError parent class. + """ + + + +class RFC1122Error(H2Error): + """ + Emitted when users attempt to do something that is literally allowed by the + relevant RFC, but is sufficiently ill-defined that it's unwise to allow + users to actually do it. + + While there is some disagreement about whether or not we should be liberal + in what accept, it is a truth universally acknowledged that we should be + conservative in what emit. + + .. versionadded:: 2.4.0 + """ + + # shazow says I'm going to regret naming the exception this way. If that + # turns out to be true, TELL HIM NOTHING. + + +class DenialOfServiceError(ProtocolError): + """ + Emitted when the remote peer exhibits a behaviour that is likely to be an + attempt to perform a Denial of Service attack on the implementation. This + is a form of ProtocolError that carries a different error code, and allows + more easy detection of this kind of behaviour. + + .. versionadded:: 2.5.0 + """ + + #: The error code corresponds to this kind of + #: :class:`ProtocolError <h2.exceptions.ProtocolError>` + error_code = ErrorCodes.ENHANCE_YOUR_CALM diff --git a/.venv/lib/python3.12/site-packages/h2/frame_buffer.py b/.venv/lib/python3.12/site-packages/h2/frame_buffer.py new file mode 100644 index 00000000..30d96e81 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/frame_buffer.py @@ -0,0 +1,161 @@ +""" +h2/frame_buffer +~~~~~~~~~~~~~~~ + +A data structure that provides a way to iterate over a byte buffer in terms of +frames. +""" +from __future__ import annotations + +from hyperframe.exceptions import InvalidDataError, InvalidFrameError +from hyperframe.frame import ContinuationFrame, Frame, HeadersFrame, PushPromiseFrame + +from .exceptions import FrameDataMissingError, FrameTooLargeError, ProtocolError + +# To avoid a DOS attack based on sending loads of continuation frames, we limit +# the maximum number we're perpared to receive. In this case, we'll set the +# limit to 64, which means the largest encoded header block we can receive by +# default is 262144 bytes long, and the largest possible *at all* is 1073741760 +# bytes long. +# +# This value seems reasonable for now, but in future we may want to evaluate +# making it configurable. +CONTINUATION_BACKLOG = 64 + + +class FrameBuffer: + """ + A buffer data structure for HTTP/2 data that allows iteraton in terms of + H2 frames. + """ + + def __init__(self, server: bool = False) -> None: + self.data = b"" + self.max_frame_size = 0 + self._preamble = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" if server else b"" + self._preamble_len = len(self._preamble) + self._headers_buffer: list[HeadersFrame | ContinuationFrame | PushPromiseFrame] = [] + + def add_data(self, data: bytes) -> None: + """ + Add more data to the frame buffer. + + :param data: A bytestring containing the byte buffer. + """ + if self._preamble_len: + data_len = len(data) + of_which_preamble = min(self._preamble_len, data_len) + + if self._preamble[:of_which_preamble] != data[:of_which_preamble]: + msg = "Invalid HTTP/2 preamble." + raise ProtocolError(msg) + + data = data[of_which_preamble:] + self._preamble_len -= of_which_preamble + self._preamble = self._preamble[of_which_preamble:] + + self.data += data + + def _validate_frame_length(self, length: int) -> None: + """ + Confirm that the frame is an appropriate length. + """ + if length > self.max_frame_size: + msg = f"Received overlong frame: length {length}, max {self.max_frame_size}" + raise FrameTooLargeError(msg) + + def _update_header_buffer(self, f: Frame | None) -> Frame | None: + """ + Updates the internal header buffer. Returns a frame that should replace + the current one. May throw exceptions if this frame is invalid. + """ + # Check if we're in the middle of a headers block. If we are, this + # frame *must* be a CONTINUATION frame with the same stream ID as the + # leading HEADERS or PUSH_PROMISE frame. Anything else is a + # ProtocolError. If the frame *is* valid, append it to the header + # buffer. + if self._headers_buffer: + stream_id = self._headers_buffer[0].stream_id + valid_frame = ( + f is not None and + isinstance(f, ContinuationFrame) and + f.stream_id == stream_id + ) + if not valid_frame: + msg = "Invalid frame during header block." + raise ProtocolError(msg) + assert isinstance(f, ContinuationFrame) + + # Append the frame to the buffer. + self._headers_buffer.append(f) + if len(self._headers_buffer) > CONTINUATION_BACKLOG: + msg = "Too many continuation frames received." + raise ProtocolError(msg) + + # If this is the end of the header block, then we want to build a + # mutant HEADERS frame that's massive. Use the original one we got, + # then set END_HEADERS and set its data appopriately. If it's not + # the end of the block, lose the current frame: we can't yield it. + if "END_HEADERS" in f.flags: + f = self._headers_buffer[0] + f.flags.add("END_HEADERS") + f.data = b"".join(x.data for x in self._headers_buffer) + self._headers_buffer = [] + else: + f = None + elif (isinstance(f, (HeadersFrame, PushPromiseFrame)) and + "END_HEADERS" not in f.flags): + # This is the start of a headers block! Save the frame off and then + # act like we didn't receive one. + self._headers_buffer.append(f) + f = None + + return f + + # The methods below support the iterator protocol. + def __iter__(self) -> FrameBuffer: + return self + + def __next__(self) -> Frame: + # First, check that we have enough data to successfully parse the + # next frame header. If not, bail. Otherwise, parse it. + if len(self.data) < 9: + raise StopIteration + + try: + f, length = Frame.parse_frame_header(memoryview(self.data[:9])) + except (InvalidDataError, InvalidFrameError) as err: # pragma: no cover + msg = f"Received frame with invalid header: {err!s}" + raise ProtocolError(msg) from err + + # Next, check that we have enough length to parse the frame body. If + # not, bail, leaving the frame header data in the buffer for next time. + if len(self.data) < length + 9: + raise StopIteration + + # Confirm the frame has an appropriate length. + self._validate_frame_length(length) + + # Try to parse the frame body + try: + f.parse_body(memoryview(self.data[9:9+length])) + except InvalidDataError as err: + msg = "Received frame with non-compliant data" + raise ProtocolError(msg) from err + except InvalidFrameError as err: + msg = "Frame data missing or invalid" + raise FrameDataMissingError(msg) from err + + # At this point, as we know we'll use or discard the entire frame, we + # can update the data. + self.data = self.data[9+length:] + + # Pass the frame through the header buffer. + new_frame = self._update_header_buffer(f) + + # If we got a frame we didn't understand or shouldn't yield, rather + # than return None it'd be better if we just tried to get the next + # frame in the sequence instead. Recurse back into ourselves to do + # that. This is safe because the amount of work we have to do here is + # strictly bounded by the length of the buffer. + return new_frame if new_frame is not None else self.__next__() diff --git a/.venv/lib/python3.12/site-packages/h2/py.typed b/.venv/lib/python3.12/site-packages/h2/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/py.typed diff --git a/.venv/lib/python3.12/site-packages/h2/settings.py b/.venv/lib/python3.12/site-packages/h2/settings.py new file mode 100644 index 00000000..c1be953b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/settings.py @@ -0,0 +1,331 @@ +""" +h2/settings +~~~~~~~~~~~ + +This module contains a HTTP/2 settings object. This object provides a simple +API for manipulating HTTP/2 settings, keeping track of both the current active +state of the settings and the unacknowledged future values of the settings. +""" +from __future__ import annotations + +import collections +import enum +from collections.abc import Iterator, MutableMapping +from typing import Union + +from hyperframe.frame import SettingsFrame + +from .errors import ErrorCodes +from .exceptions import InvalidSettingsValueError + + +class SettingCodes(enum.IntEnum): + """ + All known HTTP/2 setting codes. + + .. versionadded:: 2.6.0 + """ + + #: Allows the sender to inform the remote endpoint of the maximum size of + #: the header compression table used to decode header blocks, in octets. + HEADER_TABLE_SIZE = SettingsFrame.HEADER_TABLE_SIZE + + #: This setting can be used to disable server push. To disable server push + #: on a client, set this to 0. + ENABLE_PUSH = SettingsFrame.ENABLE_PUSH + + #: Indicates the maximum number of concurrent streams that the sender will + #: allow. + MAX_CONCURRENT_STREAMS = SettingsFrame.MAX_CONCURRENT_STREAMS + + #: Indicates the sender's initial window size (in octets) for stream-level + #: flow control. + INITIAL_WINDOW_SIZE = SettingsFrame.INITIAL_WINDOW_SIZE + + #: Indicates the size of the largest frame payload that the sender is + #: willing to receive, in octets. + MAX_FRAME_SIZE = SettingsFrame.MAX_FRAME_SIZE + + #: This advisory setting informs a peer of the maximum size of header list + #: that the sender is prepared to accept, in octets. The value is based on + #: the uncompressed size of header fields, including the length of the name + #: and value in octets plus an overhead of 32 octets for each header field. + MAX_HEADER_LIST_SIZE = SettingsFrame.MAX_HEADER_LIST_SIZE + + #: This setting can be used to enable the connect protocol. To enable on a + #: client set this to 1. + ENABLE_CONNECT_PROTOCOL = SettingsFrame.ENABLE_CONNECT_PROTOCOL + + +def _setting_code_from_int(code: int) -> SettingCodes | int: + """ + Given an integer setting code, returns either one of :class:`SettingCodes + <h2.settings.SettingCodes>` or, if not present in the known set of codes, + returns the integer directly. + """ + try: + return SettingCodes(code) + except ValueError: + return code + + +class ChangedSetting: + + def __init__(self, setting: SettingCodes | int, original_value: int | None, new_value: int) -> None: + #: The setting code given. Either one of :class:`SettingCodes + #: <h2.settings.SettingCodes>` or ``int`` + #: + #: .. versionchanged:: 2.6.0 + self.setting = setting + + #: The original value before being changed. + self.original_value = original_value + + #: The new value after being changed. + self.new_value = new_value + + def __repr__(self) -> str: + return ( + f"ChangedSetting(setting={self.setting!s}, original_value={self.original_value}, new_value={self.new_value})" + ) + + +class Settings(MutableMapping[Union[SettingCodes, int], int]): + """ + An object that encapsulates HTTP/2 settings state. + + HTTP/2 Settings are a complex beast. Each party, remote and local, has its + own settings and a view of the other party's settings. When a settings + frame is emitted by a peer it cannot assume that the new settings values + are in place until the remote peer acknowledges the setting. In principle, + multiple settings changes can be "in flight" at the same time, all with + different values. + + This object encapsulates this mess. It provides a dict-like interface to + settings, which return the *current* values of the settings in question. + Additionally, it keeps track of the stack of proposed values: each time an + acknowledgement is sent/received, it updates the current values with the + stack of proposed values. On top of all that, it validates the values to + make sure they're allowed, and raises :class:`InvalidSettingsValueError + <h2.exceptions.InvalidSettingsValueError>` if they are not. + + Finally, this object understands what the default values of the HTTP/2 + settings are, and sets those defaults appropriately. + + .. versionchanged:: 2.2.0 + Added the ``initial_values`` parameter. + + .. versionchanged:: 2.5.0 + Added the ``max_header_list_size`` property. + + :param client: (optional) Whether these settings should be defaulted for a + client implementation or a server implementation. Defaults to ``True``. + :type client: ``bool`` + :param initial_values: (optional) Any initial values the user would like + set, rather than RFC 7540's defaults. + :type initial_vales: ``MutableMapping`` + """ + + def __init__(self, client: bool = True, initial_values: dict[SettingCodes, int] | None = None) -> None: + # Backing object for the settings. This is a dictionary of + # (setting: [list of values]), where the first value in the list is the + # current value of the setting. Strictly this doesn't use lists but + # instead uses collections.deque to avoid repeated memory allocations. + # + # This contains the default values for HTTP/2. + self._settings: dict[SettingCodes | int, collections.deque[int]] = { + SettingCodes.HEADER_TABLE_SIZE: collections.deque([4096]), + SettingCodes.ENABLE_PUSH: collections.deque([int(client)]), + SettingCodes.INITIAL_WINDOW_SIZE: collections.deque([65535]), + SettingCodes.MAX_FRAME_SIZE: collections.deque([16384]), + SettingCodes.ENABLE_CONNECT_PROTOCOL: collections.deque([0]), + } + if initial_values is not None: + for key, value in initial_values.items(): + invalid = _validate_setting(key, value) + if invalid: + msg = f"Setting {key} has invalid value {value}" + raise InvalidSettingsValueError( + msg, + error_code=invalid, + ) + self._settings[key] = collections.deque([value]) + + def acknowledge(self) -> dict[SettingCodes | int, ChangedSetting]: + """ + The settings have been acknowledged, either by the user (remote + settings) or by the remote peer (local settings). + + :returns: A dict of {setting: ChangedSetting} that were applied. + """ + changed_settings: dict[SettingCodes | int, ChangedSetting] = {} + + # If there is more than one setting in the list, we have a setting + # value outstanding. Update them. + for k, v in self._settings.items(): + if len(v) > 1: + old_setting = v.popleft() + new_setting = v[0] + changed_settings[k] = ChangedSetting( + k, old_setting, new_setting, + ) + + return changed_settings + + # Provide easy-access to well known settings. + @property + def header_table_size(self) -> int: + """ + The current value of the :data:`HEADER_TABLE_SIZE + <h2.settings.SettingCodes.HEADER_TABLE_SIZE>` setting. + """ + return self[SettingCodes.HEADER_TABLE_SIZE] + + @header_table_size.setter + def header_table_size(self, value: int) -> None: + self[SettingCodes.HEADER_TABLE_SIZE] = value + + @property + def enable_push(self) -> int: + """ + The current value of the :data:`ENABLE_PUSH + <h2.settings.SettingCodes.ENABLE_PUSH>` setting. + """ + return self[SettingCodes.ENABLE_PUSH] + + @enable_push.setter + def enable_push(self, value: int) -> None: + self[SettingCodes.ENABLE_PUSH] = value + + @property + def initial_window_size(self) -> int: + """ + The current value of the :data:`INITIAL_WINDOW_SIZE + <h2.settings.SettingCodes.INITIAL_WINDOW_SIZE>` setting. + """ + return self[SettingCodes.INITIAL_WINDOW_SIZE] + + @initial_window_size.setter + def initial_window_size(self, value: int) -> None: + self[SettingCodes.INITIAL_WINDOW_SIZE] = value + + @property + def max_frame_size(self) -> int: + """ + The current value of the :data:`MAX_FRAME_SIZE + <h2.settings.SettingCodes.MAX_FRAME_SIZE>` setting. + """ + return self[SettingCodes.MAX_FRAME_SIZE] + + @max_frame_size.setter + def max_frame_size(self, value: int) -> None: + self[SettingCodes.MAX_FRAME_SIZE] = value + + @property + def max_concurrent_streams(self) -> int: + """ + The current value of the :data:`MAX_CONCURRENT_STREAMS + <h2.settings.SettingCodes.MAX_CONCURRENT_STREAMS>` setting. + """ + return self.get(SettingCodes.MAX_CONCURRENT_STREAMS, 2**32+1) + + @max_concurrent_streams.setter + def max_concurrent_streams(self, value: int) -> None: + self[SettingCodes.MAX_CONCURRENT_STREAMS] = value + + @property + def max_header_list_size(self) -> int | None: + """ + The current value of the :data:`MAX_HEADER_LIST_SIZE + <h2.settings.SettingCodes.MAX_HEADER_LIST_SIZE>` setting. If not set, + returns ``None``, which means unlimited. + + .. versionadded:: 2.5.0 + """ + return self.get(SettingCodes.MAX_HEADER_LIST_SIZE, None) + + @max_header_list_size.setter + def max_header_list_size(self, value: int) -> None: + self[SettingCodes.MAX_HEADER_LIST_SIZE] = value + + @property + def enable_connect_protocol(self) -> int: + """ + The current value of the :data:`ENABLE_CONNECT_PROTOCOL + <h2.settings.SettingCodes.ENABLE_CONNECT_PROTOCOL>` setting. + """ + return self[SettingCodes.ENABLE_CONNECT_PROTOCOL] + + @enable_connect_protocol.setter + def enable_connect_protocol(self, value: int) -> None: + self[SettingCodes.ENABLE_CONNECT_PROTOCOL] = value + + # Implement the MutableMapping API. + def __getitem__(self, key: SettingCodes | int) -> int: + val = self._settings[key][0] + + # Things that were created when a setting was received should stay + # KeyError'd. + if val is None: + raise KeyError + + return val + + def __setitem__(self, key: SettingCodes | int, value: int) -> None: + invalid = _validate_setting(key, value) + if invalid: + msg = f"Setting {key} has invalid value {value}" + raise InvalidSettingsValueError( + msg, + error_code=invalid, + ) + + try: + items = self._settings[key] + except KeyError: + items = collections.deque([None]) # type: ignore + self._settings[key] = items + + items.append(value) + + def __delitem__(self, key: SettingCodes | int) -> None: + del self._settings[key] + + def __iter__(self) -> Iterator[SettingCodes | int]: + return self._settings.__iter__() + + def __len__(self) -> int: + return len(self._settings) + + def __eq__(self, other: object) -> bool: + if isinstance(other, Settings): + return self._settings == other._settings + return NotImplemented + + def __ne__(self, other: object) -> bool: + if isinstance(other, Settings): + return not self == other + return NotImplemented + + +def _validate_setting(setting: SettingCodes | int, value: int) -> ErrorCodes: + """ + Confirms that a specific setting has a well-formed value. If the setting is + invalid, returns an error code. Otherwise, returns 0 (NO_ERROR). + """ + if setting == SettingCodes.ENABLE_PUSH: + if value not in (0, 1): + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.INITIAL_WINDOW_SIZE: + if not 0 <= value <= 2147483647: # 2^31 - 1 + return ErrorCodes.FLOW_CONTROL_ERROR + elif setting == SettingCodes.MAX_FRAME_SIZE: + if not 16384 <= value <= 16777215: # 2^14 and 2^24 - 1 + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.MAX_HEADER_LIST_SIZE: + if value < 0: + return ErrorCodes.PROTOCOL_ERROR + elif setting == SettingCodes.ENABLE_CONNECT_PROTOCOL and value not in (0, 1): + return ErrorCodes.PROTOCOL_ERROR + + return ErrorCodes.NO_ERROR diff --git a/.venv/lib/python3.12/site-packages/h2/stream.py b/.venv/lib/python3.12/site-packages/h2/stream.py new file mode 100644 index 00000000..7d4a12e3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/stream.py @@ -0,0 +1,1417 @@ +""" +h2/stream +~~~~~~~~~ + +An implementation of a HTTP/2 stream. +""" +from __future__ import annotations + +from enum import Enum, IntEnum +from typing import TYPE_CHECKING, Any + +from hpack import HeaderTuple +from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame + +from .errors import ErrorCodes, _error_code_from_int +from .events import ( + AlternativeServiceAvailable, + DataReceived, + Event, + InformationalResponseReceived, + PushedStreamReceived, + RequestReceived, + ResponseReceived, + StreamEnded, + StreamReset, + TrailersReceived, + WindowUpdated, + _PushedRequestSent, + _RequestSent, + _ResponseSent, + _TrailersSent, +) +from .exceptions import FlowControlError, InvalidBodyLengthError, ProtocolError, StreamClosedError +from .utilities import ( + HeaderValidationFlags, + authority_from_headers, + extract_method_header, + guard_increment_window, + is_informational_response, + normalize_inbound_headers, + normalize_outbound_headers, + utf8_encode_headers, + validate_headers, + validate_outbound_headers, +) +from .windows import WindowManager + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Generator, Iterable + + from hpack.hpack import Encoder + from hpack.struct import Header, HeaderWeaklyTyped + + from .config import H2Configuration + + +class StreamState(IntEnum): + IDLE = 0 + RESERVED_REMOTE = 1 + RESERVED_LOCAL = 2 + OPEN = 3 + HALF_CLOSED_REMOTE = 4 + HALF_CLOSED_LOCAL = 5 + CLOSED = 6 + + +class StreamInputs(Enum): + SEND_HEADERS = 0 + SEND_PUSH_PROMISE = 1 + SEND_RST_STREAM = 2 + SEND_DATA = 3 + SEND_WINDOW_UPDATE = 4 + SEND_END_STREAM = 5 + RECV_HEADERS = 6 + RECV_PUSH_PROMISE = 7 + RECV_RST_STREAM = 8 + RECV_DATA = 9 + RECV_WINDOW_UPDATE = 10 + RECV_END_STREAM = 11 + RECV_CONTINUATION = 12 # Added in 2.0.0 + SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 + RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 + SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 + RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 + UPGRADE_CLIENT = 17 # Added 2.3.0 + UPGRADE_SERVER = 18 # Added 2.3.0 + + +class StreamClosedBy(Enum): + SEND_END_STREAM = 0 + RECV_END_STREAM = 1 + SEND_RST_STREAM = 2 + RECV_RST_STREAM = 3 + + +# This array is initialized once, and is indexed by the stream states above. +# It indicates whether a stream in the given state is open. The reason we do +# this is that we potentially check whether a stream in a given state is open +# quite frequently: given that we check so often, we should do so in the +# fastest and most performant way possible. +STREAM_OPEN = [False for _ in range(len(StreamState))] +STREAM_OPEN[StreamState.OPEN] = True +STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True +STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True + + +class H2StreamStateMachine: + """ + A single HTTP/2 stream state machine. + + This stream object implements basically the state machine described in + RFC 7540 section 5.1. + + :param stream_id: The stream ID of this stream. This is stored primarily + for logging purposes. + """ + + def __init__(self, stream_id: int) -> None: + self.state = StreamState.IDLE + self.stream_id = stream_id + + #: Whether this peer is the client side of this stream. + self.client: bool | None = None + + # Whether trailers have been sent/received on this stream or not. + self.headers_sent: bool | None = None + self.trailers_sent: bool | None = None + self.headers_received: bool | None = None + self.trailers_received: bool | None = None + + # How the stream was closed. One of StreamClosedBy. + self.stream_closed_by: StreamClosedBy | None = None + + def process_input(self, input_: StreamInputs) -> Any: + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, StreamInputs): + msg = "Input must be an instance of StreamInputs" + raise ValueError(msg) # noqa: TRY004 + + try: + func, target_state = _transitions[(self.state, input_)] + except KeyError as err: + old_state = self.state + self.state = StreamState.CLOSED + msg = f"Invalid input {input_} in state {old_state}" + raise ProtocolError(msg) from err + else: + previous_state = self.state + self.state = target_state + if func is not None: + try: + return func(self, previous_state) + except ProtocolError: + self.state = StreamState.CLOSED + raise + except AssertionError as err: # pragma: no cover + self.state = StreamState.CLOSED + raise ProtocolError(err) from err + + return [] + + def request_sent(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a request is sent. + """ + self.client = True + self.headers_sent = True + event = _RequestSent() + + return [event] + + def response_sent(self, previous_state: StreamState) -> list[Event]: + """ + Fires when something that should be a response is sent. This 'response' + may actually be trailers. + """ + if not self.headers_sent: + if self.client is True or self.client is None: + msg = "Client cannot send responses." + raise ProtocolError(msg) + self.headers_sent = True + return [_ResponseSent()] + assert not self.trailers_sent + self.trailers_sent = True + return [_TrailersSent()] + + def request_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a request is received. + """ + assert not self.headers_received + assert not self.trailers_received + + self.client = False + self.headers_received = True + event = RequestReceived() + event.stream_id = self.stream_id + return [event] + + def response_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a response is received. Also disambiguates between responses + and trailers. + """ + event: ResponseReceived | TrailersReceived + if not self.headers_received: + assert self.client is True + self.headers_received = True + event = ResponseReceived() + else: + assert not self.trailers_received + self.trailers_received = True + event = TrailersReceived() + + event.stream_id = self.stream_id + return [event] + + def data_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when data is received. + """ + if not self.headers_received: + msg = "cannot receive data before headers" + raise ProtocolError(msg) + event = DataReceived() + event.stream_id = self.stream_id + return [event] + + def window_updated(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a window update frame is received. + """ + event = WindowUpdated() + event.stream_id = self.stream_id + return [event] + + def stream_half_closed(self, previous_state: StreamState) -> list[Event]: + """ + Fires when an END_STREAM flag is received in the OPEN state, + transitioning this stream to a HALF_CLOSED_REMOTE state. + """ + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_ended(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a stream is cleanly ended. + """ + self.stream_closed_by = StreamClosedBy.RECV_END_STREAM + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_reset(self, previous_state: StreamState) -> list[Event]: + """ + Fired when a stream is forcefully reset. + """ + self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM + event = StreamReset() + event.stream_id = self.stream_id + return [event] + + def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the newly pushed stream, when pushed by the local peer. + + No event here, but definitionally this peer must be a server. + """ + assert self.client is None + self.client = False + self.headers_received = True + return [] + + def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the newly pushed stream, when pushed by the remote peer. + + No event here, but definitionally this peer must be a client. + """ + assert self.client is None + self.client = True + self.headers_sent = True + return [] + + def send_push_promise(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is sent. + We may only send PUSH_PROMISE frames if we're a server. + """ + if self.client is True: + msg = "Cannot push streams from client peers." + raise ProtocolError(msg) + + event = _PushedRequestSent() + return [event] + + def recv_push_promise(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is + received. We may only receive PUSH_PROMISE frames if we're a client. + + Fires a PushedStreamReceived event. + """ + if not self.client: + if self.client is None: # pragma: no cover + msg = "Idle streams cannot receive pushes" + else: # pragma: no cover + msg = "Cannot receive pushed streams as a server" + raise ProtocolError(msg) + + event = PushedStreamReceived() + event.parent_stream_id = self.stream_id + return [event] + + def send_end_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send END_STREAM in the + HALF_CLOSED_REMOTE state. + """ + self.stream_closed_by = StreamClosedBy.SEND_END_STREAM + + def send_reset_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send RST_STREAM in a non-closed + stream state. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + def reset_stream_on_error(self, previous_state: StreamState) -> None: + """ + Called when we need to forcefully emit another RST_STREAM frame on + behalf of the state machine. + + If this is the first time we've done this, we should also hang an event + off the StreamClosedError so that the user can be informed. We know + it's the first time we've done this if the stream is currently in a + state other than CLOSED. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + error = StreamClosedError(self.stream_id) + + event = StreamReset() + event.stream_id = self.stream_id + event.error_code = ErrorCodes.STREAM_CLOSED + event.remote_reset = False + error._events = [event] + raise error + + def recv_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an unexpected frame is received on an already-closed + stream. + + An endpoint that receives an unexpected frame should treat it as + a stream error or connection error with type STREAM_CLOSED, depending + on the specific frame. The error handling is done at a higher level: + this just raises the appropriate error. + """ + raise StreamClosedError(self.stream_id) + + def send_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send data on an already-closed + stream. + + This essentially overrides the standard logic by throwing a + more-specific error: StreamClosedError. This is a ProtocolError, so it + matches the standard API of the state machine, but provides more detail + to the user. + """ + raise StreamClosedError(self.stream_id) + + def recv_push_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when a PUSH_PROMISE frame is received on a full stop + stream. + + If the stream was closed by us sending a RST_STREAM frame, then we + presume that the PUSH_PROMISE was in flight when we reset the parent + stream. Rathen than accept the new stream, we just reset it. + Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a + naturally closed stream is a real problem because it creates a brand + new stream that the remote peer now believes exists. + """ + assert self.stream_closed_by is not None + + if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM: + raise StreamClosedError(self.stream_id) + msg = "Attempted to push on closed stream." + raise ProtocolError(msg) + + def send_push_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to push on an already-closed stream. + + This essentially overrides the standard logic by providing a more + useful error message. It's necessary because simply indicating that the + stream is closed is not enough: there is now a new stream that is not + allowed to be there. The only recourse is to tear the whole connection + down. + """ + msg = "Attempted to push on closed stream." + raise ProtocolError(msg) + + def send_informational_response(self, previous_state: StreamState) -> list[Event]: + """ + Called when an informational header block is sent (that is, a block + where the :status header has a 1XX value). + + Only enforces that these are sent *before* final headers are sent. + """ + if self.headers_sent: + msg = "Information response after final response" + raise ProtocolError(msg) + + event = _ResponseSent() + return [event] + + def recv_informational_response(self, previous_state: StreamState) -> list[Event]: + """ + Called when an informational header block is received (that is, a block + where the :status header has a 1XX value). + """ + if self.headers_received: + msg = "Informational response after final response" + raise ProtocolError(msg) + + event = InformationalResponseReceived() + event.stream_id = self.stream_id + return [event] + + def recv_alt_svc(self, previous_state: StreamState) -> list[Event]: + """ + Called when receiving an ALTSVC frame. + + RFC 7838 allows us to receive ALTSVC frames at any stream state, which + is really absurdly overzealous. For that reason, we want to limit the + states in which we can actually receive it. It's really only sensible + to receive it after we've sent our own headers and before the server + has sent its header block: the server can't guarantee that we have any + state around after it completes its header block, and the server + doesn't know what origin we're talking about before we've sent ours. + + For that reason, this function applies a few extra checks on both state + and some of the little state variables we keep around. If those suggest + an unreasonable situation for the ALTSVC frame to have been sent in, + we quietly ignore it (as RFC 7838 suggests). + + This function is also *not* always called by the state machine. In some + states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it, + because we know the frame cannot be valid in that state (IDLE because + the server cannot know what origin the stream applies to, CLOSED + because the server cannot assume we still have state around, + RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL + state then *we* are the server). + """ + # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore + # them. + if self.client is False: + return [] + + # If we've received the response headers from the server they can't + # guarantee we still have any state around. Other implementations + # (like nghttp2) ignore ALTSVC in this state, so we will too. + if self.headers_received: + return [] + + # Otherwise, this is a sensible enough frame to have received. Return + # the event and let it get populated. + return [AlternativeServiceAvailable()] + + def send_alt_svc(self, previous_state: StreamState) -> None: + """ + Called when sending an ALTSVC frame on this stream. + + For consistency with the restrictions we apply on receiving ALTSVC + frames in ``recv_alt_svc``, we want to restrict when users can send + ALTSVC frames to the situations when we ourselves would accept them. + + That means: when we are a server, when we have received the request + headers, and when we have not yet sent our own response headers. + """ + # We should not send ALTSVC after we've sent response headers, as the + # client may have disposed of its state. + if self.headers_sent: + msg = "Cannot send ALTSVC after sending response headers." + raise ProtocolError(msg) + + + +# STATE MACHINE +# +# The stream state machine is defined here to avoid the need to allocate it +# repeatedly for each stream. It cannot be defined in the stream class because +# it needs to be able to reference the callbacks defined on the class, but +# because Python's scoping rules are weird the class object is not actually in +# scope during the body of the class object. +# +# For the sake of clarity, we reproduce the RFC 7540 state machine here: +# +# +--------+ +# send PP | | recv PP +# ,--------| idle |--------. +# / | | \ +# v +--------+ v +# +----------+ | +----------+ +# | | | send H / | | +# ,------| reserved | | recv H | reserved |------. +# | | (local) | | | (remote) | | +# | +----------+ v +----------+ | +# | | +--------+ | | +# | | recv ES | | send ES | | +# | send H | ,-------| open |-------. | recv H | +# | | / | | \ | | +# | v v +--------+ v v | +# | +----------+ | +----------+ | +# | | half | | | half | | +# | | closed | | send R / | closed | | +# | | (remote) | | recv R | (local) | | +# | +----------+ | +----------+ | +# | | | | | +# | | send ES / | recv ES / | | +# | | send R / v send R / | | +# | | recv R +--------+ recv R | | +# | send R / `----------->| |<-----------' send R / | +# | recv R | closed | recv R | +# `----------------------->| |<----------------------' +# +--------+ +# +# send: endpoint sends this frame +# recv: endpoint receives this frame +# +# H: HEADERS frame (with implied CONTINUATIONs) +# PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +# ES: END_STREAM flag +# R: RST_STREAM frame +# +# For the purposes of this state machine we treat HEADERS and their +# associated CONTINUATION frames as a single jumbo frame. The protocol +# allows/requires this by preventing other frames from being interleved in +# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is +# received without a prior HEADERS frame, it *will* be passed to this state +# machine. The state machine should always reject that frame, either as an +# invalid transition or because the stream is closed. +# +# There is a confusing relationship around PUSH_PROMISE frames. The state +# machine above considers them to be frames belonging to the new stream, +# which is *somewhat* true. However, they are sent with the stream ID of +# their related stream, and are only sendable in some cases. +# For this reason, our state machine implementation below allows for +# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also +# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states. +# Essentially, for h2, PUSH_PROMISE frames are effectively sent on +# two streams. +# +# The _transitions dictionary contains a mapping of tuples of +# (state, input) to tuples of (side_effect_function, end_state). This +# map contains all allowed transitions: anything not in this map is +# invalid and immediately causes a transition to ``closed``. +_transitions = { + # State: idle + (StreamState.IDLE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.request_sent, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.request_received, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_new_pushed_stream, + StreamState.RESERVED_LOCAL), + (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_new_pushed_stream, + StreamState.RESERVED_REMOTE), + (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.IDLE), + (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): + (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), + (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): + (H2StreamStateMachine.request_received, + StreamState.HALF_CLOSED_REMOTE), + + # State: reserved local + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.RESERVED_LOCAL), + + # State: reserved remote + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), + + # State: open + (StreamState.OPEN, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_DATA): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_END_STREAM): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.OPEN, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_half_closed, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), + + # State: half-closed remote + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), + + # State: half-closed local + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_ended, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), + + # State: closed + (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.CLOSED), + + # RFC 7540 Section 5.1 defines how the end point should react when + # receiving a frame on a closed stream with the following statements: + # + # > An endpoint that receives any frame other than PRIORITY after receiving + # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. + # > An endpoint that receives any frames after receiving a frame with the + # > END_STREAM flag set MUST treat that as a connection error of type + # > STREAM_CLOSED. + (StreamState.CLOSED, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_DATA): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + + # > WINDOW_UPDATE or RST_STREAM frames can be received in this state + # > for a short period after a DATA or HEADERS frame containing a + # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we + # > don't have access to a clock so we just always allow it. + (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): + (None, StreamState.CLOSED), + + # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is + # > neither "open" nor "half-closed (local)" as a connection error of type + # > PROTOCOL_ERROR. + (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), + + # Also, users should be forbidden from sending on closed streams. + (StreamState.CLOSED, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_DATA): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), +} + + +class H2Stream: + """ + A low-level HTTP/2 stream object. This handles building and receiving + frames and maintains per-stream state. + + This wraps a HTTP/2 Stream state machine implementation, ensuring that + frames can only be sent/received when the stream is in a valid state. + Attempts to create frames that cannot be sent will raise a + ``ProtocolError``. + """ + + def __init__(self, + stream_id: int, + config: H2Configuration, + inbound_window_size: int, + outbound_window_size: int) -> None: + self.state_machine = H2StreamStateMachine(stream_id) + self.stream_id = stream_id + self.max_outbound_frame_size: int | None = None + self.request_method: bytes | None = None + + # The current value of the outbound stream flow control window + self.outbound_flow_control_window = outbound_window_size + + # The flow control manager. + self._inbound_window_manager = WindowManager(inbound_window_size) + + # The expected content length, if any. + self._expected_content_length: int | None = None + + # The actual received content length. Always tracked. + self._actual_content_length = 0 + + # The authority we believe this stream belongs to. + self._authority: bytes | None = None + + # The configuration for this stream. + self.config = config + + def __repr__(self) -> str: + return f"<{type(self).__name__} id:{self.stream_id} state:{self.state_machine.state!r}>" + + @property + def inbound_flow_control_window(self) -> int: + """ + The size of the inbound flow control window for the stream. This is + rarely publicly useful: instead, use :meth:`remote_flow_control_window + <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is + largely present to provide a shortcut to this data. + """ + return self._inbound_window_manager.current_window_size + + @property + def open(self) -> bool: + """ + Whether the stream is 'open' in any sense: that is, whether it counts + against the number of concurrent streams. + """ + # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either + # the OPEN state or either of the HALF_CLOSED states. Perplexingly, + # this excludes the reserved states. + # For more detail on why we're doing this in this slightly weird way, + # see the comment on ``STREAM_OPEN`` at the top of the file. + return STREAM_OPEN[self.state_machine.state] + + @property + def closed(self) -> bool: + """ + Whether the stream is closed. + """ + return self.state_machine.state == StreamState.CLOSED + + @property + def closed_by(self) -> StreamClosedBy | None: + """ + Returns how the stream was closed, as one of StreamClosedBy. + """ + return self.state_machine.stream_closed_by + + def upgrade(self, client_side: bool) -> None: + """ + Called by the connection to indicate that this stream is the initial + request/response of an upgraded connection. Places the stream into an + appropriate state. + """ + self.config.logger.debug("Upgrading %r", self) + + assert self.stream_id == 1 + input_ = ( + StreamInputs.UPGRADE_CLIENT if client_side + else StreamInputs.UPGRADE_SERVER + ) + + # This may return events, we deliberately don't want them. + self.state_machine.process_input(input_) + + def send_headers(self, + headers: Iterable[HeaderWeaklyTyped], + encoder: Encoder, + end_stream: bool = False) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]: + """ + Returns a list of HEADERS/CONTINUATION frames to emit as either headers + or trailers. + """ + self.config.logger.debug("Send headers %s on %r", headers, self) + + # Because encoding headers makes an irreversible change to the header + # compression context, we make the state transition before we encode + # them. + + # First, check if we're a client. If we are, no problem: if we aren't, + # we need to scan the header block to see if this is an informational + # response. + input_ = StreamInputs.SEND_HEADERS + + bytes_headers = utf8_encode_headers(headers) + + if ((not self.state_machine.client) and + is_informational_response(bytes_headers)): + if end_stream: + msg = "Cannot set END_STREAM on informational responses." + raise ProtocolError(msg) + + input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS + + events = self.state_machine.process_input(input_) + + hf = HeadersFrame(self.stream_id) + hdr_validation_flags = self._build_hdr_validation_flags(events) + frames = self._build_headers_frames( + bytes_headers, encoder, hf, hdr_validation_flags, + ) + + if end_stream: + # Not a bug: the END_STREAM flag is valid on the initial HEADERS + # frame, not the CONTINUATION frames that follow. + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + frames[0].flags.add("END_STREAM") + + if self.state_machine.trailers_sent and not end_stream: + msg = "Trailers must have END_STREAM set." + raise ProtocolError(msg) + + if self.state_machine.client and self._authority is None: + self._authority = authority_from_headers(bytes_headers) + + # store request method for _initialize_content_length + self.request_method = extract_method_header(bytes_headers) + + return frames + + def push_stream_in_band(self, + related_stream_id: int, + headers: Iterable[HeaderWeaklyTyped], + encoder: Encoder) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]: + """ + Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed + stream header. Called on the stream that has the PUSH_PROMISE frame + sent on it. + """ + self.config.logger.debug("Push stream %r", self) + + # Because encoding headers makes an irreversible change to the header + # compression context, we make the state transition *first*. + + events = self.state_machine.process_input( + StreamInputs.SEND_PUSH_PROMISE, + ) + + ppf = PushPromiseFrame(self.stream_id) + ppf.promised_stream_id = related_stream_id + hdr_validation_flags = self._build_hdr_validation_flags(events) + + bytes_headers = utf8_encode_headers(headers) + + return self._build_headers_frames( + bytes_headers, encoder, ppf, hdr_validation_flags, + ) + + + def locally_pushed(self) -> list[Frame]: + """ + Mark this stream as one that was pushed by this peer. Must be called + immediately after initialization. Sends no frames, simply updates the + state machine. + """ + # This does not trigger any events. + events = self.state_machine.process_input( + StreamInputs.SEND_PUSH_PROMISE, + ) + assert not events + return [] + + def send_data(self, + data: bytes | memoryview, + end_stream: bool = False, + pad_length: int | None = None) -> list[Frame]: + """ + Prepare some data frames. Optionally end the stream. + + .. warning:: Does not perform flow control checks. + """ + self.config.logger.debug( + "Send data on %r with end stream set to %s", self, end_stream, + ) + + self.state_machine.process_input(StreamInputs.SEND_DATA) + + df = DataFrame(self.stream_id) + df.data = data + if end_stream: + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + df.flags.add("END_STREAM") + if pad_length is not None: + df.flags.add("PADDED") + df.pad_length = pad_length + + # Subtract flow_controlled_length to account for possible padding + self.outbound_flow_control_window -= df.flow_controlled_length + assert self.outbound_flow_control_window >= 0 + + return [df] + + def end_stream(self) -> list[Frame]: + """ + End a stream without sending data. + """ + self.config.logger.debug("End stream %r", self) + + self.state_machine.process_input(StreamInputs.SEND_END_STREAM) + df = DataFrame(self.stream_id) + df.flags.add("END_STREAM") + return [df] + + def advertise_alternative_service(self, field_value: bytes) -> list[Frame]: + """ + Advertise an RFC 7838 alternative service. The semantics of this are + better documented in the ``H2Connection`` class. + """ + self.config.logger.debug( + "Advertise alternative service of %r for %r", field_value, self, + ) + self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE) + asf = AltSvcFrame(self.stream_id) + asf.field = field_value + return [asf] + + def increase_flow_control_window(self, increment: int) -> list[Frame]: + """ + Increase the size of the flow control window for the remote side. + """ + self.config.logger.debug( + "Increase flow control window for %r by %d", + self, increment, + ) + self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE) + self._inbound_window_manager.window_opened(increment) + + wuf = WindowUpdateFrame(self.stream_id) + wuf.window_increment = increment + return [wuf] + + def receive_push_promise_in_band(self, + promised_stream_id: int, + headers: Iterable[Header], + header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]: + """ + Receives a push promise frame sent on this stream, pushing a remote + stream. This is called on the stream that has the PUSH_PROMISE sent + on it. + """ + self.config.logger.debug( + "Receive Push Promise on %r for remote stream %d", + self, promised_stream_id, + ) + events = self.state_machine.process_input( + StreamInputs.RECV_PUSH_PROMISE, + ) + events[0].pushed_stream_id = promised_stream_id + + hdr_validation_flags = self._build_hdr_validation_flags(events) + events[0].headers = self._process_received_headers( + headers, hdr_validation_flags, header_encoding, + ) + return [], events + + def remotely_pushed(self, pushed_headers: Iterable[Header]) -> tuple[list[Frame], list[Event]]: + """ + Mark this stream as one that was pushed by the remote peer. Must be + called immediately after initialization. Sends no frames, simply + updates the state machine. + """ + self.config.logger.debug("%r pushed by remote peer", self) + events = self.state_machine.process_input( + StreamInputs.RECV_PUSH_PROMISE, + ) + self._authority = authority_from_headers(pushed_headers) + return [], events + + def receive_headers(self, + headers: Iterable[Header], + end_stream: bool, + header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]: + """ + Receive a set of headers (or trailers). + """ + if is_informational_response(headers): + if end_stream: + msg = "Cannot set END_STREAM on informational responses" + raise ProtocolError(msg) + input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS + else: + input_ = StreamInputs.RECV_HEADERS + + events = self.state_machine.process_input(input_) + + if end_stream: + es_events = self.state_machine.process_input( + StreamInputs.RECV_END_STREAM, + ) + events[0].stream_ended = es_events[0] + events += es_events + + self._initialize_content_length(headers) + + if isinstance(events[0], TrailersReceived) and not end_stream: + msg = "Trailers must have END_STREAM set" + raise ProtocolError(msg) + + hdr_validation_flags = self._build_hdr_validation_flags(events) + events[0].headers = self._process_received_headers( + headers, hdr_validation_flags, header_encoding, + ) + return [], events + + def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) -> tuple[list[Frame], list[Event]]: + """ + Receive some data. + """ + self.config.logger.debug( + "Receive data on %r with end stream %s and flow control length " + "set to %d", self, end_stream, flow_control_len, + ) + events = self.state_machine.process_input(StreamInputs.RECV_DATA) + self._inbound_window_manager.window_consumed(flow_control_len) + self._track_content_length(len(data), end_stream) + + if end_stream: + es_events = self.state_machine.process_input( + StreamInputs.RECV_END_STREAM, + ) + events[0].stream_ended = es_events[0] + events.extend(es_events) + + events[0].data = data + events[0].flow_controlled_length = flow_control_len + return [], events + + def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]: + """ + Handle a WINDOW_UPDATE increment. + """ + self.config.logger.debug( + "Receive Window Update on %r for increment of %d", + self, increment, + ) + events = self.state_machine.process_input( + StreamInputs.RECV_WINDOW_UPDATE, + ) + frames = [] + + # If we encounter a problem with incrementing the flow control window, + # this should be treated as a *stream* error, not a *connection* error. + # That means we need to catch the error and forcibly close the stream. + if events: + events[0].delta = increment + try: + self.outbound_flow_control_window = guard_increment_window( + self.outbound_flow_control_window, + increment, + ) + except FlowControlError: + # Ok, this is bad. We're going to need to perform a local + # reset. + event = StreamReset() + event.stream_id = self.stream_id + event.error_code = ErrorCodes.FLOW_CONTROL_ERROR + event.remote_reset = False + + events = [event] + frames = self.reset_stream(event.error_code) + + return frames, events + + def receive_continuation(self) -> None: + """ + A naked CONTINUATION frame has been received. This is always an error, + but the type of error it is depends on the state of the stream and must + transition the state of the stream, so we need to handle it. + """ + self.config.logger.debug("Receive Continuation frame on %r", self) + self.state_machine.process_input( + StreamInputs.RECV_CONTINUATION, + ) + msg = "Should not be reachable" # pragma: no cover + raise AssertionError(msg) # pragma: no cover + + def receive_alt_svc(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]: + """ + An Alternative Service frame was received on the stream. This frame + inherits the origin associated with this stream. + """ + self.config.logger.debug( + "Receive Alternative Service frame on stream %r", self, + ) + + # If the origin is present, RFC 7838 says we have to ignore it. + if frame.origin: + return [], [] + + events = self.state_machine.process_input( + StreamInputs.RECV_ALTERNATIVE_SERVICE, + ) + + # There are lots of situations where we want to ignore the ALTSVC + # frame. If we need to pay attention, we'll have an event and should + # fill it out. + if events: + assert isinstance(events[0], AlternativeServiceAvailable) + events[0].origin = self._authority + events[0].field_value = frame.field + + return [], events + + def reset_stream(self, error_code: ErrorCodes | int = 0) -> list[Frame]: + """ + Close the stream locally. Reset the stream with an error code. + """ + self.config.logger.debug( + "Local reset %r with error code: %d", self, error_code, + ) + self.state_machine.process_input(StreamInputs.SEND_RST_STREAM) + + rsf = RstStreamFrame(self.stream_id) + rsf.error_code = error_code + return [rsf] + + def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]: + """ + Handle a stream being reset remotely. + """ + self.config.logger.debug( + "Remote reset %r with error code: %d", self, frame.error_code, + ) + events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM) + + if events: + # We don't fire an event if this stream is already closed. + events[0].error_code = _error_code_from_int(frame.error_code) + + return [], events + + def acknowledge_received_data(self, acknowledged_size: int) -> list[Frame]: + """ + The user has informed us that they've processed some amount of data + that was received on this stream. Pass that to the window manager and + potentially return some WindowUpdate frames. + """ + self.config.logger.debug( + "Acknowledge received data with size %d on %r", + acknowledged_size, self, + ) + increment = self._inbound_window_manager.process_bytes( + acknowledged_size, + ) + if increment: + f = WindowUpdateFrame(self.stream_id) + f.window_increment = increment + return [f] + + return [] + + def _build_hdr_validation_flags(self, events: Any) -> HeaderValidationFlags: + """ + Constructs a set of header validation flags for use when normalizing + and validating header blocks. + """ + is_trailer = isinstance( + events[0], (_TrailersSent, TrailersReceived), + ) + is_response_header = isinstance( + events[0], + ( + _ResponseSent, + ResponseReceived, + InformationalResponseReceived, + ), + ) + is_push_promise = isinstance( + events[0], (PushedStreamReceived, _PushedRequestSent), + ) + + return HeaderValidationFlags( + is_client=self.state_machine.client or False, + is_trailer=is_trailer, + is_response_header=is_response_header, + is_push_promise=is_push_promise, + ) + + def _build_headers_frames(self, + headers: Iterable[Header], + encoder: Encoder, + first_frame: HeadersFrame | PushPromiseFrame, + hdr_validation_flags: HeaderValidationFlags) \ + -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]: + """ + Helper method to build headers or push promise frames. + """ + # We need to lowercase the header names, and to ensure that secure + # header fields are kept out of compression contexts. + if self.config.normalize_outbound_headers: + # also we may want to split outbound cookies to improve + # headers compression + should_split_outbound_cookies = self.config.split_outbound_cookies + + headers = normalize_outbound_headers( + headers, hdr_validation_flags, should_split_outbound_cookies, + ) + if self.config.validate_outbound_headers: + headers = validate_outbound_headers( + headers, hdr_validation_flags, + ) + + encoded_headers = encoder.encode(headers) + + # Slice into blocks of max_outbound_frame_size. Be careful with this: + # it only works right because we never send padded frames or priority + # information on the frames. Revisit this if we do. + header_blocks = [ + encoded_headers[i:i+(self.max_outbound_frame_size or 0)] + for i in range( + 0, len(encoded_headers), (self.max_outbound_frame_size or 0), + ) + ] + + frames: list[HeadersFrame | ContinuationFrame | PushPromiseFrame] = [] + first_frame.data = header_blocks[0] + frames.append(first_frame) + + for block in header_blocks[1:]: + cf = ContinuationFrame(self.stream_id) + cf.data = block + frames.append(cf) + + frames[-1].flags.add("END_HEADERS") + return frames + + def _process_received_headers(self, + headers: Iterable[Header], + header_validation_flags: HeaderValidationFlags, + header_encoding: bool | str | None) -> Iterable[Header]: + """ + When headers have been received from the remote peer, run a processing + pipeline on them to transform them into the appropriate form for + attaching to an event. + """ + if self.config.normalize_inbound_headers: + headers = normalize_inbound_headers( + headers, header_validation_flags, + ) + + if self.config.validate_inbound_headers: + headers = validate_headers(headers, header_validation_flags) + + if isinstance(header_encoding, str): + headers = _decode_headers(headers, header_encoding) + + # The above steps are all generators, so we need to concretize the + # headers now. + return list(headers) + + def _initialize_content_length(self, headers: Iterable[Header]) -> None: + """ + Checks the headers for a content-length header and initializes the + _expected_content_length field from it. It's not an error for no + Content-Length header to be present. + """ + if self.request_method == b"HEAD": + self._expected_content_length = 0 + return + + for n, v in headers: + if n == b"content-length": + try: + self._expected_content_length = int(v, 10) + except ValueError as err: + msg = f"Invalid content-length header: {v!r}" + raise ProtocolError(msg) from err + + return + + def _track_content_length(self, length: int, end_stream: bool) -> None: + """ + Update the expected content length in response to data being received. + Validates that the appropriate amount of data is sent. Always updates + the received data, but only validates the length against the + content-length header if one was sent. + + :param length: The length of the body chunk received. + :param end_stream: If this is the last body chunk received. + """ + self._actual_content_length += length + actual = self._actual_content_length + expected = self._expected_content_length + + if expected is not None: + if expected < actual: + raise InvalidBodyLengthError(expected, actual) + + if end_stream and expected != actual: + raise InvalidBodyLengthError(expected, actual) + + def _inbound_flow_control_change_from_settings(self, delta: int) -> None: + """ + We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to + update the target window size for flow control. For our flow control + strategy, this means we need to do two things: we need to adjust the + current window size, but we also need to set the target maximum window + size to the new value. + """ + new_max_size = self._inbound_window_manager.max_window_size + delta + self._inbound_window_manager.window_opened(delta) + self._inbound_window_manager.max_window_size = new_max_size + + +def _decode_headers(headers: Iterable[HeaderWeaklyTyped], encoding: str) -> Generator[HeaderTuple, None, None]: + """ + Given an iterable of header two-tuples and an encoding, decodes those + headers using that encoding while preserving the type of the header tuple. + This ensures that the use of ``HeaderTuple`` is preserved. + """ + for header in headers: + # This function expects to work on decoded headers, which are always + # HeaderTuple objects. + assert isinstance(header, HeaderTuple) + + name, value = header + assert isinstance(name, bytes) + assert isinstance(value, bytes) + + n = name.decode(encoding) + v = value.decode(encoding) + yield header.__class__(n, v) diff --git a/.venv/lib/python3.12/site-packages/h2/utilities.py b/.venv/lib/python3.12/site-packages/h2/utilities.py new file mode 100644 index 00000000..8cafdbd5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/utilities.py @@ -0,0 +1,696 @@ +""" +h2/utilities +~~~~~~~~~~~~ + +Utility functions that do not belong in a separate module. +""" +from __future__ import annotations + +import collections +import re +from string import whitespace +from typing import TYPE_CHECKING, Any, NamedTuple + +from hpack.struct import HeaderTuple, NeverIndexedHeaderTuple + +from .exceptions import FlowControlError, ProtocolError + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Generator, Iterable + + from hpack.struct import Header, HeaderWeaklyTyped + +UPPER_RE = re.compile(b"[A-Z]") +SIGIL = ord(b":") +INFORMATIONAL_START = ord(b"1") + + +# A set of headers that are hop-by-hop or connection-specific and thus +# forbidden in HTTP/2. This list comes from RFC 7540 § 8.1.2.2. +CONNECTION_HEADERS = frozenset([ + b"connection", + b"proxy-connection", + b"keep-alive", + b"transfer-encoding", + b"upgrade", +]) + + +_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([ + b":method", + b":scheme", + b":authority", + b":path", + b":status", + b":protocol", +]) + + +_SECURE_HEADERS = frozenset([ + # May have basic credentials which are vulnerable to dictionary attacks. + b"authorization", + b"proxy-authorization", +]) + + +_REQUEST_ONLY_HEADERS = frozenset([ + b":scheme", + b":path", + b":authority", + b":method", + b":protocol", +]) + + +_RESPONSE_ONLY_HEADERS = frozenset([b":status"]) + + +# A Set of pseudo headers that are only valid if the method is +# CONNECT, see RFC 8441 § 5 +_CONNECT_REQUEST_ONLY_HEADERS = frozenset([b":protocol"]) + + +_WHITESPACE = frozenset(map(ord, whitespace)) + + +def _secure_headers(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]: + """ + Certain headers are at risk of being attacked during the header compression + phase, and so need to be kept out of header compression contexts. This + function automatically transforms certain specific headers into HPACK + never-indexed fields to ensure they don't get added to header compression + contexts. + + This function currently implements two rules: + + - 'authorization' and 'proxy-authorization' fields are automatically made + never-indexed. + - Any 'cookie' header field shorter than 20 bytes long is made + never-indexed. + + These fields are the most at-risk. These rules are inspired by Firefox + and nghttp2. + """ + for header in headers: + assert isinstance(header[0], bytes) + if header[0] in _SECURE_HEADERS or (header[0] in b"cookie" and len(header[1]) < 20): + yield NeverIndexedHeaderTuple(header[0], header[1]) + else: + yield header + + +def extract_method_header(headers: Iterable[Header]) -> bytes | None: + """ + Extracts the request method from the headers list. + """ + for k, v in headers: + if isinstance(v, bytes) and k == b":method": + return v + if isinstance(v, str) and k == ":method": + return v.encode("utf-8") # pragma: no cover + return None + + +def is_informational_response(headers: Iterable[Header]) -> bool: + """ + Searches headers list for a :status header to confirm that a given + collection of headers are an informational response. Assumes the header + are well formed and encoded as bytes: that is, that the HTTP/2 special + headers are first in the block, and so that it can stop looking when it + finds the first header field whose name does not begin with a colon. + + :param headers: The HTTP/2 headers. + :returns: A boolean indicating if this is an informational response. + """ + for n, v in headers: + if not n.startswith(b":"): + return False + if n != b":status": + # If we find a non-special header, we're done here: stop looping. + continue + # If the first digit is a 1, we've got informational headers. + return v.startswith(b"1") + return False + + +def guard_increment_window(current: int, increment: int) -> int: + """ + Increments a flow control window, guarding against that window becoming too + large. + + :param current: The current value of the flow control window. + :param increment: The increment to apply to that window. + :returns: The new value of the window. + :raises: ``FlowControlError`` + """ + # The largest value the flow control window may take. + LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1 # noqa: N806 + + new_size = current + increment + + if new_size > LARGEST_FLOW_CONTROL_WINDOW: + msg = f"May not increment flow control window past {LARGEST_FLOW_CONTROL_WINDOW}" + raise FlowControlError(msg) + + return new_size + + +def authority_from_headers(headers: Iterable[Header]) -> bytes | None: + """ + Given a header set, searches for the authority header and returns the + value. + + Note that this doesn't use indexing, so should only be called if the + headers are for a client request. Otherwise, will loop over the entire + header set, which is potentially unwise. + + :param headers: The HTTP header set. + :returns: The value of the authority header, or ``None``. + :rtype: ``bytes`` or ``None``. + """ + for n, v in headers: + if n == b":authority": + return v + + return None + + +# Flags used by the validate_headers pipeline to determine which checks +# should be applied to a given set of headers. +class HeaderValidationFlags(NamedTuple): + is_client: bool + is_trailer: bool + is_response_header: bool + is_push_promise: bool + + +def validate_headers(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Iterable[Header]: + """ + Validates a header sequence against a set of constraints from RFC 7540. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + """ + # This validation logic is built on a sequence of generators that are + # iterated over to provide the final header list. This reduces some of the + # overhead of doing this checking. However, it's worth noting that this + # checking remains somewhat expensive, and attempts should be made wherever + # possible to reduce the time spent doing them. + # + # For example, we avoid tuple unpacking in loops because it represents a + # fixed cost that we don't want to spend, instead indexing into the header + # tuples. + headers = _reject_empty_header_names( + headers, hdr_validation_flags, + ) + headers = _reject_uppercase_header_fields( + headers, hdr_validation_flags, + ) + headers = _reject_surrounding_whitespace( + headers, hdr_validation_flags, + ) + headers = _reject_te( + headers, hdr_validation_flags, + ) + headers = _reject_connection_header( + headers, hdr_validation_flags, + ) + headers = _reject_pseudo_header_fields( + headers, hdr_validation_flags, + ) + headers = _check_host_authority_header( + headers, hdr_validation_flags, + ) + return _check_path_header(headers, hdr_validation_flags) + + + +def _reject_empty_header_names(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if any header names are empty (length 0). + While hpack decodes such headers without errors, they are semantically + forbidden in HTTP, see RFC 7230, stating that they must be at least one + character long. + """ + for header in headers: + if len(header[0]) == 0: + msg = "Received header name with zero length." + raise ProtocolError(msg) + yield header + + +def _reject_uppercase_header_fields(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if any uppercase character is found in a header + block. + """ + for header in headers: + if UPPER_RE.search(header[0]): + msg = f"Received uppercase header name {header[0]!r}." + raise ProtocolError(msg) + yield header + + +def _reject_surrounding_whitespace(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if any header name or value is surrounded by + whitespace characters. + """ + # For compatibility with RFC 7230 header fields, we need to allow the field + # value to be an empty string. This is ludicrous, but technically allowed. + # The field name may not be empty, though, so we can safely assume that it + # must have at least one character in it and throw exceptions if it + # doesn't. + for header in headers: + if header[0][0] in _WHITESPACE or header[0][-1] in _WHITESPACE: + msg = f"Received header name surrounded by whitespace {header[0]!r}" + raise ProtocolError(msg) + if header[1] and ((header[1][0] in _WHITESPACE) or + (header[1][-1] in _WHITESPACE)): + msg = f"Received header value surrounded by whitespace {header[1]!r}" + raise ProtocolError(msg) + yield header + + +def _reject_te(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if the TE header is present in a header block and + its value is anything other than "trailers". + """ + for header in headers: + if header[0] == b"te" and header[1].lower() != b"trailers": + msg = f"Invalid value for TE header: {header[1]!r}" + raise ProtocolError(msg) + + yield header + + +def _reject_connection_header(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if the Connection header is present in a header + block. + """ + for header in headers: + if header[0] in CONNECTION_HEADERS: + msg = f"Connection-specific header field present: {header[0]!r}." + raise ProtocolError(msg) + + yield header + + +def _assert_header_in_set(bytes_header: bytes, + header_set: set[bytes | str] | set[bytes] | set[str]) -> None: + """ + Given a set of header names, checks whether the string or byte version of + the header name is present. Raises a Protocol error with the appropriate + error if it's missing. + """ + if bytes_header not in header_set: + msg = f"Header block missing mandatory {bytes_header!r} header" + raise ProtocolError(msg) + + +def _reject_pseudo_header_fields(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if duplicate pseudo-header fields are found in a + header block or if a pseudo-header field appears in a block after an + ordinary header field. + + Raises a ProtocolError if pseudo-header fields are found in trailers. + """ + seen_pseudo_header_fields = set() + seen_regular_header = False + method = None + + for header in headers: + if header[0][0] == SIGIL: + if header[0] in seen_pseudo_header_fields: + msg = f"Received duplicate pseudo-header field {header[0]!r}" + raise ProtocolError(msg) + + seen_pseudo_header_fields.add(header[0]) + + if seen_regular_header: + msg = f"Received pseudo-header field out of sequence: {header[0]!r}" + raise ProtocolError(msg) + + if header[0] not in _ALLOWED_PSEUDO_HEADER_FIELDS: + msg = f"Received custom pseudo-header field {header[0]!r}" + raise ProtocolError(msg) + + if header[0] in b":method": + method = header[1] + + else: + seen_regular_header = True + + yield header + + # Check the pseudo-headers we got to confirm they're acceptable. + _check_pseudo_header_field_acceptability( + seen_pseudo_header_fields, method, hdr_validation_flags, + ) + + +def _check_pseudo_header_field_acceptability(pseudo_headers: set[bytes | str] | set[bytes] | set[str], + method: bytes | None, + hdr_validation_flags: HeaderValidationFlags) -> None: + """ + Given the set of pseudo-headers present in a header block and the + validation flags, confirms that RFC 7540 allows them. + """ + # Pseudo-header fields MUST NOT appear in trailers - RFC 7540 § 8.1.2.1 + if hdr_validation_flags.is_trailer and pseudo_headers: + msg = f"Received pseudo-header in trailer {pseudo_headers}" + raise ProtocolError(msg) + + # If ':status' pseudo-header is not there in a response header, reject it. + # Similarly, if ':path', ':method', or ':scheme' are not there in a request + # header, reject it. Additionally, if a response contains any request-only + # headers or vice-versa, reject it. + # Relevant RFC section: RFC 7540 § 8.1.2.4 + # https://tools.ietf.org/html/rfc7540#section-8.1.2.4 + if hdr_validation_flags.is_response_header: + _assert_header_in_set(b":status", pseudo_headers) + invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS + if invalid_response_headers: + msg = f"Encountered request-only headers {invalid_response_headers}" + raise ProtocolError(msg) + elif (not hdr_validation_flags.is_response_header and + not hdr_validation_flags.is_trailer): + # This is a request, so we need to have seen :path, :method, and + # :scheme. + _assert_header_in_set(b":path", pseudo_headers) + _assert_header_in_set(b":method", pseudo_headers) + _assert_header_in_set(b":scheme", pseudo_headers) + invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS + if invalid_request_headers: + msg = f"Encountered response-only headers {invalid_request_headers}" + raise ProtocolError(msg) + if method != b"CONNECT": + invalid_headers = pseudo_headers & _CONNECT_REQUEST_ONLY_HEADERS + if invalid_headers: + msg = f"Encountered connect-request-only headers {invalid_headers!r}" + raise ProtocolError(msg) + + +def _validate_host_authority_header(headers: Iterable[Header]) -> Generator[Header, None, None]: + """ + Given the :authority and Host headers from a request block that isn't + a trailer, check that: + 1. At least one of these headers is set. + 2. If both headers are set, they match. + + :param headers: The HTTP header set. + :raises: ``ProtocolError`` + """ + # We use None as a sentinel value. Iterate over the list of headers, + # and record the value of these headers (if present). We don't need + # to worry about receiving duplicate :authority headers, as this is + # enforced by the _reject_pseudo_header_fields() pipeline. + # + # TODO: We should also guard against receiving duplicate Host headers, + # and against sending duplicate headers. + authority_header_val = None + host_header_val = None + + for header in headers: + if header[0] == b":authority": + authority_header_val = header[1] + elif header[0] == b"host": + host_header_val = header[1] + + yield header + + # If we have not-None values for these variables, then we know we saw + # the corresponding header. + authority_present = (authority_header_val is not None) + host_present = (host_header_val is not None) + + # It is an error for a request header block to contain neither + # an :authority header nor a Host header. + if not authority_present and not host_present: + msg = "Request header block does not have an :authority or Host header." + raise ProtocolError(msg) + + # If we receive both headers, they should definitely match. + if authority_present and host_present and authority_header_val != host_header_val: + msg = ( + "Request header block has mismatched :authority and " + f"Host headers: {authority_header_val!r} / {host_header_val!r}" + ) + raise ProtocolError(msg) + + +def _check_host_authority_header(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises a ProtocolError if a header block arrives that does not contain an + :authority or a Host header, or if a header block contains both fields, + but their values do not match. + """ + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return (h for h in headers) + + return _validate_host_authority_header(headers) + + +def _check_path_header(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raise a ProtocolError if a header block arrives or is sent that contains an + empty :path header. + """ + def inner() -> Generator[Header, None, None]: + for header in headers: + if header[0] == b":path" and not header[1]: + msg = "An empty :path header is forbidden" + raise ProtocolError(msg) + + yield header + + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return (h for h in headers) + return inner() + + +def _to_bytes(v: bytes | str) -> bytes: + """ + Given an assumed `str` (or anything that supports `.encode()`), + encodes it using utf-8 into bytes. Returns the unmodified object + if it is already a `bytes` object. + """ + return v if isinstance(v, bytes) else v.encode("utf-8") + + +def utf8_encode_headers(headers: Iterable[HeaderWeaklyTyped]) -> list[Header]: + """ + Given an iterable of header two-tuples, rebuilds that as a list with the + header names and values encoded as utf-8 bytes. This function produces + tuples that preserve the original type of the header tuple for tuple and + any ``HeaderTuple``. + """ + encoded_headers: list[Header] = [] + for header in headers: + h = (_to_bytes(header[0]), _to_bytes(header[1])) + if isinstance(header, HeaderTuple): + encoded_headers.append(header.__class__(h[0], h[1])) + else: + encoded_headers.append(h) + return encoded_headers + + +def _lowercase_header_names(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]: + """ + Given an iterable of header two-tuples, rebuilds that iterable with the + header names lowercased. This generator produces tuples that preserve the + original type of the header tuple for tuple and any ``HeaderTuple``. + """ + for header in headers: + if isinstance(header, HeaderTuple): + yield header.__class__(header[0].lower(), header[1]) + else: + yield (header[0].lower(), header[1]) + + +def _strip_surrounding_whitespace(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]: + """ + Given an iterable of header two-tuples, strip both leading and trailing + whitespace from both header names and header values. This generator + produces tuples that preserve the original type of the header tuple for + tuple and any ``HeaderTuple``. + """ + for header in headers: + if isinstance(header, HeaderTuple): + yield header.__class__(header[0].strip(), header[1].strip()) + else: + yield (header[0].strip(), header[1].strip()) + + +def _strip_connection_headers(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]: + """ + Strip any connection headers as per RFC7540 § 8.1.2.2. + """ + for header in headers: + if header[0] not in CONNECTION_HEADERS: + yield header + + +def _check_sent_host_authority_header(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Raises an InvalidHeaderBlockError if we try to send a header block + that does not contain an :authority or a Host header, or if + the header block contains both fields, but their values do not match. + """ + # We only expect to see :authority and Host headers on request header + # blocks that aren't trailers, so skip this validation if this is a + # response header or we're looking at trailer blocks. + skip_validation = ( + hdr_validation_flags.is_response_header or + hdr_validation_flags.is_trailer + ) + if skip_validation: + return (h for h in headers) + + return _validate_host_authority_header(headers) + + +def _combine_cookie_fields(headers: Iterable[Header], hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + RFC 7540 § 8.1.2.5 allows HTTP/2 clients to split the Cookie header field, + which must normally appear only once, into multiple fields for better + compression. However, they MUST be joined back up again when received. + This normalization step applies that transform. The side-effect is that + all cookie fields now appear *last* in the header block. + """ + # There is a problem here about header indexing. Specifically, it's + # possible that all these cookies are sent with different header indexing + # values. At this point it shouldn't matter too much, so we apply our own + # logic and make them never-indexed. + cookies: list[bytes] = [] + for header in headers: + if header[0] == b"cookie": + cookies.append(header[1]) + else: + yield header + if cookies: + cookie_val = b"; ".join(cookies) + yield NeverIndexedHeaderTuple(b"cookie", cookie_val) + + +def _split_outbound_cookie_fields(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None) -> Generator[Header, None, None]: + """ + RFC 7540 § 8.1.2.5 allows for better compression efficiency, + to split the Cookie header field into separate header fields + + We want to do it for outbound requests, as we are doing for + inbound. + """ + for header in headers: + assert isinstance(header[0], bytes) + assert isinstance(header[1], bytes) + if header[0] == b"cookie": + for cookie_val in header[1].split(b"; "): + if isinstance(header, HeaderTuple): + yield header.__class__(header[0], cookie_val) + else: + yield header[0], cookie_val + else: + yield header + + +def normalize_outbound_headers(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags | None, + should_split_outbound_cookies: bool=False) -> Generator[Header, None, None]: + """ + Normalizes a header sequence that we are about to send. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + :param should_split_outbound_cookies: boolean flag + """ + headers = _lowercase_header_names(headers, hdr_validation_flags) + if should_split_outbound_cookies: + headers = _split_outbound_cookie_fields(headers, hdr_validation_flags) + headers = _strip_surrounding_whitespace(headers, hdr_validation_flags) + headers = _strip_connection_headers(headers, hdr_validation_flags) + return _secure_headers(headers, hdr_validation_flags) + + + +def normalize_inbound_headers(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Normalizes a header sequence that we have received. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags + """ + return _combine_cookie_fields(headers, hdr_validation_flags) + + +def validate_outbound_headers(headers: Iterable[Header], + hdr_validation_flags: HeaderValidationFlags) -> Generator[Header, None, None]: + """ + Validates and normalizes a header sequence that we are about to send. + + :param headers: The HTTP header set. + :param hdr_validation_flags: An instance of HeaderValidationFlags. + """ + headers = _reject_te( + headers, hdr_validation_flags, + ) + headers = _reject_connection_header( + headers, hdr_validation_flags, + ) + headers = _reject_pseudo_header_fields( + headers, hdr_validation_flags, + ) + headers = _check_sent_host_authority_header( + headers, hdr_validation_flags, + ) + return _check_path_header(headers, hdr_validation_flags) + + + +class SizeLimitDict(collections.OrderedDict[int, Any]): + + def __init__(self, *args: dict[int, int], **kwargs: Any) -> None: + self._size_limit = kwargs.pop("size_limit", None) + super().__init__(*args, **kwargs) + + self._check_size_limit() + + def __setitem__(self, key: int, value: Any | int) -> None: + super().__setitem__(key, value) + + self._check_size_limit() + + def _check_size_limit(self) -> None: + if self._size_limit is not None: + while len(self) > self._size_limit: + self.popitem(last=False) diff --git a/.venv/lib/python3.12/site-packages/h2/windows.py b/.venv/lib/python3.12/site-packages/h2/windows.py new file mode 100644 index 00000000..0efdd9fe --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/windows.py @@ -0,0 +1,133 @@ +""" +h2/windows +~~~~~~~~~~ + +Defines tools for managing HTTP/2 flow control windows. + +The objects defined in this module are used to automatically manage HTTP/2 +flow control windows. Specifically, they keep track of what the size of the +window is, how much data has been consumed from that window, and how much data +the user has already used. It then implements a basic algorithm that attempts +to manage the flow control window without user input, trying to ensure that it +does not emit too many WINDOW_UPDATE frames. +""" +from __future__ import annotations + +from .exceptions import FlowControlError + +# The largest acceptable value for a HTTP/2 flow control window. +LARGEST_FLOW_CONTROL_WINDOW = 2**31 - 1 + + +class WindowManager: + """ + A basic HTTP/2 window manager. + + :param max_window_size: The maximum size of the flow control window. + :type max_window_size: ``int`` + """ + + def __init__(self, max_window_size: int) -> None: + assert max_window_size <= LARGEST_FLOW_CONTROL_WINDOW + self.max_window_size = max_window_size + self.current_window_size = max_window_size + self._bytes_processed = 0 + + def window_consumed(self, size: int) -> None: + """ + We have received a certain number of bytes from the remote peer. This + necessarily shrinks the flow control window! + + :param size: The number of flow controlled bytes we received from the + remote peer. + :type size: ``int`` + :returns: Nothing. + :rtype: ``None`` + """ + self.current_window_size -= size + if self.current_window_size < 0: + msg = "Flow control window shrunk below 0" + raise FlowControlError(msg) + + def window_opened(self, size: int) -> None: + """ + The flow control window has been incremented, either because of manual + flow control management or because of the user changing the flow + control settings. This can have the effect of increasing what we + consider to be the "maximum" flow control window size. + + This does not increase our view of how many bytes have been processed, + only of how much space is in the window. + + :param size: The increment to the flow control window we received. + :type size: ``int`` + :returns: Nothing + :rtype: ``None`` + """ + self.current_window_size += size + + if self.current_window_size > LARGEST_FLOW_CONTROL_WINDOW: + msg = f"Flow control window mustn't exceed {LARGEST_FLOW_CONTROL_WINDOW}" + raise FlowControlError(msg) + + self.max_window_size = max(self.current_window_size, self.max_window_size) + + def process_bytes(self, size: int) -> int | None: + """ + The application has informed us that it has processed a certain number + of bytes. This may cause us to want to emit a window update frame. If + we do want to emit a window update frame, this method will return the + number of bytes that we should increment the window by. + + :param size: The number of flow controlled bytes that the application + has processed. + :type size: ``int`` + :returns: The number of bytes to increment the flow control window by, + or ``None``. + :rtype: ``int`` or ``None`` + """ + self._bytes_processed += size + return self._maybe_update_window() + + def _maybe_update_window(self) -> int | None: + """ + Run the algorithm. + + Our current algorithm can be described like this. + + 1. If no bytes have been processed, we immediately return 0. There is + no meaningful way for us to hand space in the window back to the + remote peer, so let's not even try. + 2. If there is no space in the flow control window, and we have + processed at least 1024 bytes (or 1/4 of the window, if the window + is smaller), we will emit a window update frame. This is to avoid + the risk of blocking a stream altogether. + 3. If there is space in the flow control window, and we have processed + at least 1/2 of the window worth of bytes, we will emit a window + update frame. This is to minimise the number of window update frames + we have to emit. + + In a healthy system with large flow control windows, this will + irregularly emit WINDOW_UPDATE frames. This prevents us starving the + connection by emitting eleventy bajillion WINDOW_UPDATE frames, + especially in situations where the remote peer is sending a lot of very + small DATA frames. + """ + # TODO: Can the window be smaller than 1024 bytes? If not, we can + # streamline this algorithm. + if not self._bytes_processed: + return None + + max_increment = (self.max_window_size - self.current_window_size) + increment = 0 + + # Note that, even though we may increment less than _bytes_processed, + # we still want to set it to zero whenever we emit an increment. This + # is because we'll always increment up to the maximum we can. + if ((self.current_window_size == 0) and ( + self._bytes_processed > min(1024, self.max_window_size // 4))) or self._bytes_processed >= (self.max_window_size // 2): + increment = min(self._bytes_processed, max_increment) + self._bytes_processed = 0 + + self.current_window_size += increment + return increment |