Diffstat (limited to '.venv/lib/python3.12/site-packages/h2/stream.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/h2/stream.py  1417
1 file changed, 1417 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/h2/stream.py b/.venv/lib/python3.12/site-packages/h2/stream.py
new file mode 100644
index 00000000..7d4a12e3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/h2/stream.py
@@ -0,0 +1,1417 @@
+"""
+h2/stream
+~~~~~~~~~
+
+An implementation of an HTTP/2 stream.
+"""
+from __future__ import annotations
+
+from enum import Enum, IntEnum
+from typing import TYPE_CHECKING, Any
+
+from hpack import HeaderTuple
+from hyperframe.frame import AltSvcFrame, ContinuationFrame, DataFrame, Frame, HeadersFrame, PushPromiseFrame, RstStreamFrame, WindowUpdateFrame
+
+from .errors import ErrorCodes, _error_code_from_int
+from .events import (
+ AlternativeServiceAvailable,
+ DataReceived,
+ Event,
+ InformationalResponseReceived,
+ PushedStreamReceived,
+ RequestReceived,
+ ResponseReceived,
+ StreamEnded,
+ StreamReset,
+ TrailersReceived,
+ WindowUpdated,
+ _PushedRequestSent,
+ _RequestSent,
+ _ResponseSent,
+ _TrailersSent,
+)
+from .exceptions import FlowControlError, InvalidBodyLengthError, ProtocolError, StreamClosedError
+from .utilities import (
+ HeaderValidationFlags,
+ authority_from_headers,
+ extract_method_header,
+ guard_increment_window,
+ is_informational_response,
+ normalize_inbound_headers,
+ normalize_outbound_headers,
+ utf8_encode_headers,
+ validate_headers,
+ validate_outbound_headers,
+)
+from .windows import WindowManager
+
+if TYPE_CHECKING: # pragma: no cover
+ from collections.abc import Generator, Iterable
+
+ from hpack.hpack import Encoder
+ from hpack.struct import Header, HeaderWeaklyTyped
+
+ from .config import H2Configuration
+
+
+class StreamState(IntEnum):
+ IDLE = 0
+ RESERVED_REMOTE = 1
+ RESERVED_LOCAL = 2
+ OPEN = 3
+ HALF_CLOSED_REMOTE = 4
+ HALF_CLOSED_LOCAL = 5
+ CLOSED = 6
+
+
+class StreamInputs(Enum):
+ SEND_HEADERS = 0
+ SEND_PUSH_PROMISE = 1
+ SEND_RST_STREAM = 2
+ SEND_DATA = 3
+ SEND_WINDOW_UPDATE = 4
+ SEND_END_STREAM = 5
+ RECV_HEADERS = 6
+ RECV_PUSH_PROMISE = 7
+ RECV_RST_STREAM = 8
+ RECV_DATA = 9
+ RECV_WINDOW_UPDATE = 10
+ RECV_END_STREAM = 11
+ RECV_CONTINUATION = 12 # Added in 2.0.0
+ SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0
+ RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0
+ SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0
+ RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0
+    UPGRADE_CLIENT = 17 # Added in 2.3.0
+    UPGRADE_SERVER = 18 # Added in 2.3.0
+
+
+class StreamClosedBy(Enum):
+ SEND_END_STREAM = 0
+ RECV_END_STREAM = 1
+ SEND_RST_STREAM = 2
+ RECV_RST_STREAM = 3
+
+
+# This array is initialized once, and is indexed by the stream states above.
+# It indicates whether a stream in the given state is open. The reason we do
+# this is that we potentially check whether a stream in a given state is open
+# quite frequently: given that we check so often, we want that check to be as
+# fast as possible.
+STREAM_OPEN = [False for _ in range(len(StreamState))]
+STREAM_OPEN[StreamState.OPEN] = True
+STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
+STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
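+# A minimal sketch of the constant-time lookup this table enables (purely
+# illustrative; the public check is exposed as the ``H2Stream.open`` property
+# further down this module):
+#
+#     STREAM_OPEN[StreamState.OPEN]               # True
+#     STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL]  # True
+#     STREAM_OPEN[StreamState.CLOSED]             # False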
+
+
+class H2StreamStateMachine:
+ """
+ A single HTTP/2 stream state machine.
+
+    This stream object essentially implements the state machine described in
+    RFC 7540 section 5.1.
+
+ :param stream_id: The stream ID of this stream. This is stored primarily
+ for logging purposes.
+ """
+
+ def __init__(self, stream_id: int) -> None:
+ self.state = StreamState.IDLE
+ self.stream_id = stream_id
+
+ #: Whether this peer is the client side of this stream.
+ self.client: bool | None = None
+
+ # Whether trailers have been sent/received on this stream or not.
+ self.headers_sent: bool | None = None
+ self.trailers_sent: bool | None = None
+ self.headers_received: bool | None = None
+ self.trailers_received: bool | None = None
+
+ # How the stream was closed. One of StreamClosedBy.
+ self.stream_closed_by: StreamClosedBy | None = None
+
+ def process_input(self, input_: StreamInputs) -> Any:
+ """
+ Process a specific input in the state machine.
+ """
+ if not isinstance(input_, StreamInputs):
+ msg = "Input must be an instance of StreamInputs"
+ raise ValueError(msg) # noqa: TRY004
+
+ try:
+ func, target_state = _transitions[(self.state, input_)]
+ except KeyError as err:
+ old_state = self.state
+ self.state = StreamState.CLOSED
+ msg = f"Invalid input {input_} in state {old_state}"
+ raise ProtocolError(msg) from err
+ else:
+ previous_state = self.state
+ self.state = target_state
+ if func is not None:
+ try:
+ return func(self, previous_state)
+ except ProtocolError:
+ self.state = StreamState.CLOSED
+ raise
+ except AssertionError as err: # pragma: no cover
+ self.state = StreamState.CLOSED
+ raise ProtocolError(err) from err
+
+ return []
+
+ def request_sent(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when a request is sent.
+ """
+ self.client = True
+ self.headers_sent = True
+ event = _RequestSent()
+
+ return [event]
+
+ def response_sent(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when something that should be a response is sent. This 'response'
+ may actually be trailers.
+ """
+ if not self.headers_sent:
+ if self.client is True or self.client is None:
+ msg = "Client cannot send responses."
+ raise ProtocolError(msg)
+ self.headers_sent = True
+ return [_ResponseSent()]
+ assert not self.trailers_sent
+ self.trailers_sent = True
+ return [_TrailersSent()]
+
+ def request_received(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when a request is received.
+ """
+ assert not self.headers_received
+ assert not self.trailers_received
+
+ self.client = False
+ self.headers_received = True
+ event = RequestReceived()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def response_received(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when a response is received. Also disambiguates between responses
+ and trailers.
+ """
+ event: ResponseReceived | TrailersReceived
+ if not self.headers_received:
+ assert self.client is True
+ self.headers_received = True
+ event = ResponseReceived()
+ else:
+ assert not self.trailers_received
+ self.trailers_received = True
+ event = TrailersReceived()
+
+ event.stream_id = self.stream_id
+ return [event]
+
+ def data_received(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when data is received.
+ """
+ if not self.headers_received:
+ msg = "cannot receive data before headers"
+ raise ProtocolError(msg)
+ event = DataReceived()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def window_updated(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when a window update frame is received.
+ """
+ event = WindowUpdated()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_half_closed(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when an END_STREAM flag is received in the OPEN state,
+ transitioning this stream to a HALF_CLOSED_REMOTE state.
+ """
+ event = StreamEnded()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_ended(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires when a stream is cleanly ended.
+ """
+ self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
+ event = StreamEnded()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def stream_reset(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fired when a stream is forcefully reset.
+ """
+ self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
+ event = StreamReset()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def send_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires on the newly pushed stream, when pushed by the local peer.
+
+ No event here, but definitionally this peer must be a server.
+ """
+ assert self.client is None
+ self.client = False
+ self.headers_received = True
+ return []
+
+ def recv_new_pushed_stream(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires on the newly pushed stream, when pushed by the remote peer.
+
+ No event here, but definitionally this peer must be a client.
+ """
+ assert self.client is None
+ self.client = True
+ self.headers_sent = True
+ return []
+
+ def send_push_promise(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
+ We may only send PUSH_PROMISE frames if we're a server.
+ """
+ if self.client is True:
+ msg = "Cannot push streams from client peers."
+ raise ProtocolError(msg)
+
+ event = _PushedRequestSent()
+ return [event]
+
+ def recv_push_promise(self, previous_state: StreamState) -> list[Event]:
+ """
+ Fires on the already-existing stream when a PUSH_PROMISE frame is
+ received. We may only receive PUSH_PROMISE frames if we're a client.
+
+ Fires a PushedStreamReceived event.
+ """
+ if not self.client:
+ if self.client is None: # pragma: no cover
+ msg = "Idle streams cannot receive pushes"
+ else: # pragma: no cover
+ msg = "Cannot receive pushed streams as a server"
+ raise ProtocolError(msg)
+
+ event = PushedStreamReceived()
+ event.parent_stream_id = self.stream_id
+ return [event]
+
+ def send_end_stream(self, previous_state: StreamState) -> None:
+ """
+ Called when an attempt is made to send END_STREAM in the
+ HALF_CLOSED_REMOTE state.
+ """
+ self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
+
+ def send_reset_stream(self, previous_state: StreamState) -> None:
+ """
+ Called when an attempt is made to send RST_STREAM in a non-closed
+ stream state.
+ """
+ self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
+
+ def reset_stream_on_error(self, previous_state: StreamState) -> None:
+ """
+ Called when we need to forcefully emit another RST_STREAM frame on
+ behalf of the state machine.
+
+ If this is the first time we've done this, we should also hang an event
+ off the StreamClosedError so that the user can be informed. We know
+ it's the first time we've done this if the stream is currently in a
+ state other than CLOSED.
+ """
+ self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
+
+ error = StreamClosedError(self.stream_id)
+
+ event = StreamReset()
+ event.stream_id = self.stream_id
+ event.error_code = ErrorCodes.STREAM_CLOSED
+ event.remote_reset = False
+ error._events = [event]
+ raise error
+
+ def recv_on_closed_stream(self, previous_state: StreamState) -> None:
+ """
+ Called when an unexpected frame is received on an already-closed
+ stream.
+
+ An endpoint that receives an unexpected frame should treat it as
+ a stream error or connection error with type STREAM_CLOSED, depending
+ on the specific frame. The error handling is done at a higher level:
+ this just raises the appropriate error.
+ """
+ raise StreamClosedError(self.stream_id)
+
+ def send_on_closed_stream(self, previous_state: StreamState) -> None:
+ """
+ Called when an attempt is made to send data on an already-closed
+ stream.
+
+ This essentially overrides the standard logic by throwing a
+ more-specific error: StreamClosedError. This is a ProtocolError, so it
+ matches the standard API of the state machine, but provides more detail
+ to the user.
+ """
+ raise StreamClosedError(self.stream_id)
+
+ def recv_push_on_closed_stream(self, previous_state: StreamState) -> None:
+ """
+        Called when a PUSH_PROMISE frame is received on a closed stream.
+
+ If the stream was closed by us sending a RST_STREAM frame, then we
+ presume that the PUSH_PROMISE was in flight when we reset the parent
+        stream. Rather than accept the new stream, we just reset it.
+ Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
+ naturally closed stream is a real problem because it creates a brand
+ new stream that the remote peer now believes exists.
+ """
+ assert self.stream_closed_by is not None
+
+ if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
+ raise StreamClosedError(self.stream_id)
+ msg = "Attempted to push on closed stream."
+ raise ProtocolError(msg)
+
+ def send_push_on_closed_stream(self, previous_state: StreamState) -> None:
+ """
+ Called when an attempt is made to push on an already-closed stream.
+
+ This essentially overrides the standard logic by providing a more
+ useful error message. It's necessary because simply indicating that the
+ stream is closed is not enough: there is now a new stream that is not
+ allowed to be there. The only recourse is to tear the whole connection
+ down.
+ """
+ msg = "Attempted to push on closed stream."
+ raise ProtocolError(msg)
+
+ def send_informational_response(self, previous_state: StreamState) -> list[Event]:
+ """
+ Called when an informational header block is sent (that is, a block
+ where the :status header has a 1XX value).
+
+ Only enforces that these are sent *before* final headers are sent.
+ """
+ if self.headers_sent:
+ msg = "Information response after final response"
+ raise ProtocolError(msg)
+
+ event = _ResponseSent()
+ return [event]
+
+ def recv_informational_response(self, previous_state: StreamState) -> list[Event]:
+ """
+ Called when an informational header block is received (that is, a block
+ where the :status header has a 1XX value).
+ """
+ if self.headers_received:
+ msg = "Informational response after final response"
+ raise ProtocolError(msg)
+
+ event = InformationalResponseReceived()
+ event.stream_id = self.stream_id
+ return [event]
+
+ def recv_alt_svc(self, previous_state: StreamState) -> list[Event]:
+ """
+ Called when receiving an ALTSVC frame.
+
+ RFC 7838 allows us to receive ALTSVC frames at any stream state, which
+ is really absurdly overzealous. For that reason, we want to limit the
+ states in which we can actually receive it. It's really only sensible
+ to receive it after we've sent our own headers and before the server
+ has sent its header block: the server can't guarantee that we have any
+ state around after it completes its header block, and the server
+ doesn't know what origin we're talking about before we've sent ours.
+
+ For that reason, this function applies a few extra checks on both state
+ and some of the little state variables we keep around. If those suggest
+ an unreasonable situation for the ALTSVC frame to have been sent in,
+ we quietly ignore it (as RFC 7838 suggests).
+
+ This function is also *not* always called by the state machine. In some
+ states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
+ because we know the frame cannot be valid in that state (IDLE because
+ the server cannot know what origin the stream applies to, CLOSED
+ because the server cannot assume we still have state around,
+ RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
+ state then *we* are the server).
+ """
+ # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
+ # them.
+ if self.client is False:
+ return []
+
+ # If we've received the response headers from the server they can't
+ # guarantee we still have any state around. Other implementations
+ # (like nghttp2) ignore ALTSVC in this state, so we will too.
+ if self.headers_received:
+ return []
+
+ # Otherwise, this is a sensible enough frame to have received. Return
+ # the event and let it get populated.
+ return [AlternativeServiceAvailable()]
+
+ def send_alt_svc(self, previous_state: StreamState) -> None:
+ """
+ Called when sending an ALTSVC frame on this stream.
+
+ For consistency with the restrictions we apply on receiving ALTSVC
+ frames in ``recv_alt_svc``, we want to restrict when users can send
+ ALTSVC frames to the situations when we ourselves would accept them.
+
+ That means: when we are a server, when we have received the request
+ headers, and when we have not yet sent our own response headers.
+ """
+ # We should not send ALTSVC after we've sent response headers, as the
+ # client may have disposed of its state.
+ if self.headers_sent:
+ msg = "Cannot send ALTSVC after sending response headers."
+ raise ProtocolError(msg)
+
+
+
+# STATE MACHINE
+#
+# The stream state machine is defined here to avoid the need to allocate it
+# repeatedly for each stream. It cannot be defined in the stream class because
+# it needs to be able to reference the callbacks defined on the class, but
+# because of Python's scoping rules the class name is not actually in
+# scope during the body of the class definition.
+#
+# For the sake of clarity, we reproduce the RFC 7540 state machine here:
+#
+# +--------+
+# send PP | | recv PP
+# ,--------| idle |--------.
+# / | | \
+# v +--------+ v
+# +----------+ | +----------+
+# | | | send H / | |
+# ,------| reserved | | recv H | reserved |------.
+# | | (local) | | | (remote) | |
+# | +----------+ v +----------+ |
+# | | +--------+ | |
+# | | recv ES | | send ES | |
+# | send H | ,-------| open |-------. | recv H |
+# | | / | | \ | |
+# | v v +--------+ v v |
+# | +----------+ | +----------+ |
+# | | half | | | half | |
+# | | closed | | send R / | closed | |
+# | | (remote) | | recv R | (local) | |
+# | +----------+ | +----------+ |
+# | | | | |
+# | | send ES / | recv ES / | |
+# | | send R / v send R / | |
+# | | recv R +--------+ recv R | |
+# | send R / `----------->| |<-----------' send R / |
+# | recv R | closed | recv R |
+# `----------------------->| |<----------------------'
+# +--------+
+#
+# send: endpoint sends this frame
+# recv: endpoint receives this frame
+#
+# H: HEADERS frame (with implied CONTINUATIONs)
+# PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
+# ES: END_STREAM flag
+# R: RST_STREAM frame
+#
+# For the purposes of this state machine we treat HEADERS and their
+# associated CONTINUATION frames as a single jumbo frame. The protocol
+# allows/requires this by preventing other frames from being interleaved in
+# between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
+# received without a prior HEADERS frame, it *will* be passed to this state
+# machine. The state machine should always reject that frame, either as an
+# invalid transition or because the stream is closed.
+#
+# There is a confusing relationship around PUSH_PROMISE frames. The state
+# machine above considers them to be frames belonging to the new stream,
+# which is *somewhat* true. However, they are sent with the stream ID of
+# their related stream, and are only sendable in some cases.
+# For this reason, our state machine implementation below allows for
+# PUSH_PROMISE frames not only in the IDLE state (as in the diagram), but also
+# in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
+# Essentially, for h2, PUSH_PROMISE frames are effectively sent on
+# two streams.
+#
+# The _transitions dictionary contains a mapping of tuples of
+# (state, input) to tuples of (side_effect_function, end_state). This
+# map contains all allowed transitions: anything not in this map is
+# invalid and immediately causes a transition to ``closed``.
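+#
+# As a minimal illustration of how this table is consumed (driven through
+# ``H2StreamStateMachine.process_input``; this particular call sequence is
+# just a sketch of one client-side request/response exchange):
+#
+#     machine = H2StreamStateMachine(stream_id=1)
+#     machine.process_input(StreamInputs.SEND_HEADERS)     # IDLE -> OPEN
+#     machine.process_input(StreamInputs.SEND_END_STREAM)  # OPEN -> HALF_CLOSED_LOCAL
+#     machine.process_input(StreamInputs.RECV_HEADERS)     # fires ResponseReceived
+#     machine.process_input(StreamInputs.RECV_END_STREAM)  # -> CLOSED, fires StreamEnded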
+_transitions = {
+ # State: idle
+ (StreamState.IDLE, StreamInputs.SEND_HEADERS):
+ (H2StreamStateMachine.request_sent, StreamState.OPEN),
+ (StreamState.IDLE, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.request_received, StreamState.OPEN),
+ (StreamState.IDLE, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
+ (H2StreamStateMachine.send_new_pushed_stream,
+ StreamState.RESERVED_LOCAL),
+ (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
+ (H2StreamStateMachine.recv_new_pushed_stream,
+ StreamState.RESERVED_REMOTE),
+ (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (None, StreamState.IDLE),
+ (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
+ (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
+ (H2StreamStateMachine.request_received,
+ StreamState.HALF_CLOSED_REMOTE),
+
+ # State: reserved local
+ (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
+ (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
+ (None, StreamState.RESERVED_LOCAL),
+ (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
+ (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
+ (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
+ (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
+ (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
+ (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
+ (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (None, StreamState.RESERVED_LOCAL),
+
+ # State: reserved remote
+ (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.response_received,
+ StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
+ (None, StreamState.RESERVED_REMOTE),
+ (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
+ (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
+ (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
+ (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
+ (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
+ (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
+
+ # State: open
+ (StreamState.OPEN, StreamInputs.SEND_HEADERS):
+ (H2StreamStateMachine.response_sent, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.response_received, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.SEND_DATA):
+ (None, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.data_received, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
+ (None, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
+ (H2StreamStateMachine.stream_half_closed,
+ StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
+ (None, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
+ (H2StreamStateMachine.window_updated, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
+ (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
+ (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
+ (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
+ (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
+ (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
+ (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
+ (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
+ (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
+
+ # State: half-closed remote
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
+ (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
+ (None, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
+ (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
+ (None, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
+ (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
+ (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
+ (H2StreamStateMachine.send_push_promise,
+ StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
+ (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
+ (H2StreamStateMachine.send_informational_response,
+ StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
+ (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
+
+ # State: half-closed local
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.response_received,
+ StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
+ (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
+ (None, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
+ (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
+ (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
+ (H2StreamStateMachine.recv_push_promise,
+ StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
+ (H2StreamStateMachine.recv_informational_response,
+ StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
+ (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
+
+ # State: closed
+ (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
+ (None, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
+ (None, StreamState.CLOSED),
+
+    # RFC 7540 Section 5.1 defines how the endpoint should react when
+ # receiving a frame on a closed stream with the following statements:
+ #
+ # > An endpoint that receives any frame other than PRIORITY after receiving
+ # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
+ # > An endpoint that receives any frames after receiving a frame with the
+ # > END_STREAM flag set MUST treat that as a connection error of type
+ # > STREAM_CLOSED.
+ (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
+ (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.RECV_DATA):
+ (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
+
+ # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
+    # > for a short period after a DATA or HEADERS frame containing an
+    # > END_STREAM flag is sent, as instructed in RFC 7540 Section 5.1.
+    # But we don't have access to a clock, so we just always allow it.
+ (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
+ (None, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
+ (None, StreamState.CLOSED),
+
+ # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
+ # > neither "open" nor "half-closed (local)" as a connection error of type
+ # > PROTOCOL_ERROR.
+ (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
+ (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
+
+ # Also, users should be forbidden from sending on closed streams.
+ (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
+ (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
+ (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
+ (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.SEND_DATA):
+ (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
+ (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
+ (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
+ (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
+}
+
+
+class H2Stream:
+ """
+ A low-level HTTP/2 stream object. This handles building and receiving
+ frames and maintains per-stream state.
+
+    This wraps an HTTP/2 stream state machine implementation, ensuring that
+ frames can only be sent/received when the stream is in a valid state.
+ Attempts to create frames that cannot be sent will raise a
+ ``ProtocolError``.
+ """
+
+ def __init__(self,
+ stream_id: int,
+ config: H2Configuration,
+ inbound_window_size: int,
+ outbound_window_size: int) -> None:
+ self.state_machine = H2StreamStateMachine(stream_id)
+ self.stream_id = stream_id
+ self.max_outbound_frame_size: int | None = None
+ self.request_method: bytes | None = None
+
+ # The current value of the outbound stream flow control window
+ self.outbound_flow_control_window = outbound_window_size
+
+ # The flow control manager.
+ self._inbound_window_manager = WindowManager(inbound_window_size)
+
+ # The expected content length, if any.
+ self._expected_content_length: int | None = None
+
+ # The actual received content length. Always tracked.
+ self._actual_content_length = 0
+
+ # The authority we believe this stream belongs to.
+ self._authority: bytes | None = None
+
+ # The configuration for this stream.
+ self.config = config
+
+ def __repr__(self) -> str:
+ return f"<{type(self).__name__} id:{self.stream_id} state:{self.state_machine.state!r}>"
+
+ @property
+ def inbound_flow_control_window(self) -> int:
+ """
+ The size of the inbound flow control window for the stream. This is
+ rarely publicly useful: instead, use :meth:`remote_flow_control_window
+        <h2.stream.H2Stream.remote_flow_control_window>`. This property is
+        largely present to provide convenient access to this data.
+ """
+ return self._inbound_window_manager.current_window_size
+
+ @property
+ def open(self) -> bool:
+ """
+ Whether the stream is 'open' in any sense: that is, whether it counts
+ against the number of concurrent streams.
+ """
+ # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
+ # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
+ # this excludes the reserved states.
+ # For more detail on why we're doing this in this slightly weird way,
+ # see the comment on ``STREAM_OPEN`` at the top of the file.
+ return STREAM_OPEN[self.state_machine.state]
+
+ @property
+ def closed(self) -> bool:
+ """
+ Whether the stream is closed.
+ """
+ return self.state_machine.state == StreamState.CLOSED
+
+ @property
+ def closed_by(self) -> StreamClosedBy | None:
+ """
+ Returns how the stream was closed, as one of StreamClosedBy.
+ """
+ return self.state_machine.stream_closed_by
+
+ def upgrade(self, client_side: bool) -> None:
+ """
+ Called by the connection to indicate that this stream is the initial
+ request/response of an upgraded connection. Places the stream into an
+ appropriate state.
+ """
+ self.config.logger.debug("Upgrading %r", self)
+
+ assert self.stream_id == 1
+ input_ = (
+ StreamInputs.UPGRADE_CLIENT if client_side
+ else StreamInputs.UPGRADE_SERVER
+ )
+
+ # This may return events, we deliberately don't want them.
+ self.state_machine.process_input(input_)
+
+ def send_headers(self,
+ headers: Iterable[HeaderWeaklyTyped],
+ encoder: Encoder,
+ end_stream: bool = False) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
+ """
+ Returns a list of HEADERS/CONTINUATION frames to emit as either headers
+ or trailers.
+ """
+ self.config.logger.debug("Send headers %s on %r", headers, self)
+
+ # Because encoding headers makes an irreversible change to the header
+ # compression context, we make the state transition before we encode
+ # them.
+
+ # First, check if we're a client. If we are, no problem: if we aren't,
+ # we need to scan the header block to see if this is an informational
+ # response.
+ input_ = StreamInputs.SEND_HEADERS
+
+ bytes_headers = utf8_encode_headers(headers)
+
+ if ((not self.state_machine.client) and
+ is_informational_response(bytes_headers)):
+ if end_stream:
+ msg = "Cannot set END_STREAM on informational responses."
+ raise ProtocolError(msg)
+
+ input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
+
+ events = self.state_machine.process_input(input_)
+
+ hf = HeadersFrame(self.stream_id)
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+ frames = self._build_headers_frames(
+ bytes_headers, encoder, hf, hdr_validation_flags,
+ )
+
+ if end_stream:
+ # Not a bug: the END_STREAM flag is valid on the initial HEADERS
+ # frame, not the CONTINUATION frames that follow.
+ self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
+ frames[0].flags.add("END_STREAM")
+
+ if self.state_machine.trailers_sent and not end_stream:
+ msg = "Trailers must have END_STREAM set."
+ raise ProtocolError(msg)
+
+ if self.state_machine.client and self._authority is None:
+ self._authority = authority_from_headers(bytes_headers)
+
+ # store request method for _initialize_content_length
+ self.request_method = extract_method_header(bytes_headers)
+
+ return frames
+
+ def push_stream_in_band(self,
+ related_stream_id: int,
+ headers: Iterable[HeaderWeaklyTyped],
+ encoder: Encoder) -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
+ """
+ Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
+ stream header. Called on the stream that has the PUSH_PROMISE frame
+ sent on it.
+ """
+ self.config.logger.debug("Push stream %r", self)
+
+ # Because encoding headers makes an irreversible change to the header
+ # compression context, we make the state transition *first*.
+
+ events = self.state_machine.process_input(
+ StreamInputs.SEND_PUSH_PROMISE,
+ )
+
+ ppf = PushPromiseFrame(self.stream_id)
+ ppf.promised_stream_id = related_stream_id
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+
+ bytes_headers = utf8_encode_headers(headers)
+
+ return self._build_headers_frames(
+ bytes_headers, encoder, ppf, hdr_validation_flags,
+ )
+
+ def locally_pushed(self) -> list[Frame]:
+ """
+ Mark this stream as one that was pushed by this peer. Must be called
+ immediately after initialization. Sends no frames, simply updates the
+ state machine.
+ """
+ # This does not trigger any events.
+ events = self.state_machine.process_input(
+ StreamInputs.SEND_PUSH_PROMISE,
+ )
+ assert not events
+ return []
+
+ def send_data(self,
+ data: bytes | memoryview,
+ end_stream: bool = False,
+ pad_length: int | None = None) -> list[Frame]:
+ """
+ Prepare some data frames. Optionally end the stream.
+
+ .. warning:: Does not perform flow control checks.
+ """
+ self.config.logger.debug(
+ "Send data on %r with end stream set to %s", self, end_stream,
+ )
+
+ self.state_machine.process_input(StreamInputs.SEND_DATA)
+
+ df = DataFrame(self.stream_id)
+ df.data = data
+ if end_stream:
+ self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
+ df.flags.add("END_STREAM")
+ if pad_length is not None:
+ df.flags.add("PADDED")
+ df.pad_length = pad_length
+
+ # Subtract flow_controlled_length to account for possible padding
+ self.outbound_flow_control_window -= df.flow_controlled_length
+ assert self.outbound_flow_control_window >= 0
+
+ return [df]
+
+ def end_stream(self) -> list[Frame]:
+ """
+ End a stream without sending data.
+ """
+ self.config.logger.debug("End stream %r", self)
+
+ self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
+ df = DataFrame(self.stream_id)
+ df.flags.add("END_STREAM")
+ return [df]
+
+ def advertise_alternative_service(self, field_value: bytes) -> list[Frame]:
+ """
+ Advertise an RFC 7838 alternative service. The semantics of this are
+ better documented in the ``H2Connection`` class.
+ """
+ self.config.logger.debug(
+ "Advertise alternative service of %r for %r", field_value, self,
+ )
+ self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
+ asf = AltSvcFrame(self.stream_id)
+ asf.field = field_value
+ return [asf]
+
+ def increase_flow_control_window(self, increment: int) -> list[Frame]:
+ """
+ Increase the size of the flow control window for the remote side.
+ """
+ self.config.logger.debug(
+ "Increase flow control window for %r by %d",
+ self, increment,
+ )
+ self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
+ self._inbound_window_manager.window_opened(increment)
+
+ wuf = WindowUpdateFrame(self.stream_id)
+ wuf.window_increment = increment
+ return [wuf]
+
+ def receive_push_promise_in_band(self,
+ promised_stream_id: int,
+ headers: Iterable[Header],
+ header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
+ """
+ Receives a push promise frame sent on this stream, pushing a remote
+ stream. This is called on the stream that has the PUSH_PROMISE sent
+ on it.
+ """
+ self.config.logger.debug(
+ "Receive Push Promise on %r for remote stream %d",
+ self, promised_stream_id,
+ )
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_PUSH_PROMISE,
+ )
+ events[0].pushed_stream_id = promised_stream_id
+
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+ events[0].headers = self._process_received_headers(
+ headers, hdr_validation_flags, header_encoding,
+ )
+ return [], events
+
+ def remotely_pushed(self, pushed_headers: Iterable[Header]) -> tuple[list[Frame], list[Event]]:
+ """
+ Mark this stream as one that was pushed by the remote peer. Must be
+ called immediately after initialization. Sends no frames, simply
+ updates the state machine.
+ """
+ self.config.logger.debug("%r pushed by remote peer", self)
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_PUSH_PROMISE,
+ )
+ self._authority = authority_from_headers(pushed_headers)
+ return [], events
+
+ def receive_headers(self,
+ headers: Iterable[Header],
+ end_stream: bool,
+ header_encoding: bool | str | None) -> tuple[list[Frame], list[Event]]:
+ """
+ Receive a set of headers (or trailers).
+ """
+ if is_informational_response(headers):
+ if end_stream:
+ msg = "Cannot set END_STREAM on informational responses"
+ raise ProtocolError(msg)
+ input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
+ else:
+ input_ = StreamInputs.RECV_HEADERS
+
+ events = self.state_machine.process_input(input_)
+
+ if end_stream:
+ es_events = self.state_machine.process_input(
+ StreamInputs.RECV_END_STREAM,
+ )
+ events[0].stream_ended = es_events[0]
+ events += es_events
+
+ self._initialize_content_length(headers)
+
+ if isinstance(events[0], TrailersReceived) and not end_stream:
+ msg = "Trailers must have END_STREAM set"
+ raise ProtocolError(msg)
+
+ hdr_validation_flags = self._build_hdr_validation_flags(events)
+ events[0].headers = self._process_received_headers(
+ headers, hdr_validation_flags, header_encoding,
+ )
+ return [], events
+
+ def receive_data(self, data: bytes, end_stream: bool, flow_control_len: int) -> tuple[list[Frame], list[Event]]:
+ """
+ Receive some data.
+ """
+ self.config.logger.debug(
+ "Receive data on %r with end stream %s and flow control length "
+ "set to %d", self, end_stream, flow_control_len,
+ )
+ events = self.state_machine.process_input(StreamInputs.RECV_DATA)
+ self._inbound_window_manager.window_consumed(flow_control_len)
+ self._track_content_length(len(data), end_stream)
+
+ if end_stream:
+ es_events = self.state_machine.process_input(
+ StreamInputs.RECV_END_STREAM,
+ )
+ events[0].stream_ended = es_events[0]
+ events.extend(es_events)
+
+ events[0].data = data
+ events[0].flow_controlled_length = flow_control_len
+ return [], events
+
+ def receive_window_update(self, increment: int) -> tuple[list[Frame], list[Event]]:
+ """
+ Handle a WINDOW_UPDATE increment.
+ """
+ self.config.logger.debug(
+ "Receive Window Update on %r for increment of %d",
+ self, increment,
+ )
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_WINDOW_UPDATE,
+ )
+ frames = []
+
+ # If we encounter a problem with incrementing the flow control window,
+ # this should be treated as a *stream* error, not a *connection* error.
+ # That means we need to catch the error and forcibly close the stream.
+ if events:
+ events[0].delta = increment
+ try:
+ self.outbound_flow_control_window = guard_increment_window(
+ self.outbound_flow_control_window,
+ increment,
+ )
+ except FlowControlError:
+ # Ok, this is bad. We're going to need to perform a local
+ # reset.
+ event = StreamReset()
+ event.stream_id = self.stream_id
+ event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
+ event.remote_reset = False
+
+ events = [event]
+ frames = self.reset_stream(event.error_code)
+
+ return frames, events
+
+ def receive_continuation(self) -> None:
+ """
+        A naked CONTINUATION frame has been received. This is always an error,
+        but the type of error depends on the state of the stream, and handling
+        it must transition the state of the stream, so we need to process it.
+ """
+ self.config.logger.debug("Receive Continuation frame on %r", self)
+ self.state_machine.process_input(
+ StreamInputs.RECV_CONTINUATION,
+ )
+ msg = "Should not be reachable" # pragma: no cover
+ raise AssertionError(msg) # pragma: no cover
+
+ def receive_alt_svc(self, frame: AltSvcFrame) -> tuple[list[Frame], list[Event]]:
+ """
+ An Alternative Service frame was received on the stream. This frame
+ inherits the origin associated with this stream.
+ """
+ self.config.logger.debug(
+ "Receive Alternative Service frame on stream %r", self,
+ )
+
+ # If the origin is present, RFC 7838 says we have to ignore it.
+ if frame.origin:
+ return [], []
+
+ events = self.state_machine.process_input(
+ StreamInputs.RECV_ALTERNATIVE_SERVICE,
+ )
+
+ # There are lots of situations where we want to ignore the ALTSVC
+ # frame. If we need to pay attention, we'll have an event and should
+ # fill it out.
+ if events:
+ assert isinstance(events[0], AlternativeServiceAvailable)
+ events[0].origin = self._authority
+ events[0].field_value = frame.field
+
+ return [], events
+
+ def reset_stream(self, error_code: ErrorCodes | int = 0) -> list[Frame]:
+ """
+ Close the stream locally. Reset the stream with an error code.
+ """
+ self.config.logger.debug(
+ "Local reset %r with error code: %d", self, error_code,
+ )
+ self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
+
+ rsf = RstStreamFrame(self.stream_id)
+ rsf.error_code = error_code
+ return [rsf]
+
+ def stream_reset(self, frame: RstStreamFrame) -> tuple[list[Frame], list[Event]]:
+ """
+ Handle a stream being reset remotely.
+ """
+ self.config.logger.debug(
+ "Remote reset %r with error code: %d", self, frame.error_code,
+ )
+ events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
+
+ if events:
+ # We don't fire an event if this stream is already closed.
+ events[0].error_code = _error_code_from_int(frame.error_code)
+
+ return [], events
+
+ def acknowledge_received_data(self, acknowledged_size: int) -> list[Frame]:
+ """
+ The user has informed us that they've processed some amount of data
+ that was received on this stream. Pass that to the window manager and
+ potentially return some WindowUpdate frames.
+ """
+ self.config.logger.debug(
+ "Acknowledge received data with size %d on %r",
+ acknowledged_size, self,
+ )
+ increment = self._inbound_window_manager.process_bytes(
+ acknowledged_size,
+ )
+ if increment:
+ f = WindowUpdateFrame(self.stream_id)
+ f.window_increment = increment
+ return [f]
+
+ return []
+
+ def _build_hdr_validation_flags(self, events: Any) -> HeaderValidationFlags:
+ """
+ Constructs a set of header validation flags for use when normalizing
+ and validating header blocks.
+ """
+ is_trailer = isinstance(
+ events[0], (_TrailersSent, TrailersReceived),
+ )
+ is_response_header = isinstance(
+ events[0],
+ (
+ _ResponseSent,
+ ResponseReceived,
+ InformationalResponseReceived,
+ ),
+ )
+ is_push_promise = isinstance(
+ events[0], (PushedStreamReceived, _PushedRequestSent),
+ )
+
+ return HeaderValidationFlags(
+ is_client=self.state_machine.client or False,
+ is_trailer=is_trailer,
+ is_response_header=is_response_header,
+ is_push_promise=is_push_promise,
+ )
+
+ def _build_headers_frames(self,
+ headers: Iterable[Header],
+ encoder: Encoder,
+ first_frame: HeadersFrame | PushPromiseFrame,
+ hdr_validation_flags: HeaderValidationFlags) \
+ -> list[HeadersFrame | ContinuationFrame | PushPromiseFrame]:
+ """
+ Helper method to build headers or push promise frames.
+ """
+ # We need to lowercase the header names, and to ensure that secure
+ # header fields are kept out of compression contexts.
+ if self.config.normalize_outbound_headers:
+ # also we may want to split outbound cookies to improve
+ # headers compression
+ should_split_outbound_cookies = self.config.split_outbound_cookies
+
+ headers = normalize_outbound_headers(
+ headers, hdr_validation_flags, should_split_outbound_cookies,
+ )
+ if self.config.validate_outbound_headers:
+ headers = validate_outbound_headers(
+ headers, hdr_validation_flags,
+ )
+
+ encoded_headers = encoder.encode(headers)
+
+ # Slice into blocks of max_outbound_frame_size. Be careful with this:
+ # it only works right because we never send padded frames or priority
+ # information on the frames. Revisit this if we do.
+ header_blocks = [
+ encoded_headers[i:i+(self.max_outbound_frame_size or 0)]
+ for i in range(
+ 0, len(encoded_headers), (self.max_outbound_frame_size or 0),
+ )
+ ]
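+        # For example, with a hypothetical max_outbound_frame_size of 16384
+        # octets, a 40 kB encoded header block is split into three blocks and
+        # emitted as one HEADERS (or PUSH_PROMISE) frame followed by two
+        # CONTINUATION frames, with END_HEADERS set only on the last frame.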
+
+ frames: list[HeadersFrame | ContinuationFrame | PushPromiseFrame] = []
+ first_frame.data = header_blocks[0]
+ frames.append(first_frame)
+
+ for block in header_blocks[1:]:
+ cf = ContinuationFrame(self.stream_id)
+ cf.data = block
+ frames.append(cf)
+
+ frames[-1].flags.add("END_HEADERS")
+ return frames
+
+ def _process_received_headers(self,
+ headers: Iterable[Header],
+ header_validation_flags: HeaderValidationFlags,
+ header_encoding: bool | str | None) -> Iterable[Header]:
+ """
+ When headers have been received from the remote peer, run a processing
+ pipeline on them to transform them into the appropriate form for
+ attaching to an event.
+ """
+ if self.config.normalize_inbound_headers:
+ headers = normalize_inbound_headers(
+ headers, header_validation_flags,
+ )
+
+ if self.config.validate_inbound_headers:
+ headers = validate_headers(headers, header_validation_flags)
+
+ if isinstance(header_encoding, str):
+ headers = _decode_headers(headers, header_encoding)
+
+ # The above steps are all generators, so we need to concretize the
+ # headers now.
+ return list(headers)
+
+ def _initialize_content_length(self, headers: Iterable[Header]) -> None:
+ """
+ Checks the headers for a content-length header and initializes the
+ _expected_content_length field from it. It's not an error for no
+ Content-Length header to be present.
+ """
+ if self.request_method == b"HEAD":
+ self._expected_content_length = 0
+ return
+
+ for n, v in headers:
+ if n == b"content-length":
+ try:
+ self._expected_content_length = int(v, 10)
+ except ValueError as err:
+ msg = f"Invalid content-length header: {v!r}"
+ raise ProtocolError(msg) from err
+
+ return
+
+ def _track_content_length(self, length: int, end_stream: bool) -> None:
+ """
+ Update the expected content length in response to data being received.
+        Validates that the appropriate amount of data is received. Always
+        tracks the received data length, but only validates it against the
+        content-length header if one was sent.
+
+ :param length: The length of the body chunk received.
+ :param end_stream: If this is the last body chunk received.
+ """
+ self._actual_content_length += length
+ actual = self._actual_content_length
+ expected = self._expected_content_length
+
+ if expected is not None:
+ if expected < actual:
+ raise InvalidBodyLengthError(expected, actual)
+
+ if end_stream and expected != actual:
+ raise InvalidBodyLengthError(expected, actual)
+
+ def _inbound_flow_control_change_from_settings(self, delta: int) -> None:
+ """
+ We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
+ update the target window size for flow control. For our flow control
+ strategy, this means we need to do two things: we need to adjust the
+ current window size, but we also need to set the target maximum window
+ size to the new value.
+ """
+ new_max_size = self._inbound_window_manager.max_window_size + delta
+ self._inbound_window_manager.window_opened(delta)
+ self._inbound_window_manager.max_window_size = new_max_size
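+        # For example, if the previous target max_window_size was 65535 and
+        # the setting grew by 32768, the new target becomes 98303 and the
+        # current window is opened by the same 32768 octets.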
+
+
+def _decode_headers(headers: Iterable[HeaderWeaklyTyped], encoding: str) -> Generator[HeaderTuple, None, None]:
+ """
+ Given an iterable of header two-tuples and an encoding, decodes those
+ headers using that encoding while preserving the type of the header tuple.
+ This ensures that the use of ``HeaderTuple`` is preserved.
+ """
+ for header in headers:
+ # This function expects to work on decoded headers, which are always
+ # HeaderTuple objects.
+ assert isinstance(header, HeaderTuple)
+
+ name, value = header
+ assert isinstance(name, bytes)
+ assert isinstance(value, bytes)
+
+ n = name.decode(encoding)
+ v = value.decode(encoding)
+ yield header.__class__(n, v)
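+
+
+# A minimal illustration of the type preservation above (assuming hpack's
+# ``NeverIndexedHeaderTuple`` subclass of ``HeaderTuple``):
+#
+#     decoded = list(_decode_headers(
+#         [NeverIndexedHeaderTuple(b"cookie", b"a=b")], "utf-8",
+#     ))
+#     # -> [NeverIndexedHeaderTuple('cookie', 'a=b')]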