From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .venv/lib/python3.12/site-packages/h2/stream.py | 1417 +++++++++++++++++++++++ 1 file changed, 1417 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/h2/stream.py (limited to '.venv/lib/python3.12/site-packages/h2/stream.py') diff --git a/.venv/lib/python3.12/site-packages/h2/stream.py b/.venv/lib/python3.12/site-packages/h2/stream.py new file mode 100644 index 00000000..7d4a12e3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/h2/stream.py @@ -0,0 +1,1417 @@ +""" +h2/stream +~~~~~~~~~ + +An implementation of a HTTP/2 stream. +""" +from __future__ import annotations + +from enum import Enum, IntEnum +from typing import TYPE_CHECKING, Any + +from hpack import HeaderTuple +from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame + +from .errors import ErrorCodes, _error_code_from_int +from .events import ( + AlternativeServiceAvailable, + DataReceived, + Event, + InformationalResponseReceived, + PushedStreamReceived, + RequestReceived, + ResponseReceived, + StreamEnded, + StreamReset, + TrailersReceived, + WindowUpdated, + _PushedRequestSent, + _RequestSent, + _ResponseSent, + _TrailersSent, +) +from .exceptions import FlowControlError, InvalidBodyLengthError, ProtocolError, StreamClosedError +from .utilities import ( + HeaderValidationFlags, + authority_from_headers, + extract_method_header, + guard_increment_window, + is_informational_response, + normalize_inbound_headers, + normalize_outbound_headers, + utf8_encode_headers, + validate_headers, + validate_outbound_headers, +) +from .windows import WindowManager + +if TYPE_CHECKING: # pragma: no cover + from collections.abc import Generator, Iterable + + from hpack.hpack import Encoder + from hpack.struct import Header, HeaderWeaklyTyped + + from .config import H2Configuration + + +class StreamState(IntEnum): + IDLE = 0 + RESERVED_REMOTE = 1 + RESERVED_LOCAL = 2 + OPEN = 3 + HALF_CLOSED_REMOTE = 4 + HALF_CLOSED_LOCAL = 5 + CLOSED = 6 + + +class StreamInputs(Enum): + SEND_HEADERS = 0 + SEND_PUSH_PROMISE = 1 + SEND_RST_STREAM = 2 + SEND_DATA = 3 + SEND_WINDOW_UPDATE = 4 + SEND_END_STREAM = 5 + RECV_HEADERS = 6 + RECV_PUSH_PROMISE = 7 + RECV_RST_STREAM = 8 + RECV_DATA = 9 + RECV_WINDOW_UPDATE = 10 + RECV_END_STREAM = 11 + RECV_CONTINUATION = 12 # Added in 2.0.0 + SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0 + RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0 + SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0 + RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0 + UPGRADE_CLIENT = 17 # Added 2.3.0 + UPGRADE_SERVER = 18 # Added 2.3.0 + + +class StreamClosedBy(Enum): + SEND_END_STREAM = 0 + RECV_END_STREAM = 1 + SEND_RST_STREAM = 2 + RECV_RST_STREAM = 3 + + +# This array is initialized once, and is indexed by the stream states above. +# It indicates whether a stream in the given state is open. The reason we do +# this is that we potentially check whether a stream in a given state is open +# quite frequently: given that we check so often, we should do so in the +# fastest and most performant way possible. +STREAM_OPEN = [False for _ in range(len(StreamState))] +STREAM_OPEN[StreamState.OPEN] = True +STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True +STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True + + +class H2StreamStateMachine: + """ + A single HTTP/2 stream state machine. + + This stream object implements basically the state machine described in + RFC 7540 section 5.1. + + :param stream_id: The stream ID of this stream. This is stored primarily + for logging purposes. + """ + + def __init__(self, stream_id: int) -> None: + self.state = StreamState.IDLE + self.stream_id = stream_id + + #: Whether this peer is the client side of this stream. + self.client: bool | None = None + + # Whether trailers have been sent/received on this stream or not. + self.headers_sent: bool | None = None + self.trailers_sent: bool | None = None + self.headers_received: bool | None = None + self.trailers_received: bool | None = None + + # How the stream was closed. One of StreamClosedBy. + self.stream_closed_by: StreamClosedBy | None = None + + def process_input(self, input_: StreamInputs) -> Any: + """ + Process a specific input in the state machine. + """ + if not isinstance(input_, StreamInputs): + msg = "Input must be an instance of StreamInputs" + raise ValueError(msg) # noqa: TRY004 + + try: + func, target_state = _transitions[(self.state, input_)] + except KeyError as err: + old_state = self.state + self.state = StreamState.CLOSED + msg = f"Invalid input {input_} in state {old_state}" + raise ProtocolError(msg) from err + else: + previous_state = self.state + self.state = target_state + if func is not None: + try: + return func(self, previous_state) + except ProtocolError: + self.state = StreamState.CLOSED + raise + except AssertionError as err: # pragma: no cover + self.state = StreamState.CLOSED + raise ProtocolError(err) from err + + return [] + + def request_sent(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a request is sent. + """ + self.client = True + self.headers_sent = True + event = _RequestSent() + + return [event] + + def response_sent(self, previous_state: StreamState) -> list[Event]: + """ + Fires when something that should be a response is sent. This 'response' + may actually be trailers. + """ + if not self.headers_sent: + if self.client is True or self.client is None: + msg = "Client cannot send responses." + raise ProtocolError(msg) + self.headers_sent = True + return [_ResponseSent()] + assert not self.trailers_sent + self.trailers_sent = True + return [_TrailersSent()] + + def request_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a request is received. + """ + assert not self.headers_received + assert not self.trailers_received + + self.client = False + self.headers_received = True + event = RequestReceived() + event.stream_id = self.stream_id + return [event] + + def response_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a response is received. Also disambiguates between responses + and trailers. + """ + event: ResponseReceived | TrailersReceived + if not self.headers_received: + assert self.client is True + self.headers_received = True + event = ResponseReceived() + else: + assert not self.trailers_received + self.trailers_received = True + event = TrailersReceived() + + event.stream_id = self.stream_id + return [event] + + def data_received(self, previous_state: StreamState) -> list[Event]: + """ + Fires when data is received. + """ + if not self.headers_received: + msg = "cannot receive data before headers" + raise ProtocolError(msg) + event = DataReceived() + event.stream_id = self.stream_id + return [event] + + def window_updated(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a window update frame is received. + """ + event = WindowUpdated() + event.stream_id = self.stream_id + return [event] + + def stream_half_closed(self, previous_state: StreamState) -> list[Event]: + """ + Fires when an END_STREAM flag is received in the OPEN state, + transitioning this stream to a HALF_CLOSED_REMOTE state. + """ + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_ended(self, previous_state: StreamState) -> list[Event]: + """ + Fires when a stream is cleanly ended. + """ + self.stream_closed_by = StreamClosedBy.RECV_END_STREAM + event = StreamEnded() + event.stream_id = self.stream_id + return [event] + + def stream_reset(self, previous_state: StreamState) -> list[Event]: + """ + Fired when a stream is forcefully reset. + """ + self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM + event = StreamReset() + event.stream_id = self.stream_id + return [event] + + def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the newly pushed stream, when pushed by the local peer. + + No event here, but definitionally this peer must be a server. + """ + assert self.client is None + self.client = False + self.headers_received = True + return [] + + def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the newly pushed stream, when pushed by the remote peer. + + No event here, but definitionally this peer must be a client. + """ + assert self.client is None + self.client = True + self.headers_sent = True + return [] + + def send_push_promise(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is sent. + We may only send PUSH_PROMISE frames if we're a server. + """ + if self.client is True: + msg = "Cannot push streams from client peers." + raise ProtocolError(msg) + + event = _PushedRequestSent() + return [event] + + def recv_push_promise(self, previous_state: StreamState) -> list[Event]: + """ + Fires on the already-existing stream when a PUSH_PROMISE frame is + received. We may only receive PUSH_PROMISE frames if we're a client. + + Fires a PushedStreamReceived event. + """ + if not self.client: + if self.client is None: # pragma: no cover + msg = "Idle streams cannot receive pushes" + else: # pragma: no cover + msg = "Cannot receive pushed streams as a server" + raise ProtocolError(msg) + + event = PushedStreamReceived() + event.parent_stream_id = self.stream_id + return [event] + + def send_end_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send END_STREAM in the + HALF_CLOSED_REMOTE state. + """ + self.stream_closed_by = StreamClosedBy.SEND_END_STREAM + + def send_reset_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send RST_STREAM in a non-closed + stream state. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + def reset_stream_on_error(self, previous_state: StreamState) -> None: + """ + Called when we need to forcefully emit another RST_STREAM frame on + behalf of the state machine. + + If this is the first time we've done this, we should also hang an event + off the StreamClosedError so that the user can be informed. We know + it's the first time we've done this if the stream is currently in a + state other than CLOSED. + """ + self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM + + error = StreamClosedError(self.stream_id) + + event = StreamReset() + event.stream_id = self.stream_id + event.error_code = ErrorCodes.STREAM_CLOSED + event.remote_reset = False + error._events = [event] + raise error + + def recv_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an unexpected frame is received on an already-closed + stream. + + An endpoint that receives an unexpected frame should treat it as + a stream error or connection error with type STREAM_CLOSED, depending + on the specific frame. The error handling is done at a higher level: + this just raises the appropriate error. + """ + raise StreamClosedError(self.stream_id) + + def send_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to send data on an already-closed + stream. + + This essentially overrides the standard logic by throwing a + more-specific error: StreamClosedError. This is a ProtocolError, so it + matches the standard API of the state machine, but provides more detail + to the user. + """ + raise StreamClosedError(self.stream_id) + + def recv_push_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when a PUSH_PROMISE frame is received on a full stop + stream. + + If the stream was closed by us sending a RST_STREAM frame, then we + presume that the PUSH_PROMISE was in flight when we reset the parent + stream. Rathen than accept the new stream, we just reset it. + Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a + naturally closed stream is a real problem because it creates a brand + new stream that the remote peer now believes exists. + """ + assert self.stream_closed_by is not None + + if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM: + raise StreamClosedError(self.stream_id) + msg = "Attempted to push on closed stream." + raise ProtocolError(msg) + + def send_push_on_closed_stream(self, previous_state: StreamState) -> None: + """ + Called when an attempt is made to push on an already-closed stream. + + This essentially overrides the standard logic by providing a more + useful error message. It's necessary because simply indicating that the + stream is closed is not enough: there is now a new stream that is not + allowed to be there. The only recourse is to tear the whole connection + down. + """ + msg = "Attempted to push on closed stream." + raise ProtocolError(msg) + + def send_informational_response(self, previous_state: StreamState) -> list[Event]: + """ + Called when an informational header block is sent (that is, a block + where the :status header has a 1XX value). + + Only enforces that these are sent *before* final headers are sent. + """ + if self.headers_sent: + msg = "Information response after final response" + raise ProtocolError(msg) + + event = _ResponseSent() + return [event] + + def recv_informational_response(self, previous_state: StreamState) -> list[Event]: + """ + Called when an informational header block is received (that is, a block + where the :status header has a 1XX value). + """ + if self.headers_received: + msg = "Informational response after final response" + raise ProtocolError(msg) + + event = InformationalResponseReceived() + event.stream_id = self.stream_id + return [event] + + def recv_alt_svc(self, previous_state: StreamState) -> list[Event]: + """ + Called when receiving an ALTSVC frame. + + RFC 7838 allows us to receive ALTSVC frames at any stream state, which + is really absurdly overzealous. For that reason, we want to limit the + states in which we can actually receive it. It's really only sensible + to receive it after we've sent our own headers and before the server + has sent its header block: the server can't guarantee that we have any + state around after it completes its header block, and the server + doesn't know what origin we're talking about before we've sent ours. + + For that reason, this function applies a few extra checks on both state + and some of the little state variables we keep around. If those suggest + an unreasonable situation for the ALTSVC frame to have been sent in, + we quietly ignore it (as RFC 7838 suggests). + + This function is also *not* always called by the state machine. In some + states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it, + because we know the frame cannot be valid in that state (IDLE because + the server cannot know what origin the stream applies to, CLOSED + because the server cannot assume we still have state around, + RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL + state then *we* are the server). + """ + # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore + # them. + if self.client is False: + return [] + + # If we've received the response headers from the server they can't + # guarantee we still have any state around. Other implementations + # (like nghttp2) ignore ALTSVC in this state, so we will too. + if self.headers_received: + return [] + + # Otherwise, this is a sensible enough frame to have received. Return + # the event and let it get populated. + return [AlternativeServiceAvailable()] + + def send_alt_svc(self, previous_state: StreamState) -> None: + """ + Called when sending an ALTSVC frame on this stream. + + For consistency with the restrictions we apply on receiving ALTSVC + frames in ``recv_alt_svc``, we want to restrict when users can send + ALTSVC frames to the situations when we ourselves would accept them. + + That means: when we are a server, when we have received the request + headers, and when we have not yet sent our own response headers. + """ + # We should not send ALTSVC after we've sent response headers, as the + # client may have disposed of its state. + if self.headers_sent: + msg = "Cannot send ALTSVC after sending response headers." + raise ProtocolError(msg) + + + +# STATE MACHINE +# +# The stream state machine is defined here to avoid the need to allocate it +# repeatedly for each stream. It cannot be defined in the stream class because +# it needs to be able to reference the callbacks defined on the class, but +# because Python's scoping rules are weird the class object is not actually in +# scope during the body of the class object. +# +# For the sake of clarity, we reproduce the RFC 7540 state machine here: +# +# +--------+ +# send PP | | recv PP +# ,--------| idle |--------. +# / | | \ +# v +--------+ v +# +----------+ | +----------+ +# | | | send H / | | +# ,------| reserved | | recv H | reserved |------. +# | | (local) | | | (remote) | | +# | +----------+ v +----------+ | +# | | +--------+ | | +# | | recv ES | | send ES | | +# | send H | ,-------| open |-------. | recv H | +# | | / | | \ | | +# | v v +--------+ v v | +# | +----------+ | +----------+ | +# | | half | | | half | | +# | | closed | | send R / | closed | | +# | | (remote) | | recv R | (local) | | +# | +----------+ | +----------+ | +# | | | | | +# | | send ES / | recv ES / | | +# | | send R / v send R / | | +# | | recv R +--------+ recv R | | +# | send R / `----------->| |<-----------' send R / | +# | recv R | closed | recv R | +# `----------------------->| |<----------------------' +# +--------+ +# +# send: endpoint sends this frame +# recv: endpoint receives this frame +# +# H: HEADERS frame (with implied CONTINUATIONs) +# PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +# ES: END_STREAM flag +# R: RST_STREAM frame +# +# For the purposes of this state machine we treat HEADERS and their +# associated CONTINUATION frames as a single jumbo frame. The protocol +# allows/requires this by preventing other frames from being interleved in +# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is +# received without a prior HEADERS frame, it *will* be passed to this state +# machine. The state machine should always reject that frame, either as an +# invalid transition or because the stream is closed. +# +# There is a confusing relationship around PUSH_PROMISE frames. The state +# machine above considers them to be frames belonging to the new stream, +# which is *somewhat* true. However, they are sent with the stream ID of +# their related stream, and are only sendable in some cases. +# For this reason, our state machine implementation below allows for +# PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also +# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states. +# Essentially, for h2, PUSH_PROMISE frames are effectively sent on +# two streams. +# +# The _transitions dictionary contains a mapping of tuples of +# (state, input) to tuples of (side_effect_function, end_state). This +# map contains all allowed transitions: anything not in this map is +# invalid and immediately causes a transition to ``closed``. +_transitions = { + # State: idle + (StreamState.IDLE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.request_sent, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.request_received, StreamState.OPEN), + (StreamState.IDLE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_new_pushed_stream, + StreamState.RESERVED_LOCAL), + (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_new_pushed_stream, + StreamState.RESERVED_REMOTE), + (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.IDLE), + (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT): + (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL), + (StreamState.IDLE, StreamInputs.UPGRADE_SERVER): + (H2StreamStateMachine.request_received, + StreamState.HALF_CLOSED_REMOTE), + + # State: reserved local + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL), + (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.RESERVED_LOCAL), + + # State: reserved remote + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE), + (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE), + + # State: open + (StreamState.OPEN, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_DATA): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_END_STREAM): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.OPEN, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_half_closed, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.OPEN), + (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN), + + # State: half-closed remote + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_end_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_promise, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.send_informational_response, + StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE), + (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE), + + # State: half-closed local + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.response_received, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA): + (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM): + (H2StreamStateMachine.stream_ended, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE): + (None, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE): + (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM): + (H2StreamStateMachine.stream_reset, StreamState.CLOSED), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_promise, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS): + (H2StreamStateMachine.recv_informational_response, + StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL), + (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL), + + # State: closed + (StreamState.CLOSED, StreamInputs.RECV_END_STREAM): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE): + (None, StreamState.CLOSED), + + # RFC 7540 Section 5.1 defines how the end point should react when + # receiving a frame on a closed stream with the following statements: + # + # > An endpoint that receives any frame other than PRIORITY after receiving + # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED. + # > An endpoint that receives any frames after receiving a frame with the + # > END_STREAM flag set MUST treat that as a connection error of type + # > STREAM_CLOSED. + (StreamState.CLOSED, StreamInputs.RECV_HEADERS): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_DATA): + (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED), + + # > WINDOW_UPDATE or RST_STREAM frames can be received in this state + # > for a short period after a DATA or HEADERS frame containing a + # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1. But we + # > don't have access to a clock so we just always allow it. + (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE): + (None, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM): + (None, StreamState.CLOSED), + + # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is + # > neither "open" nor "half-closed (local)" as a connection error of type + # > PROTOCOL_ERROR. + (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE): + (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED), + + # Also, users should be forbidden from sending on closed streams. + (StreamState.CLOSED, StreamInputs.SEND_HEADERS): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE): + (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_DATA): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), + (StreamState.CLOSED, StreamInputs.SEND_END_STREAM): + (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED), +} + + +class H2Stream: + """ + A low-level HTTP/2 stream object. This handles building and receiving + frames and maintains per-stream state. + + This wraps a HTTP/2 Stream state machine implementation, ensuring that + frames can only be sent/received when the stream is in a valid state. + Attempts to create frames that cannot be sent will raise a + ``ProtocolError``. + """ + + def __init__(self, + stream_id: int, + config: H2Configuration, + inbound_window_size: int, + outbound_window_size: int) -> None: + self.state_machine = H2StreamStateMachine(stream_id) + self.stream_id = stream_id + self.max_outbound_frame_size: int | None = None + self.request_method: bytes | None = None + + # The current value of the outbound stream flow control window + self.outbound_flow_control_window = outbound_window_size + + # The flow control manager. + self._inbound_window_manager = WindowManager(inbound_window_size) + + # The expected content length, if any. + self._expected_content_length: int | None = None + + # The actual received content length. Always tracked. + self._actual_content_length = 0 + + # The authority we believe this stream belongs to. + self._authority: bytes | None = None + + # The configuration for this stream. + self.config = config + + def __repr__(self) -> str: + return f"<{type(self).__name__} id:{self.stream_id} state:{self.state_machine.state!r}>" + + @property + def inbound_flow_control_window(self) -> int: + """ + The size of the inbound flow control window for the stream. This is + rarely publicly useful: instead, use :meth:`remote_flow_control_window +