aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/websockets/asyncio
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/websockets/asyncio')
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/async_timeout.py282
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/client.py567
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/compatibility.py30
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py1214
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/messages.py296
-rw-r--r--.venv/lib/python3.12/site-packages/websockets/asyncio/server.py978
7 files changed, 3367 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/__init__.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/async_timeout.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/async_timeout.py
new file mode 100644
index 00000000..6ffa8996
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/async_timeout.py
@@ -0,0 +1,282 @@
+# From https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
+# Licensed under the Apache License (Apache-2.0)
+
+import asyncio
+import enum
+import sys
+import warnings
+from types import TracebackType
+from typing import Optional, Type
+
+
+if sys.version_info >= (3, 11):
+ from typing import final
+else:
+ # From https://github.com/python/typing_extensions/blob/main/src/typing_extensions.py
+ # Licensed under the Python Software Foundation License (PSF-2.0)
+
+ # @final exists in 3.8+, but we backport it for all versions
+ # before 3.11 to keep support for the __final__ attribute.
+ # See https://bugs.python.org/issue46342
+ def final(f):
+ """This decorator can be used to indicate to type checkers that
+ the decorated method cannot be overridden, and decorated class
+ cannot be subclassed. For example:
+
+ class Base:
+ @final
+ def done(self) -> None:
+ ...
+ class Sub(Base):
+ def done(self) -> None: # Error reported by type checker
+ ...
+ @final
+ class Leaf:
+ ...
+ class Other(Leaf): # Error reported by type checker
+ ...
+
+ There is no runtime checking of these properties. The decorator
+ sets the ``__final__`` attribute to ``True`` on the decorated object
+ to allow runtime introspection.
+ """
+ try:
+ f.__final__ = True
+ except (AttributeError, TypeError):
+ # Skip the attribute silently if it is not writable.
+ # AttributeError happens if the object has __slots__ or a
+ # read-only property, TypeError if it's a builtin class.
+ pass
+ return f
+
+ # End https://github.com/python/typing_extensions/blob/main/src/typing_extensions.py
+
+
+if sys.version_info >= (3, 11):
+
+ def _uncancel_task(task: "asyncio.Task[object]") -> None:
+ task.uncancel()
+
+else:
+
+ def _uncancel_task(task: "asyncio.Task[object]") -> None:
+ pass
+
+
+__version__ = "4.0.3"
+
+
+__all__ = ("timeout", "timeout_at", "Timeout")
+
+
+def timeout(delay: Optional[float]) -> "Timeout":
+ """timeout context manager.
+
+ Useful in cases when you want to apply timeout logic around block
+ of code or in cases when asyncio.wait_for is not suitable. For example:
+
+ >>> async with timeout(0.001):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ delay - value in seconds or None to disable timeout logic
+ """
+ loop = asyncio.get_running_loop()
+ if delay is not None:
+ deadline = loop.time() + delay # type: Optional[float]
+ else:
+ deadline = None
+ return Timeout(deadline, loop)
+
+
+def timeout_at(deadline: Optional[float]) -> "Timeout":
+ """Schedule the timeout at absolute time.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+
+ >>> async with timeout_at(loop.time() + 10):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ """
+ loop = asyncio.get_running_loop()
+ return Timeout(deadline, loop)
+
+
+class _State(enum.Enum):
+ INIT = "INIT"
+ ENTER = "ENTER"
+ TIMEOUT = "TIMEOUT"
+ EXIT = "EXIT"
+
+
+@final
+class Timeout:
+ # Internal class, please don't instantiate it directly
+ # Use timeout() and timeout_at() public factories instead.
+ #
+ # Implementation note: `async with timeout()` is preferred
+ # over `with timeout()`.
+ # While technically the Timeout class implementation
+ # doesn't need to be async at all,
+ # the `async with` statement explicitly points that
+ # the context manager should be used from async function context.
+ #
+ # This design allows to avoid many silly misusages.
+ #
+ # TimeoutError is raised immediately when scheduled
+ # if the deadline is passed.
+ # The purpose is to time out as soon as possible
+ # without waiting for the next await expression.
+
+ __slots__ = ("_deadline", "_loop", "_state", "_timeout_handler", "_task")
+
+ def __init__(
+ self, deadline: Optional[float], loop: asyncio.AbstractEventLoop
+ ) -> None:
+ self._loop = loop
+ self._state = _State.INIT
+
+ self._task: Optional["asyncio.Task[object]"] = None
+ self._timeout_handler = None # type: Optional[asyncio.Handle]
+ if deadline is None:
+ self._deadline = None # type: Optional[float]
+ else:
+ self.update(deadline)
+
+ def __enter__(self) -> "Timeout":
+ warnings.warn(
+ "with timeout() is deprecated, use async with timeout() instead",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ self._do_enter()
+ return self
+
+ def __exit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ async def __aenter__(self) -> "Timeout":
+ self._do_enter()
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ exc_tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ @property
+ def expired(self) -> bool:
+ """Is timeout expired during execution?"""
+ return self._state == _State.TIMEOUT
+
+ @property
+ def deadline(self) -> Optional[float]:
+ return self._deadline
+
+ def reject(self) -> None:
+ """Reject scheduled timeout if any."""
+ # cancel is maybe better name but
+ # task.cancel() raises CancelledError in asyncio world.
+ if self._state not in (_State.INIT, _State.ENTER):
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._reject()
+
+ def _reject(self) -> None:
+ self._task = None
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._timeout_handler = None
+
+ def shift(self, delay: float) -> None:
+ """Advance timeout on delay seconds.
+
+ The delay can be negative.
+
+ Raise RuntimeError if shift is called when deadline is not scheduled
+ """
+ deadline = self._deadline
+ if deadline is None:
+ raise RuntimeError("cannot shift timeout if deadline is not scheduled")
+ self.update(deadline + delay)
+
+ def update(self, deadline: float) -> None:
+ """Set deadline to absolute value.
+
+ deadline argument points on the time in the same clock system
+ as loop.time().
+
+ If new deadline is in the past the timeout is raised immediately.
+
+ Please note: it is not POSIX time but a time with
+ undefined starting base, e.g. the time of the system power on.
+ """
+ if self._state == _State.EXIT:
+ raise RuntimeError("cannot reschedule after exit from context manager")
+ if self._state == _State.TIMEOUT:
+ raise RuntimeError("cannot reschedule expired timeout")
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+ self._deadline = deadline
+ if self._state != _State.INIT:
+ self._reschedule()
+
+ def _reschedule(self) -> None:
+ assert self._state == _State.ENTER
+ deadline = self._deadline
+ if deadline is None:
+ return
+
+ now = self._loop.time()
+ if self._timeout_handler is not None:
+ self._timeout_handler.cancel()
+
+ self._task = asyncio.current_task()
+ if deadline <= now:
+ self._timeout_handler = self._loop.call_soon(self._on_timeout)
+ else:
+ self._timeout_handler = self._loop.call_at(deadline, self._on_timeout)
+
+ def _do_enter(self) -> None:
+ if self._state != _State.INIT:
+ raise RuntimeError(f"invalid state {self._state.value}")
+ self._state = _State.ENTER
+ self._reschedule()
+
+ def _do_exit(self, exc_type: Optional[Type[BaseException]]) -> None:
+ if exc_type is asyncio.CancelledError and self._state == _State.TIMEOUT:
+ assert self._task is not None
+ _uncancel_task(self._task)
+ self._timeout_handler = None
+ self._task = None
+ raise asyncio.TimeoutError
+ # timeout has not expired
+ self._state = _State.EXIT
+ self._reject()
+ return None
+
+ def _on_timeout(self) -> None:
+ assert self._task is not None
+ self._task.cancel()
+ self._state = _State.TIMEOUT
+ # drop the reference early
+ self._timeout_handler = None
+
+
+# End https://github.com/aio-libs/async-timeout/blob/master/async_timeout/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/client.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/client.py
new file mode 100644
index 00000000..f05f546d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/client.py
@@ -0,0 +1,567 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+import traceback
+import urllib.parse
+from collections.abc import AsyncIterator, Generator, Sequence
+from types import TracebackType
+from typing import Any, Callable
+
+from ..client import ClientProtocol, backoff
+from ..datastructures import HeadersLike
+from ..exceptions import InvalidMessage, InvalidStatus, SecurityError
+from ..extensions.base import ClientExtensionFactory
+from ..extensions.permessage_deflate import enable_client_permessage_deflate
+from ..headers import validate_subprotocols
+from ..http11 import USER_AGENT, Response
+from ..protocol import CONNECTING, Event
+from ..typing import LoggerLike, Origin, Subprotocol
+from ..uri import WebSocketURI, parse_uri
+from .compatibility import TimeoutError, asyncio_timeout
+from .connection import Connection
+
+
+__all__ = ["connect", "unix_connect", "ClientConnection"]
+
+MAX_REDIRECTS = int(os.environ.get("WEBSOCKETS_MAX_REDIRECTS", "10"))
+
+
+class ClientConnection(Connection):
+ """
+ :mod:`asyncio` implementation of a WebSocket client connection.
+
+ :class:`ClientConnection` provides :meth:`recv` and :meth:`send` coroutines
+ for receiving and sending messages.
+
+ It supports asynchronous iteration to receive messages::
+
+ async for message in websocket:
+ await process(message)
+
+ The iterator exits normally when the connection is closed with close code
+ 1000 (OK) or 1001 (going away) or without a close code. It raises a
+ :exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
+ closed with any other code.
+
+ The ``ping_interval``, ``ping_timeout``, ``close_timeout``, ``max_queue``,
+ and ``write_limit`` arguments have the same meaning as in :func:`connect`.
+
+ Args:
+ protocol: Sans-I/O connection.
+
+ """
+
+ def __init__(
+ self,
+ protocol: ClientProtocol,
+ *,
+ ping_interval: float | None = 20,
+ ping_timeout: float | None = 20,
+ close_timeout: float | None = 10,
+ max_queue: int | None | tuple[int | None, int | None] = 16,
+ write_limit: int | tuple[int, int | None] = 2**15,
+ ) -> None:
+ self.protocol: ClientProtocol
+ super().__init__(
+ protocol,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_queue=max_queue,
+ write_limit=write_limit,
+ )
+ self.response_rcvd: asyncio.Future[None] = self.loop.create_future()
+
+ async def handshake(
+ self,
+ additional_headers: HeadersLike | None = None,
+ user_agent_header: str | None = USER_AGENT,
+ ) -> None:
+ """
+ Perform the opening handshake.
+
+ """
+ async with self.send_context(expected_state=CONNECTING):
+ self.request = self.protocol.connect()
+ if additional_headers is not None:
+ self.request.headers.update(additional_headers)
+ if user_agent_header:
+ self.request.headers["User-Agent"] = user_agent_header
+ self.protocol.send_request(self.request)
+
+ await asyncio.wait(
+ [self.response_rcvd, self.connection_lost_waiter],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ # self.protocol.handshake_exc is set when the connection is lost before
+ # receiving a response, when the response cannot be parsed, or when the
+ # response fails the handshake.
+
+ if self.protocol.handshake_exc is not None:
+ raise self.protocol.handshake_exc
+
+ def process_event(self, event: Event) -> None:
+ """
+ Process one incoming event.
+
+ """
+ # First event - handshake response.
+ if self.response is None:
+ assert isinstance(event, Response)
+ self.response = event
+ self.response_rcvd.set_result(None)
+ # Later events - frames.
+ else:
+ super().process_event(event)
+
+
+def process_exception(exc: Exception) -> Exception | None:
+ """
+ Determine whether a connection error is retryable or fatal.
+
+ When reconnecting automatically with ``async for ... in connect(...)``, if a
+ connection attempt fails, :func:`process_exception` is called to determine
+ whether to retry connecting or to raise the exception.
+
+ This function defines the default behavior, which is to retry on:
+
+ * :exc:`EOFError`, :exc:`OSError`, :exc:`asyncio.TimeoutError`: network
+ errors;
+ * :exc:`~websockets.exceptions.InvalidStatus` when the status code is 500,
+ 502, 503, or 504: server or proxy errors.
+
+ All other exceptions are considered fatal.
+
+ You can change this behavior with the ``process_exception`` argument of
+ :func:`connect`.
+
+ Return :obj:`None` if the exception is retryable i.e. when the error could
+ be transient and trying to reconnect with the same parameters could succeed.
+ The exception will be logged at the ``INFO`` level.
+
+ Return an exception, either ``exc`` or a new exception, if the exception is
+ fatal i.e. when trying to reconnect will most likely produce the same error.
+ That exception will be raised, breaking out of the retry loop.
+
+ """
+ if isinstance(exc, (OSError, asyncio.TimeoutError)):
+ return None
+ if isinstance(exc, InvalidMessage) and isinstance(exc.__cause__, EOFError):
+ return None
+ if isinstance(exc, InvalidStatus) and exc.response.status_code in [
+ 500, # Internal Server Error
+ 502, # Bad Gateway
+ 503, # Service Unavailable
+ 504, # Gateway Timeout
+ ]:
+ return None
+ return exc
+
+
+# This is spelled in lower case because it's exposed as a callable in the API.
+class connect:
+ """
+ Connect to the WebSocket server at ``uri``.
+
+ This coroutine returns a :class:`ClientConnection` instance, which you can
+ use to send and receive messages.
+
+ :func:`connect` may be used as an asynchronous context manager::
+
+ from websockets.asyncio.client import connect
+
+ async with connect(...) as websocket:
+ ...
+
+ The connection is closed automatically when exiting the context.
+
+ :func:`connect` can be used as an infinite asynchronous iterator to
+ reconnect automatically on errors::
+
+ async for websocket in connect(...):
+ try:
+ ...
+ except websockets.exceptions.ConnectionClosed:
+ continue
+
+ If the connection fails with a transient error, it is retried with
+ exponential backoff. If it fails with a fatal error, the exception is
+ raised, breaking out of the loop.
+
+ The connection is closed automatically after each iteration of the loop.
+
+ Args:
+ uri: URI of the WebSocket server.
+ origin: Value of the ``Origin`` header, for servers that require it.
+ extensions: List of supported extensions, in order in which they
+ should be negotiated and run.
+ subprotocols: List of supported subprotocols, in order of decreasing
+ preference.
+ additional_headers (HeadersLike | None): Arbitrary HTTP headers to add
+ to the handshake request.
+ user_agent_header: Value of the ``User-Agent`` request header.
+ It defaults to ``"Python/x.y.z websockets/X.Y"``.
+ Setting it to :obj:`None` removes the header.
+ compression: The "permessage-deflate" extension is enabled by default.
+ Set ``compression`` to :obj:`None` to disable it. See the
+ :doc:`compression guide <../../topics/compression>` for details.
+ process_exception: When reconnecting automatically, tell whether an
+ error is transient or fatal. The default behavior is defined by
+ :func:`process_exception`. Refer to its documentation for details.
+ open_timeout: Timeout for opening the connection in seconds.
+ :obj:`None` disables the timeout.
+ ping_interval: Interval between keepalive pings in seconds.
+ :obj:`None` disables keepalive.
+ ping_timeout: Timeout for keepalive pings in seconds.
+ :obj:`None` disables timeouts.
+ close_timeout: Timeout for closing the connection in seconds.
+ :obj:`None` disables the timeout.
+ max_size: Maximum size of incoming messages in bytes.
+ :obj:`None` disables the limit.
+ max_queue: High-water mark of the buffer where frames are received.
+ It defaults to 16 frames. The low-water mark defaults to ``max_queue
+ // 4``. You may pass a ``(high, low)`` tuple to set the high-water
+ and low-water marks. If you want to disable flow control entirely,
+ you may set it to ``None``, although that's a bad idea.
+ write_limit: High-water mark of write buffer in bytes. It is passed to
+ :meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
+ to 32 KiB. You may pass a ``(high, low)`` tuple to set the
+ high-water and low-water marks.
+ logger: Logger for this client.
+ It defaults to ``logging.getLogger("websockets.client")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+ create_connection: Factory for the :class:`ClientConnection` managing
+ the connection. Set it to a wrapper or a subclass to customize
+ connection handling.
+
+ Any other keyword arguments are passed to the event loop's
+ :meth:`~asyncio.loop.create_connection` method.
+
+ For example:
+
+ * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enforce TLS settings.
+ When connecting to a ``wss://`` URI, if ``ssl`` isn't provided, a TLS
+ context is created with :func:`~ssl.create_default_context`.
+
+ * You can set ``server_hostname`` to override the host name from ``uri`` in
+ the TLS handshake.
+
+ * You can set ``host`` and ``port`` to connect to a different host and port
+ from those found in ``uri``. This only changes the destination of the TCP
+ connection. The host name from ``uri`` is still used in the TLS handshake
+ for secure connections and in the ``Host`` header.
+
+ * You can set ``sock`` to provide a preexisting TCP socket. You may call
+ :func:`socket.create_connection` (not to be confused with the event loop's
+ :meth:`~asyncio.loop.create_connection` method) to create a suitable
+ client socket and customize it.
+
+ Raises:
+ InvalidURI: If ``uri`` isn't a valid WebSocket URI.
+ OSError: If the TCP connection fails.
+ InvalidHandshake: If the opening handshake fails.
+ TimeoutError: If the opening handshake times out.
+
+ """
+
+ def __init__(
+ self,
+ uri: str,
+ *,
+ # WebSocket
+ origin: Origin | None = None,
+ extensions: Sequence[ClientExtensionFactory] | None = None,
+ subprotocols: Sequence[Subprotocol] | None = None,
+ additional_headers: HeadersLike | None = None,
+ user_agent_header: str | None = USER_AGENT,
+ compression: str | None = "deflate",
+ process_exception: Callable[[Exception], Exception | None] = process_exception,
+ # Timeouts
+ open_timeout: float | None = 10,
+ ping_interval: float | None = 20,
+ ping_timeout: float | None = 20,
+ close_timeout: float | None = 10,
+ # Limits
+ max_size: int | None = 2**20,
+ max_queue: int | None | tuple[int | None, int | None] = 16,
+ write_limit: int | tuple[int, int | None] = 2**15,
+ # Logging
+ logger: LoggerLike | None = None,
+ # Escape hatch for advanced customization
+ create_connection: type[ClientConnection] | None = None,
+ # Other keyword arguments are passed to loop.create_connection
+ **kwargs: Any,
+ ) -> None:
+ self.uri = uri
+
+ if subprotocols is not None:
+ validate_subprotocols(subprotocols)
+
+ if compression == "deflate":
+ extensions = enable_client_permessage_deflate(extensions)
+ elif compression is not None:
+ raise ValueError(f"unsupported compression: {compression}")
+
+ if logger is None:
+ logger = logging.getLogger("websockets.client")
+
+ if create_connection is None:
+ create_connection = ClientConnection
+
+ def protocol_factory(wsuri: WebSocketURI) -> ClientConnection:
+ # This is a protocol in the Sans-I/O implementation of websockets.
+ protocol = ClientProtocol(
+ wsuri,
+ origin=origin,
+ extensions=extensions,
+ subprotocols=subprotocols,
+ max_size=max_size,
+ logger=logger,
+ )
+ # This is a connection in websockets and a protocol in asyncio.
+ connection = create_connection(
+ protocol,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_queue=max_queue,
+ write_limit=write_limit,
+ )
+ return connection
+
+ self.protocol_factory = protocol_factory
+ self.handshake_args = (
+ additional_headers,
+ user_agent_header,
+ )
+ self.process_exception = process_exception
+ self.open_timeout = open_timeout
+ self.logger = logger
+ self.connection_kwargs = kwargs
+
+ async def create_connection(self) -> ClientConnection:
+ """Create TCP or Unix connection."""
+ loop = asyncio.get_running_loop()
+
+ wsuri = parse_uri(self.uri)
+ kwargs = self.connection_kwargs.copy()
+
+ def factory() -> ClientConnection:
+ return self.protocol_factory(wsuri)
+
+ if wsuri.secure:
+ kwargs.setdefault("ssl", True)
+ kwargs.setdefault("server_hostname", wsuri.host)
+ if kwargs.get("ssl") is None:
+ raise ValueError("ssl=None is incompatible with a wss:// URI")
+ else:
+ if kwargs.get("ssl") is not None:
+ raise ValueError("ssl argument is incompatible with a ws:// URI")
+
+ if kwargs.pop("unix", False):
+ _, connection = await loop.create_unix_connection(factory, **kwargs)
+ else:
+ if kwargs.get("sock") is None:
+ kwargs.setdefault("host", wsuri.host)
+ kwargs.setdefault("port", wsuri.port)
+ _, connection = await loop.create_connection(factory, **kwargs)
+ return connection
+
+ def process_redirect(self, exc: Exception) -> Exception | str:
+ """
+ Determine whether a connection error is a redirect that can be followed.
+
+ Return the new URI if it's a valid redirect. Else, return an exception.
+
+ """
+ if not (
+ isinstance(exc, InvalidStatus)
+ and exc.response.status_code
+ in [
+ 300, # Multiple Choices
+ 301, # Moved Permanently
+ 302, # Found
+ 303, # See Other
+ 307, # Temporary Redirect
+ 308, # Permanent Redirect
+ ]
+ and "Location" in exc.response.headers
+ ):
+ return exc
+
+ old_wsuri = parse_uri(self.uri)
+ new_uri = urllib.parse.urljoin(self.uri, exc.response.headers["Location"])
+ new_wsuri = parse_uri(new_uri)
+
+ # If connect() received a socket, it is closed and cannot be reused.
+ if self.connection_kwargs.get("sock") is not None:
+ return ValueError(
+ f"cannot follow redirect to {new_uri} with a preexisting socket"
+ )
+
+ # TLS downgrade is forbidden.
+ if old_wsuri.secure and not new_wsuri.secure:
+ return SecurityError(f"cannot follow redirect to non-secure URI {new_uri}")
+
+ # Apply restrictions to cross-origin redirects.
+ if (
+ old_wsuri.secure != new_wsuri.secure
+ or old_wsuri.host != new_wsuri.host
+ or old_wsuri.port != new_wsuri.port
+ ):
+ # Cross-origin redirects on Unix sockets don't quite make sense.
+ if self.connection_kwargs.get("unix", False):
+ return ValueError(
+ f"cannot follow cross-origin redirect to {new_uri} "
+ f"with a Unix socket"
+ )
+
+ # Cross-origin redirects when host and port are overridden are ill-defined.
+ if (
+ self.connection_kwargs.get("host") is not None
+ or self.connection_kwargs.get("port") is not None
+ ):
+ return ValueError(
+ f"cannot follow cross-origin redirect to {new_uri} "
+ f"with an explicit host or port"
+ )
+
+ return new_uri
+
+ # ... = await connect(...)
+
+ def __await__(self) -> Generator[Any, None, ClientConnection]:
+ # Create a suitable iterator by calling __await__ on a coroutine.
+ return self.__await_impl__().__await__()
+
+ async def __await_impl__(self) -> ClientConnection:
+ try:
+ async with asyncio_timeout(self.open_timeout):
+ for _ in range(MAX_REDIRECTS):
+ self.connection = await self.create_connection()
+ try:
+ await self.connection.handshake(*self.handshake_args)
+ except asyncio.CancelledError:
+ self.connection.transport.abort()
+ raise
+ except Exception as exc:
+ # Always close the connection even though keep-alive is
+ # the default in HTTP/1.1 because create_connection ties
+ # opening the network connection with initializing the
+ # protocol. In the current design of connect(), there is
+ # no easy way to reuse the network connection that works
+ # in every case nor to reinitialize the protocol.
+ self.connection.transport.abort()
+
+ uri_or_exc = self.process_redirect(exc)
+ # Response is a valid redirect; follow it.
+ if isinstance(uri_or_exc, str):
+ self.uri = uri_or_exc
+ continue
+ # Response isn't a valid redirect; raise the exception.
+ if uri_or_exc is exc:
+ raise
+ else:
+ raise uri_or_exc from exc
+
+ else:
+ self.connection.start_keepalive()
+ return self.connection
+ else:
+ raise SecurityError(f"more than {MAX_REDIRECTS} redirects")
+
+ except TimeoutError:
+ # Re-raise exception with an informative error message.
+ raise TimeoutError("timed out during handshake") from None
+
+ # ... = yield from connect(...) - remove when dropping Python < 3.10
+
+ __iter__ = __await__
+
+ # async with connect(...) as ...: ...
+
+ async def __aenter__(self) -> ClientConnection:
+ return await self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ await self.connection.close()
+
+ # async for ... in connect(...):
+
+ async def __aiter__(self) -> AsyncIterator[ClientConnection]:
+ delays: Generator[float] | None = None
+ while True:
+ try:
+ async with self as protocol:
+ yield protocol
+ except Exception as exc:
+ # Determine whether the exception is retryable or fatal.
+ # The API of process_exception is "return an exception or None";
+ # "raise an exception" is also supported because it's a frequent
+ # mistake. It isn't documented in order to keep the API simple.
+ try:
+ new_exc = self.process_exception(exc)
+ except Exception as raised_exc:
+ new_exc = raised_exc
+
+ # The connection failed with a fatal error.
+ # Raise the exception and exit the loop.
+ if new_exc is exc:
+ raise
+ if new_exc is not None:
+ raise new_exc from exc
+
+ # The connection failed with a retryable error.
+ # Start or continue backoff and reconnect.
+ if delays is None:
+ delays = backoff()
+ delay = next(delays)
+ self.logger.info(
+ "connect failed; reconnecting in %.1f seconds: %s",
+ delay,
+ # Remove first argument when dropping Python 3.9.
+ traceback.format_exception_only(type(exc), exc)[0].strip(),
+ )
+ await asyncio.sleep(delay)
+ continue
+
+ else:
+ # The connection succeeded. Reset backoff.
+ delays = None
+
+
+def unix_connect(
+ path: str | None = None,
+ uri: str | None = None,
+ **kwargs: Any,
+) -> connect:
+ """
+ Connect to a WebSocket server listening on a Unix socket.
+
+ This function accepts the same keyword arguments as :func:`connect`.
+
+ It's only available on Unix.
+
+ It's mainly useful for debugging servers listening on Unix sockets.
+
+ Args:
+ path: File system path to the Unix socket.
+ uri: URI of the WebSocket server. ``uri`` defaults to
+ ``ws://localhost/`` or, when a ``ssl`` argument is provided, to
+ ``wss://localhost/``.
+
+ """
+ if uri is None:
+ if kwargs.get("ssl") is None:
+ uri = "ws://localhost/"
+ else:
+ uri = "wss://localhost/"
+ return connect(uri=uri, unix=True, path=path, **kwargs)
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/compatibility.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/compatibility.py
new file mode 100644
index 00000000..e1700006
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/compatibility.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+import sys
+
+
+__all__ = ["TimeoutError", "aiter", "anext", "asyncio_timeout", "asyncio_timeout_at"]
+
+
+if sys.version_info[:2] >= (3, 11):
+ TimeoutError = TimeoutError
+ aiter = aiter
+ anext = anext
+ from asyncio import (
+ timeout as asyncio_timeout, # noqa: F401
+ timeout_at as asyncio_timeout_at, # noqa: F401
+ )
+
+else: # Python < 3.11
+ from asyncio import TimeoutError
+
+ def aiter(async_iterable):
+ return type(async_iterable).__aiter__(async_iterable)
+
+ async def anext(async_iterator):
+ return await type(async_iterator).__anext__(async_iterator)
+
+ from .async_timeout import (
+ timeout as asyncio_timeout, # noqa: F401
+ timeout_at as asyncio_timeout_at, # noqa: F401
+ )
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py
new file mode 100644
index 00000000..e2e587e7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/connection.py
@@ -0,0 +1,1214 @@
+from __future__ import annotations
+
+import asyncio
+import collections
+import contextlib
+import logging
+import random
+import struct
+import sys
+import traceback
+import uuid
+from collections.abc import AsyncIterable, AsyncIterator, Awaitable, Iterable, Mapping
+from types import TracebackType
+from typing import Any, cast
+
+from ..exceptions import (
+ ConcurrencyError,
+ ConnectionClosed,
+ ConnectionClosedOK,
+ ProtocolError,
+)
+from ..frames import DATA_OPCODES, BytesLike, CloseCode, Frame, Opcode
+from ..http11 import Request, Response
+from ..protocol import CLOSED, OPEN, Event, Protocol, State
+from ..typing import Data, LoggerLike, Subprotocol
+from .compatibility import (
+ TimeoutError,
+ aiter,
+ anext,
+ asyncio_timeout,
+ asyncio_timeout_at,
+)
+from .messages import Assembler
+
+
+__all__ = ["Connection"]
+
+
+class Connection(asyncio.Protocol):
+ """
+ :mod:`asyncio` implementation of a WebSocket connection.
+
+ :class:`Connection` provides APIs shared between WebSocket servers and
+ clients.
+
+ You shouldn't use it directly. Instead, use
+ :class:`~websockets.asyncio.client.ClientConnection` or
+ :class:`~websockets.asyncio.server.ServerConnection`.
+
+ """
+
+ def __init__(
+ self,
+ protocol: Protocol,
+ *,
+ ping_interval: float | None = 20,
+ ping_timeout: float | None = 20,
+ close_timeout: float | None = 10,
+ max_queue: int | None | tuple[int | None, int | None] = 16,
+ write_limit: int | tuple[int, int | None] = 2**15,
+ ) -> None:
+ self.protocol = protocol
+ self.ping_interval = ping_interval
+ self.ping_timeout = ping_timeout
+ self.close_timeout = close_timeout
+ if isinstance(max_queue, int) or max_queue is None:
+ max_queue = (max_queue, None)
+ self.max_queue = max_queue
+ if isinstance(write_limit, int):
+ write_limit = (write_limit, None)
+ self.write_limit = write_limit
+
+ # Inject reference to this instance in the protocol's logger.
+ self.protocol.logger = logging.LoggerAdapter(
+ self.protocol.logger,
+ {"websocket": self},
+ )
+
+ # Copy attributes from the protocol for convenience.
+ self.id: uuid.UUID = self.protocol.id
+ """Unique identifier of the connection. Useful in logs."""
+ self.logger: LoggerLike = self.protocol.logger
+ """Logger for this connection."""
+ self.debug = self.protocol.debug
+
+ # HTTP handshake request and response.
+ self.request: Request | None = None
+ """Opening handshake request."""
+ self.response: Response | None = None
+ """Opening handshake response."""
+
+ # Event loop running this connection.
+ self.loop = asyncio.get_running_loop()
+
+ # Assembler turning frames into messages and serializing reads.
+ self.recv_messages: Assembler # initialized in connection_made
+
+ # Deadline for the closing handshake.
+ self.close_deadline: float | None = None
+
+ # Protect sending fragmented messages.
+ self.fragmented_send_waiter: asyncio.Future[None] | None = None
+
+ # Mapping of ping IDs to pong waiters, in chronological order.
+ self.pong_waiters: dict[bytes, tuple[asyncio.Future[float], float]] = {}
+
+ self.latency: float = 0
+ """
+ Latency of the connection, in seconds.
+
+ Latency is defined as the round-trip time of the connection. It is
+ measured by sending a Ping frame and waiting for a matching Pong frame.
+ Before the first measurement, :attr:`latency` is ``0``.
+
+ By default, websockets enables a :ref:`keepalive <keepalive>` mechanism
+ that sends Ping frames automatically at regular intervals. You can also
+ send Ping frames and measure latency with :meth:`ping`.
+ """
+
+ # Task that sends keepalive pings. None when ping_interval is None.
+ self.keepalive_task: asyncio.Task[None] | None = None
+
+ # Exception raised while reading from the connection, to be chained to
+ # ConnectionClosed in order to show why the TCP connection dropped.
+ self.recv_exc: BaseException | None = None
+
+ # Completed when the TCP connection is closed and the WebSocket
+ # connection state becomes CLOSED.
+ self.connection_lost_waiter: asyncio.Future[None] = self.loop.create_future()
+
+ # Adapted from asyncio.FlowControlMixin
+ self.paused: bool = False
+ self.drain_waiters: collections.deque[asyncio.Future[None]] = (
+ collections.deque()
+ )
+
+ # Public attributes
+
+ @property
+ def local_address(self) -> Any:
+ """
+ Local address of the connection.
+
+ For IPv4 connections, this is a ``(host, port)`` tuple.
+
+ The format of the address depends on the address family.
+ See :meth:`~socket.socket.getsockname`.
+
+ """
+ return self.transport.get_extra_info("sockname")
+
+ @property
+ def remote_address(self) -> Any:
+ """
+ Remote address of the connection.
+
+ For IPv4 connections, this is a ``(host, port)`` tuple.
+
+ The format of the address depends on the address family.
+ See :meth:`~socket.socket.getpeername`.
+
+ """
+ return self.transport.get_extra_info("peername")
+
+ @property
+ def state(self) -> State:
+ """
+ State of the WebSocket connection, defined in :rfc:`6455`.
+
+ This attribute is provided for completeness. Typical applications
+ shouldn't check its value. Instead, they should call :meth:`~recv` or
+ :meth:`send` and handle :exc:`~websockets.exceptions.ConnectionClosed`
+ exceptions.
+
+ """
+ return self.protocol.state
+
+ @property
+ def subprotocol(self) -> Subprotocol | None:
+ """
+ Subprotocol negotiated during the opening handshake.
+
+ :obj:`None` if no subprotocol was negotiated.
+
+ """
+ return self.protocol.subprotocol
+
+ @property
+ def close_code(self) -> int | None:
+ """
+ State of the WebSocket connection, defined in :rfc:`6455`.
+
+ This attribute is provided for completeness. Typical applications
+ shouldn't check its value. Instead, they should inspect attributes
+ of :exc:`~websockets.exceptions.ConnectionClosed` exceptions.
+
+ """
+ return self.protocol.close_code
+
+ @property
+ def close_reason(self) -> str | None:
+ """
+ State of the WebSocket connection, defined in :rfc:`6455`.
+
+ This attribute is provided for completeness. Typical applications
+ shouldn't check its value. Instead, they should inspect attributes
+ of :exc:`~websockets.exceptions.ConnectionClosed` exceptions.
+
+ """
+ return self.protocol.close_reason
+
+ # Public methods
+
+ async def __aenter__(self) -> Connection:
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ if exc_type is None:
+ await self.close()
+ else:
+ await self.close(CloseCode.INTERNAL_ERROR)
+
+ async def __aiter__(self) -> AsyncIterator[Data]:
+ """
+ Iterate on incoming messages.
+
+ The iterator calls :meth:`recv` and yields messages asynchronously in an
+ infinite loop.
+
+ It exits when the connection is closed normally. It raises a
+ :exc:`~websockets.exceptions.ConnectionClosedError` exception after a
+ protocol error or a network failure.
+
+ """
+ try:
+ while True:
+ yield await self.recv()
+ except ConnectionClosedOK:
+ return
+
+ async def recv(self, decode: bool | None = None) -> Data:
+ """
+ Receive the next message.
+
+ When the connection is closed, :meth:`recv` raises
+ :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it raises
+ :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal closure
+ and :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol
+ error or a network failure. This is how you detect the end of the
+ message stream.
+
+ Canceling :meth:`recv` is safe. There's no risk of losing data. The next
+ invocation of :meth:`recv` will return the next message.
+
+ This makes it possible to enforce a timeout by wrapping :meth:`recv` in
+ :func:`~asyncio.timeout` or :func:`~asyncio.wait_for`.
+
+ When the message is fragmented, :meth:`recv` waits until all fragments
+ are received, reassembles them, and returns the whole message.
+
+ Args:
+ decode: Set this flag to override the default behavior of returning
+ :class:`str` or :class:`bytes`. See below for details.
+
+ Returns:
+ A string (:class:`str`) for a Text_ frame or a bytestring
+ (:class:`bytes`) for a Binary_ frame.
+
+ .. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+ .. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+
+ You may override this behavior with the ``decode`` argument:
+
+ * Set ``decode=False`` to disable UTF-8 decoding of Text_ frames and
+ return a bytestring (:class:`bytes`). This improves performance
+ when decoding isn't needed, for example if the message contains
+ JSON and you're using a JSON library that expects a bytestring.
+ * Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
+ and return a string (:class:`str`). This may be useful for
+ servers that send binary frames instead of text frames.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ ConcurrencyError: If two coroutines call :meth:`recv` or
+ :meth:`recv_streaming` concurrently.
+
+ """
+ try:
+ return await self.recv_messages.get(decode)
+ except EOFError:
+ pass
+ # fallthrough
+ except ConcurrencyError:
+ raise ConcurrencyError(
+ "cannot call recv while another coroutine "
+ "is already running recv or recv_streaming"
+ ) from None
+ except UnicodeDecodeError as exc:
+ async with self.send_context():
+ self.protocol.fail(
+ CloseCode.INVALID_DATA,
+ f"{exc.reason} at position {exc.start}",
+ )
+ # fallthrough
+
+ # Wait for the protocol state to be CLOSED before accessing close_exc.
+ await asyncio.shield(self.connection_lost_waiter)
+ raise self.protocol.close_exc from self.recv_exc
+
+ async def recv_streaming(self, decode: bool | None = None) -> AsyncIterator[Data]:
+ """
+ Receive the next message frame by frame.
+
+ This method is designed for receiving fragmented messages. It returns an
+ asynchronous iterator that yields each fragment as it is received. This
+ iterator must be fully consumed. Else, future calls to :meth:`recv` or
+ :meth:`recv_streaming` will raise
+ :exc:`~websockets.exceptions.ConcurrencyError`, making the connection
+ unusable.
+
+ :meth:`recv_streaming` raises the same exceptions as :meth:`recv`.
+
+ Canceling :meth:`recv_streaming` before receiving the first frame is
+ safe. Canceling it after receiving one or more frames leaves the
+ iterator in a partially consumed state, making the connection unusable.
+ Instead, you should close the connection with :meth:`close`.
+
+ Args:
+ decode: Set this flag to override the default behavior of returning
+ :class:`str` or :class:`bytes`. See below for details.
+
+ Returns:
+ An iterator of strings (:class:`str`) for a Text_ frame or
+ bytestrings (:class:`bytes`) for a Binary_ frame.
+
+ .. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+ .. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+
+ You may override this behavior with the ``decode`` argument:
+
+ * Set ``decode=False`` to disable UTF-8 decoding of Text_ frames
+ and return bytestrings (:class:`bytes`). This may be useful to
+ optimize performance when decoding isn't needed.
+ * Set ``decode=True`` to force UTF-8 decoding of Binary_ frames
+ and return strings (:class:`str`). This is useful for servers
+ that send binary frames instead of text frames.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ ConcurrencyError: If two coroutines call :meth:`recv` or
+ :meth:`recv_streaming` concurrently.
+
+ """
+ try:
+ async for frame in self.recv_messages.get_iter(decode):
+ yield frame
+ return
+ except EOFError:
+ pass
+ # fallthrough
+ except ConcurrencyError:
+ raise ConcurrencyError(
+ "cannot call recv_streaming while another coroutine "
+ "is already running recv or recv_streaming"
+ ) from None
+ except UnicodeDecodeError as exc:
+ async with self.send_context():
+ self.protocol.fail(
+ CloseCode.INVALID_DATA,
+ f"{exc.reason} at position {exc.start}",
+ )
+ # fallthrough
+
+ # Wait for the protocol state to be CLOSED before accessing close_exc.
+ await asyncio.shield(self.connection_lost_waiter)
+ raise self.protocol.close_exc from self.recv_exc
+
+ async def send(
+ self,
+ message: Data | Iterable[Data] | AsyncIterable[Data],
+ text: bool | None = None,
+ ) -> None:
+ """
+ Send a message.
+
+ A string (:class:`str`) is sent as a Text_ frame. A bytestring or
+ bytes-like object (:class:`bytes`, :class:`bytearray`, or
+ :class:`memoryview`) is sent as a Binary_ frame.
+
+ .. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+ .. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+
+ You may override this behavior with the ``text`` argument:
+
+ * Set ``text=True`` to send a bytestring or bytes-like object
+ (:class:`bytes`, :class:`bytearray`, or :class:`memoryview`) as a
+ Text_ frame. This improves performance when the message is already
+ UTF-8 encoded, for example if the message contains JSON and you're
+ using a JSON library that produces a bytestring.
+ * Set ``text=False`` to send a string (:class:`str`) in a Binary_
+ frame. This may be useful for servers that expect binary frames
+ instead of text frames.
+
+ :meth:`send` also accepts an iterable or an asynchronous iterable of
+ strings, bytestrings, or bytes-like objects to enable fragmentation_.
+ Each item is treated as a message fragment and sent in its own frame.
+ All items must be of the same type, or else :meth:`send` will raise a
+ :exc:`TypeError` and the connection will be closed.
+
+ .. _fragmentation: https://datatracker.ietf.org/doc/html/rfc6455#section-5.4
+
+ :meth:`send` rejects dict-like objects because this is often an error.
+ (If you really want to send the keys of a dict-like object as fragments,
+ call its :meth:`~dict.keys` method and pass the result to :meth:`send`.)
+
+ Canceling :meth:`send` is discouraged. Instead, you should close the
+ connection with :meth:`close`. Indeed, there are only two situations
+ where :meth:`send` may yield control to the event loop and then get
+ canceled; in both cases, :meth:`close` has the same effect and is
+ more clear:
+
+ 1. The write buffer is full. If you don't want to wait until enough
+ data is sent, your only alternative is to close the connection.
+ :meth:`close` will likely time out then abort the TCP connection.
+ 2. ``message`` is an asynchronous iterator that yields control.
+ Stopping in the middle of a fragmented message will cause a
+ protocol error and the connection will be closed.
+
+ When the connection is closed, :meth:`send` raises
+ :exc:`~websockets.exceptions.ConnectionClosed`. Specifically, it
+ raises :exc:`~websockets.exceptions.ConnectionClosedOK` after a normal
+ connection closure and
+ :exc:`~websockets.exceptions.ConnectionClosedError` after a protocol
+ error or a network failure.
+
+ Args:
+ message: Message to send.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ TypeError: If ``message`` doesn't have a supported type.
+
+ """
+ # While sending a fragmented message, prevent sending other messages
+ # until all fragments are sent.
+ while self.fragmented_send_waiter is not None:
+ await asyncio.shield(self.fragmented_send_waiter)
+
+ # Unfragmented message -- this case must be handled first because
+ # strings and bytes-like objects are iterable.
+
+ if isinstance(message, str):
+ async with self.send_context():
+ if text is False:
+ self.protocol.send_binary(message.encode())
+ else:
+ self.protocol.send_text(message.encode())
+
+ elif isinstance(message, BytesLike):
+ async with self.send_context():
+ if text is True:
+ self.protocol.send_text(message)
+ else:
+ self.protocol.send_binary(message)
+
+ # Catch a common mistake -- passing a dict to send().
+
+ elif isinstance(message, Mapping):
+ raise TypeError("data is a dict-like object")
+
+ # Fragmented message -- regular iterator.
+
+ elif isinstance(message, Iterable):
+ chunks = iter(message)
+ try:
+ chunk = next(chunks)
+ except StopIteration:
+ return
+
+ assert self.fragmented_send_waiter is None
+ self.fragmented_send_waiter = self.loop.create_future()
+ try:
+ # First fragment.
+ if isinstance(chunk, str):
+ async with self.send_context():
+ if text is False:
+ self.protocol.send_binary(chunk.encode(), fin=False)
+ else:
+ self.protocol.send_text(chunk.encode(), fin=False)
+ encode = True
+ elif isinstance(chunk, BytesLike):
+ async with self.send_context():
+ if text is True:
+ self.protocol.send_text(chunk, fin=False)
+ else:
+ self.protocol.send_binary(chunk, fin=False)
+ encode = False
+ else:
+ raise TypeError("iterable must contain bytes or str")
+
+ # Other fragments
+ for chunk in chunks:
+ if isinstance(chunk, str) and encode:
+ async with self.send_context():
+ self.protocol.send_continuation(chunk.encode(), fin=False)
+ elif isinstance(chunk, BytesLike) and not encode:
+ async with self.send_context():
+ self.protocol.send_continuation(chunk, fin=False)
+ else:
+ raise TypeError("iterable must contain uniform types")
+
+ # Final fragment.
+ async with self.send_context():
+ self.protocol.send_continuation(b"", fin=True)
+
+ except Exception:
+ # We're half-way through a fragmented message and we can't
+ # complete it. This makes the connection unusable.
+ async with self.send_context():
+ self.protocol.fail(
+ CloseCode.INTERNAL_ERROR,
+ "error in fragmented message",
+ )
+ raise
+
+ finally:
+ self.fragmented_send_waiter.set_result(None)
+ self.fragmented_send_waiter = None
+
+ # Fragmented message -- async iterator.
+
+ elif isinstance(message, AsyncIterable):
+ achunks = aiter(message)
+ try:
+ chunk = await anext(achunks)
+ except StopAsyncIteration:
+ return
+
+ assert self.fragmented_send_waiter is None
+ self.fragmented_send_waiter = self.loop.create_future()
+ try:
+ # First fragment.
+ if isinstance(chunk, str):
+ if text is False:
+ async with self.send_context():
+ self.protocol.send_binary(chunk.encode(), fin=False)
+ else:
+ async with self.send_context():
+ self.protocol.send_text(chunk.encode(), fin=False)
+ encode = True
+ elif isinstance(chunk, BytesLike):
+ if text is True:
+ async with self.send_context():
+ self.protocol.send_text(chunk, fin=False)
+ else:
+ async with self.send_context():
+ self.protocol.send_binary(chunk, fin=False)
+ encode = False
+ else:
+ raise TypeError("async iterable must contain bytes or str")
+
+ # Other fragments
+ async for chunk in achunks:
+ if isinstance(chunk, str) and encode:
+ async with self.send_context():
+ self.protocol.send_continuation(chunk.encode(), fin=False)
+ elif isinstance(chunk, BytesLike) and not encode:
+ async with self.send_context():
+ self.protocol.send_continuation(chunk, fin=False)
+ else:
+ raise TypeError("async iterable must contain uniform types")
+
+ # Final fragment.
+ async with self.send_context():
+ self.protocol.send_continuation(b"", fin=True)
+
+ except Exception:
+ # We're half-way through a fragmented message and we can't
+ # complete it. This makes the connection unusable.
+ async with self.send_context():
+ self.protocol.fail(
+ CloseCode.INTERNAL_ERROR,
+ "error in fragmented message",
+ )
+ raise
+
+ finally:
+ self.fragmented_send_waiter.set_result(None)
+ self.fragmented_send_waiter = None
+
+ else:
+ raise TypeError("data must be str, bytes, iterable, or async iterable")
+
+ async def close(self, code: int = 1000, reason: str = "") -> None:
+ """
+ Perform the closing handshake.
+
+ :meth:`close` waits for the other end to complete the handshake and
+ for the TCP connection to terminate.
+
+ :meth:`close` is idempotent: it doesn't do anything once the
+ connection is closed.
+
+ Args:
+ code: WebSocket close code.
+ reason: WebSocket close reason.
+
+ """
+ try:
+ # The context manager takes care of waiting for the TCP connection
+ # to terminate after calling a method that sends a close frame.
+ async with self.send_context():
+ if self.fragmented_send_waiter is not None:
+ self.protocol.fail(
+ CloseCode.INTERNAL_ERROR,
+ "close during fragmented message",
+ )
+ else:
+ self.protocol.send_close(code, reason)
+ except ConnectionClosed:
+ # Ignore ConnectionClosed exceptions raised from send_context().
+ # They mean that the connection is closed, which was the goal.
+ pass
+
+ async def wait_closed(self) -> None:
+ """
+ Wait until the connection is closed.
+
+ :meth:`wait_closed` waits for the closing handshake to complete and for
+ the TCP connection to terminate.
+
+ """
+ await asyncio.shield(self.connection_lost_waiter)
+
+ async def ping(self, data: Data | None = None) -> Awaitable[float]:
+ """
+ Send a Ping_.
+
+ .. _Ping: https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.2
+
+ A ping may serve as a keepalive or as a check that the remote endpoint
+ received all messages up to this point
+
+ Args:
+ data: Payload of the ping. A :class:`str` will be encoded to UTF-8.
+ If ``data`` is :obj:`None`, the payload is four random bytes.
+
+ Returns:
+ A future that will be completed when the corresponding pong is
+ received. You can ignore it if you don't intend to wait. The result
+ of the future is the latency of the connection in seconds.
+
+ ::
+
+ pong_waiter = await ws.ping()
+ # only if you want to wait for the corresponding pong
+ latency = await pong_waiter
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+ ConcurrencyError: If another ping was sent with the same data and
+ the corresponding pong wasn't received yet.
+
+ """
+ if isinstance(data, BytesLike):
+ data = bytes(data)
+ elif isinstance(data, str):
+ data = data.encode()
+ elif data is not None:
+ raise TypeError("data must be str or bytes-like")
+
+ async with self.send_context():
+ # Protect against duplicates if a payload is explicitly set.
+ if data in self.pong_waiters:
+ raise ConcurrencyError("already waiting for a pong with the same data")
+
+ # Generate a unique random payload otherwise.
+ while data is None or data in self.pong_waiters:
+ data = struct.pack("!I", random.getrandbits(32))
+
+ pong_waiter = self.loop.create_future()
+ # The event loop's default clock is time.monotonic(). Its resolution
+ # is a bit low on Windows (~16ms). This is improved in Python 3.13.
+ ping_timestamp = self.loop.time()
+ self.pong_waiters[data] = (pong_waiter, ping_timestamp)
+ self.protocol.send_ping(data)
+ return pong_waiter
+
+ async def pong(self, data: Data = b"") -> None:
+ """
+ Send a Pong_.
+
+ .. _Pong: https://datatracker.ietf.org/doc/html/rfc6455#section-5.5.3
+
+ An unsolicited pong may serve as a unidirectional heartbeat.
+
+ Args:
+ data: Payload of the pong. A :class:`str` will be encoded to UTF-8.
+
+ Raises:
+ ConnectionClosed: When the connection is closed.
+
+ """
+ if isinstance(data, BytesLike):
+ data = bytes(data)
+ elif isinstance(data, str):
+ data = data.encode()
+ else:
+ raise TypeError("data must be str or bytes-like")
+
+ async with self.send_context():
+ self.protocol.send_pong(data)
+
+ # Private methods
+
+ def process_event(self, event: Event) -> None:
+ """
+ Process one incoming event.
+
+ This method is overridden in subclasses to handle the handshake.
+
+ """
+ assert isinstance(event, Frame)
+ if event.opcode in DATA_OPCODES:
+ self.recv_messages.put(event)
+
+ if event.opcode is Opcode.PONG:
+ self.acknowledge_pings(bytes(event.data))
+
+ def acknowledge_pings(self, data: bytes) -> None:
+ """
+ Acknowledge pings when receiving a pong.
+
+ """
+ # Ignore unsolicited pong.
+ if data not in self.pong_waiters:
+ return
+
+ pong_timestamp = self.loop.time()
+
+ # Sending a pong for only the most recent ping is legal.
+ # Acknowledge all previous pings too in that case.
+ ping_id = None
+ ping_ids = []
+ for ping_id, (pong_waiter, ping_timestamp) in self.pong_waiters.items():
+ ping_ids.append(ping_id)
+ latency = pong_timestamp - ping_timestamp
+ if not pong_waiter.done():
+ pong_waiter.set_result(latency)
+ if ping_id == data:
+ self.latency = latency
+ break
+ else:
+ raise AssertionError("solicited pong not found in pings")
+
+ # Remove acknowledged pings from self.pong_waiters.
+ for ping_id in ping_ids:
+ del self.pong_waiters[ping_id]
+
+ def abort_pings(self) -> None:
+ """
+ Raise ConnectionClosed in pending pings.
+
+ They'll never receive a pong once the connection is closed.
+
+ """
+ assert self.protocol.state is CLOSED
+ exc = self.protocol.close_exc
+
+ for pong_waiter, _ping_timestamp in self.pong_waiters.values():
+ if not pong_waiter.done():
+ pong_waiter.set_exception(exc)
+ # If the exception is never retrieved, it will be logged when ping
+ # is garbage-collected. This is confusing for users.
+ # Given that ping is done (with an exception), canceling it does
+ # nothing, but it prevents logging the exception.
+ pong_waiter.cancel()
+
+ self.pong_waiters.clear()
+
+ async def keepalive(self) -> None:
+ """
+ Send a Ping frame and wait for a Pong frame at regular intervals.
+
+ """
+ assert self.ping_interval is not None
+ latency = 0.0
+ try:
+ while True:
+ # If self.ping_timeout > latency > self.ping_interval, pings
+ # will be sent immediately after receiving pongs. The period
+ # will be longer than self.ping_interval.
+ await asyncio.sleep(self.ping_interval - latency)
+
+ self.logger.debug("% sending keepalive ping")
+ pong_waiter = await self.ping()
+
+ if self.ping_timeout is not None:
+ try:
+ async with asyncio_timeout(self.ping_timeout):
+ # connection_lost cancels keepalive immediately
+ # after setting a ConnectionClosed exception on
+ # pong_waiter. A CancelledError is raised here,
+ # not a ConnectionClosed exception.
+ latency = await pong_waiter
+ self.logger.debug("% received keepalive pong")
+ except asyncio.TimeoutError:
+ if self.debug:
+ self.logger.debug("- timed out waiting for keepalive pong")
+ async with self.send_context():
+ self.protocol.fail(
+ CloseCode.INTERNAL_ERROR,
+ "keepalive ping timeout",
+ )
+ raise AssertionError(
+ "send_context() should wait for connection_lost(), "
+ "which cancels keepalive()"
+ )
+ except Exception:
+ self.logger.error("keepalive ping failed", exc_info=True)
+
+ def start_keepalive(self) -> None:
+ """
+ Run :meth:`keepalive` in a task, unless keepalive is disabled.
+
+ """
+ if self.ping_interval is not None:
+ self.keepalive_task = self.loop.create_task(self.keepalive())
+
+ @contextlib.asynccontextmanager
+ async def send_context(
+ self,
+ *,
+ expected_state: State = OPEN, # CONNECTING during the opening handshake
+ ) -> AsyncIterator[None]:
+ """
+ Create a context for writing to the connection from user code.
+
+ On entry, :meth:`send_context` checks that the connection is open; on
+ exit, it writes outgoing data to the socket::
+
+ async with self.send_context():
+ self.protocol.send_text(message.encode())
+
+ When the connection isn't open on entry, when the connection is expected
+ to close on exit, or when an unexpected error happens, terminating the
+ connection, :meth:`send_context` waits until the connection is closed
+ then raises :exc:`~websockets.exceptions.ConnectionClosed`.
+
+ """
+ # Should we wait until the connection is closed?
+ wait_for_close = False
+ # Should we close the transport and raise ConnectionClosed?
+ raise_close_exc = False
+ # What exception should we chain ConnectionClosed to?
+ original_exc: BaseException | None = None
+
+ if self.protocol.state is expected_state:
+ # Let the caller interact with the protocol.
+ try:
+ yield
+ except (ProtocolError, ConcurrencyError):
+ # The protocol state wasn't changed. Exit immediately.
+ raise
+ except Exception as exc:
+ self.logger.error("unexpected internal error", exc_info=True)
+ # This branch should never run. It's a safety net in case of
+ # bugs. Since we don't know what happened, we will close the
+ # connection and raise the exception to the caller.
+ wait_for_close = False
+ raise_close_exc = True
+ original_exc = exc
+ else:
+ # Check if the connection is expected to close soon.
+ if self.protocol.close_expected():
+ wait_for_close = True
+ # If the connection is expected to close soon, set the
+ # close deadline based on the close timeout.
+ # Since we tested earlier that protocol.state was OPEN
+ # (or CONNECTING), self.close_deadline is still None.
+ if self.close_timeout is not None:
+ assert self.close_deadline is None
+ self.close_deadline = self.loop.time() + self.close_timeout
+ # Write outgoing data to the socket and enforce flow control.
+ try:
+ self.send_data()
+ await self.drain()
+ except Exception as exc:
+ if self.debug:
+ self.logger.debug("! error while sending data", exc_info=True)
+ # While the only expected exception here is OSError,
+ # other exceptions would be treated identically.
+ wait_for_close = False
+ raise_close_exc = True
+ original_exc = exc
+
+ else: # self.protocol.state is not expected_state
+ # Minor layering violation: we assume that the connection
+ # will be closing soon if it isn't in the expected state.
+ wait_for_close = True
+ # Calculate close_deadline if it wasn't set yet.
+ if self.close_timeout is not None:
+ if self.close_deadline is None:
+ self.close_deadline = self.loop.time() + self.close_timeout
+ raise_close_exc = True
+
+ # If the connection is expected to close soon and the close timeout
+ # elapses, close the socket to terminate the connection.
+ if wait_for_close:
+ try:
+ async with asyncio_timeout_at(self.close_deadline):
+ await asyncio.shield(self.connection_lost_waiter)
+ except TimeoutError:
+ # There's no risk to overwrite another error because
+ # original_exc is never set when wait_for_close is True.
+ assert original_exc is None
+ original_exc = TimeoutError("timed out while closing connection")
+ # Set recv_exc before closing the transport in order to get
+ # proper exception reporting.
+ raise_close_exc = True
+ self.set_recv_exc(original_exc)
+
+ # If an error occurred, close the transport to terminate the connection and
+ # raise an exception.
+ if raise_close_exc:
+ self.transport.abort()
+ # Wait for the protocol state to be CLOSED before accessing close_exc.
+ await asyncio.shield(self.connection_lost_waiter)
+ raise self.protocol.close_exc from original_exc
+
+ def send_data(self) -> None:
+ """
+ Send outgoing data.
+
+ Raises:
+ OSError: When a socket operations fails.
+
+ """
+ for data in self.protocol.data_to_send():
+ if data:
+ self.transport.write(data)
+ else:
+ # Half-close the TCP connection when possible i.e. no TLS.
+ if self.transport.can_write_eof():
+ if self.debug:
+ self.logger.debug("x half-closing TCP connection")
+ # write_eof() doesn't document which exceptions it raises.
+ # OSError is plausible. uvloop can raise RuntimeError here.
+ try:
+ self.transport.write_eof()
+ except (OSError, RuntimeError): # pragma: no cover
+ pass
+ # Else, close the TCP connection.
+ else: # pragma: no cover
+ if self.debug:
+ self.logger.debug("x closing TCP connection")
+ self.transport.close()
+
+ def set_recv_exc(self, exc: BaseException | None) -> None:
+ """
+ Set recv_exc, if not set yet.
+
+ """
+ if self.recv_exc is None:
+ self.recv_exc = exc
+
+ # asyncio.Protocol methods
+
+ # Connection callbacks
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ transport = cast(asyncio.Transport, transport)
+ self.recv_messages = Assembler(
+ *self.max_queue,
+ pause=transport.pause_reading,
+ resume=transport.resume_reading,
+ )
+ transport.set_write_buffer_limits(*self.write_limit)
+ self.transport = transport
+
+ def connection_lost(self, exc: Exception | None) -> None:
+ # Calling protocol.receive_eof() is safe because it's idempotent.
+ # This guarantees that the protocol state becomes CLOSED.
+ self.protocol.receive_eof()
+ assert self.protocol.state is CLOSED
+
+ self.set_recv_exc(exc)
+
+ # Abort recv() and pending pings with a ConnectionClosed exception.
+ self.recv_messages.close()
+ self.abort_pings()
+
+ if self.keepalive_task is not None:
+ self.keepalive_task.cancel()
+
+ # If self.connection_lost_waiter isn't pending, that's a bug, because:
+ # - it's set only here in connection_lost() which is called only once;
+ # - it must never be canceled.
+ self.connection_lost_waiter.set_result(None)
+
+ # Adapted from asyncio.streams.FlowControlMixin
+ if self.paused: # pragma: no cover
+ self.paused = False
+ for waiter in self.drain_waiters:
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ # Flow control callbacks
+
+ def pause_writing(self) -> None: # pragma: no cover
+ # Adapted from asyncio.streams.FlowControlMixin
+ assert not self.paused
+ self.paused = True
+
+ def resume_writing(self) -> None: # pragma: no cover
+ # Adapted from asyncio.streams.FlowControlMixin
+ assert self.paused
+ self.paused = False
+ for waiter in self.drain_waiters:
+ if not waiter.done():
+ waiter.set_result(None)
+
+ async def drain(self) -> None: # pragma: no cover
+ # We don't check if the connection is closed because we call drain()
+ # immediately after write() and write() would fail in that case.
+
+ # Adapted from asyncio.streams.StreamWriter
+ # Yield to the event loop so that connection_lost() may be called.
+ if self.transport.is_closing():
+ await asyncio.sleep(0)
+
+ # Adapted from asyncio.streams.FlowControlMixin
+ if self.paused:
+ waiter = self.loop.create_future()
+ self.drain_waiters.append(waiter)
+ try:
+ await waiter
+ finally:
+ self.drain_waiters.remove(waiter)
+
+ # Streaming protocol callbacks
+
+ def data_received(self, data: bytes) -> None:
+ # Feed incoming data to the protocol.
+ self.protocol.receive_data(data)
+
+ # This isn't expected to raise an exception.
+ events = self.protocol.events_received()
+
+ # Write outgoing data to the transport.
+ try:
+ self.send_data()
+ except Exception as exc:
+ if self.debug:
+ self.logger.debug("! error while sending data", exc_info=True)
+ self.set_recv_exc(exc)
+
+ if self.protocol.close_expected():
+ # If the connection is expected to close soon, set the
+ # close deadline based on the close timeout.
+ if self.close_timeout is not None:
+ if self.close_deadline is None:
+ self.close_deadline = self.loop.time() + self.close_timeout
+
+ for event in events:
+ # This isn't expected to raise an exception.
+ self.process_event(event)
+
+ def eof_received(self) -> None:
+ # Feed the end of the data stream to the connection.
+ self.protocol.receive_eof()
+
+ # This isn't expected to raise an exception.
+ events = self.protocol.events_received()
+
+ # There is no error handling because send_data() can only write
+ # the end of the data stream here and it shouldn't raise errors.
+ self.send_data()
+
+ # This code path is triggered when receiving an HTTP response
+ # without a Content-Length header. This is the only case where
+ # reading until EOF generates an event; all other events have
+ # a known length. Ignore for coverage measurement because tests
+ # are in test_client.py rather than test_connection.py.
+ for event in events: # pragma: no cover
+ # This isn't expected to raise an exception.
+ self.process_event(event)
+
+ # The WebSocket protocol has its own closing handshake: endpoints close
+ # the TCP or TLS connection after sending and receiving a close frame.
+ # As a consequence, they never need to write after receiving EOF, so
+ # there's no reason to keep the transport open by returning True.
+ # Besides, that doesn't work on TLS connections.
+
+
+# broadcast() is defined in the connection module even though it's primarily
+# used by servers and documented in the server module because it works with
+# client connections too and because it's easier to test together with the
+# Connection class.
+
+
+def broadcast(
+ connections: Iterable[Connection],
+ message: Data,
+ raise_exceptions: bool = False,
+) -> None:
+ """
+ Broadcast a message to several WebSocket connections.
+
+ A string (:class:`str`) is sent as a Text_ frame. A bytestring or bytes-like
+ object (:class:`bytes`, :class:`bytearray`, or :class:`memoryview`) is sent
+ as a Binary_ frame.
+
+ .. _Text: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+ .. _Binary: https://datatracker.ietf.org/doc/html/rfc6455#section-5.6
+
+ :func:`broadcast` pushes the message synchronously to all connections even
+ if their write buffers are overflowing. There's no backpressure.
+
+ If you broadcast messages faster than a connection can handle them, messages
+ will pile up in its write buffer until the connection times out. Keep
+ ``ping_interval`` and ``ping_timeout`` low to prevent excessive memory usage
+ from slow connections.
+
+ Unlike :meth:`~websockets.asyncio.connection.Connection.send`,
+ :func:`broadcast` doesn't support sending fragmented messages. Indeed,
+ fragmentation is useful for sending large messages without buffering them in
+ memory, while :func:`broadcast` buffers one copy per connection as fast as
+ possible.
+
+ :func:`broadcast` skips connections that aren't open in order to avoid
+ errors on connections where the closing handshake is in progress.
+
+ :func:`broadcast` ignores failures to write the message on some connections.
+ It continues writing to other connections. On Python 3.11 and above, you may
+ set ``raise_exceptions`` to :obj:`True` to record failures and raise all
+ exceptions in a :pep:`654` :exc:`ExceptionGroup`.
+
+ While :func:`broadcast` makes more sense for servers, it works identically
+ with clients, if you have a use case for opening connections to many servers
+ and broadcasting a message to them.
+
+ Args:
+ websockets: WebSocket connections to which the message will be sent.
+ message: Message to send.
+ raise_exceptions: Whether to raise an exception in case of failures.
+
+ Raises:
+ TypeError: If ``message`` doesn't have a supported type.
+
+ """
+ if isinstance(message, str):
+ send_method = "send_text"
+ message = message.encode()
+ elif isinstance(message, BytesLike):
+ send_method = "send_binary"
+ else:
+ raise TypeError("data must be str or bytes")
+
+ if raise_exceptions:
+ if sys.version_info[:2] < (3, 11): # pragma: no cover
+ raise ValueError("raise_exceptions requires at least Python 3.11")
+ exceptions: list[Exception] = []
+
+ for connection in connections:
+ exception: Exception
+
+ if connection.protocol.state is not OPEN:
+ continue
+
+ if connection.fragmented_send_waiter is not None:
+ if raise_exceptions:
+ exception = ConcurrencyError("sending a fragmented message")
+ exceptions.append(exception)
+ else:
+ connection.logger.warning(
+ "skipped broadcast: sending a fragmented message",
+ )
+ continue
+
+ try:
+ # Call connection.protocol.send_text or send_binary.
+ # Either way, message is already converted to bytes.
+ getattr(connection.protocol, send_method)(message)
+ connection.send_data()
+ except Exception as write_exception:
+ if raise_exceptions:
+ exception = RuntimeError("failed to write message")
+ exception.__cause__ = write_exception
+ exceptions.append(exception)
+ else:
+ connection.logger.warning(
+ "skipped broadcast: failed to write message: %s",
+ traceback.format_exception_only(
+ # Remove first argument when dropping Python 3.9.
+ type(write_exception),
+ write_exception,
+ )[0].strip(),
+ )
+
+ if raise_exceptions and exceptions:
+ raise ExceptionGroup("skipped broadcast", exceptions)
+
+
+# Pretend that broadcast is actually defined in the server module.
+broadcast.__module__ = "websockets.asyncio.server"
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/messages.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/messages.py
new file mode 100644
index 00000000..c1007246
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/messages.py
@@ -0,0 +1,296 @@
+from __future__ import annotations
+
+import asyncio
+import codecs
+import collections
+from collections.abc import AsyncIterator, Iterable
+from typing import Any, Callable, Generic, TypeVar
+
+from ..exceptions import ConcurrencyError
+from ..frames import OP_BINARY, OP_CONT, OP_TEXT, Frame
+from ..typing import Data
+
+
+__all__ = ["Assembler"]
+
+UTF8Decoder = codecs.getincrementaldecoder("utf-8")
+
+T = TypeVar("T")
+
+
+class SimpleQueue(Generic[T]):
+ """
+ Simplified version of :class:`asyncio.Queue`.
+
+ Provides only the subset of functionality needed by :class:`Assembler`.
+
+ """
+
+ def __init__(self) -> None:
+ self.loop = asyncio.get_running_loop()
+ self.get_waiter: asyncio.Future[None] | None = None
+ self.queue: collections.deque[T] = collections.deque()
+
+ def __len__(self) -> int:
+ return len(self.queue)
+
+ def put(self, item: T) -> None:
+ """Put an item into the queue without waiting."""
+ self.queue.append(item)
+ if self.get_waiter is not None and not self.get_waiter.done():
+ self.get_waiter.set_result(None)
+
+ async def get(self, block: bool = True) -> T:
+ """Remove and return an item from the queue, waiting if necessary."""
+ if not self.queue:
+ if not block:
+ raise EOFError("stream of frames ended")
+ assert self.get_waiter is None, "cannot call get() concurrently"
+ self.get_waiter = self.loop.create_future()
+ try:
+ await self.get_waiter
+ finally:
+ self.get_waiter.cancel()
+ self.get_waiter = None
+ return self.queue.popleft()
+
+ def reset(self, items: Iterable[T]) -> None:
+ """Put back items into an empty, idle queue."""
+ assert self.get_waiter is None, "cannot reset() while get() is running"
+ assert not self.queue, "cannot reset() while queue isn't empty"
+ self.queue.extend(items)
+
+ def abort(self) -> None:
+ """Close the queue, raising EOFError in get() if necessary."""
+ if self.get_waiter is not None and not self.get_waiter.done():
+ self.get_waiter.set_exception(EOFError("stream of frames ended"))
+
+
+class Assembler:
+ """
+ Assemble messages from frames.
+
+ :class:`Assembler` expects only data frames. The stream of frames must
+ respect the protocol; if it doesn't, the behavior is undefined.
+
+ Args:
+ pause: Called when the buffer of frames goes above the high water mark;
+ should pause reading from the network.
+ resume: Called when the buffer of frames goes below the low water mark;
+ should resume reading from the network.
+
+ """
+
+ # coverage reports incorrectly: "line NN didn't jump to the function exit"
+ def __init__( # pragma: no cover
+ self,
+ high: int | None = None,
+ low: int | None = None,
+ pause: Callable[[], Any] = lambda: None,
+ resume: Callable[[], Any] = lambda: None,
+ ) -> None:
+ # Queue of incoming frames.
+ self.frames: SimpleQueue[Frame] = SimpleQueue()
+
+ # We cannot put a hard limit on the size of the queue because a single
+ # call to Protocol.data_received() could produce thousands of frames,
+ # which must be buffered. Instead, we pause reading when the buffer goes
+ # above the high limit and we resume when it goes under the low limit.
+ if high is not None and low is None:
+ low = high // 4
+ if high is None and low is not None:
+ high = low * 4
+ if high is not None and low is not None:
+ if low < 0:
+ raise ValueError("low must be positive or equal to zero")
+ if high < low:
+ raise ValueError("high must be greater than or equal to low")
+ self.high, self.low = high, low
+ self.pause = pause
+ self.resume = resume
+ self.paused = False
+
+ # This flag prevents concurrent calls to get() by user code.
+ self.get_in_progress = False
+
+ # This flag marks the end of the connection.
+ self.closed = False
+
+ async def get(self, decode: bool | None = None) -> Data:
+ """
+ Read the next message.
+
+ :meth:`get` returns a single :class:`str` or :class:`bytes`.
+
+ If the message is fragmented, :meth:`get` waits until the last frame is
+ received, then it reassembles the message and returns it. To receive
+ messages frame by frame, use :meth:`get_iter` instead.
+
+ Args:
+ decode: :obj:`False` disables UTF-8 decoding of text frames and
+ returns :class:`bytes`. :obj:`True` forces UTF-8 decoding of
+ binary frames and returns :class:`str`.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+ UnicodeDecodeError: If a text frame contains invalid UTF-8.
+ ConcurrencyError: If two coroutines run :meth:`get` or
+ :meth:`get_iter` concurrently.
+
+ """
+ if self.get_in_progress:
+ raise ConcurrencyError("get() or get_iter() is already running")
+ self.get_in_progress = True
+
+ # Locking with get_in_progress prevents concurrent execution
+ # until get() fetches a complete message or is cancelled.
+
+ try:
+ # First frame
+ frame = await self.frames.get(not self.closed)
+ self.maybe_resume()
+ assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY
+ if decode is None:
+ decode = frame.opcode is OP_TEXT
+ frames = [frame]
+
+ # Following frames, for fragmented messages
+ while not frame.fin:
+ try:
+ frame = await self.frames.get(not self.closed)
+ except asyncio.CancelledError:
+ # Put frames already received back into the queue
+ # so that future calls to get() can return them.
+ self.frames.reset(frames)
+ raise
+ self.maybe_resume()
+ assert frame.opcode is OP_CONT
+ frames.append(frame)
+
+ finally:
+ self.get_in_progress = False
+
+ data = b"".join(frame.data for frame in frames)
+ if decode:
+ return data.decode()
+ else:
+ return data
+
+ async def get_iter(self, decode: bool | None = None) -> AsyncIterator[Data]:
+ """
+ Stream the next message.
+
+ Iterating the return value of :meth:`get_iter` asynchronously yields a
+ :class:`str` or :class:`bytes` for each frame in the message.
+
+ The iterator must be fully consumed before calling :meth:`get_iter` or
+ :meth:`get` again. Else, :exc:`ConcurrencyError` is raised.
+
+ This method only makes sense for fragmented messages. If messages aren't
+ fragmented, use :meth:`get` instead.
+
+ Args:
+ decode: :obj:`False` disables UTF-8 decoding of text frames and
+ returns :class:`bytes`. :obj:`True` forces UTF-8 decoding of
+ binary frames and returns :class:`str`.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+ UnicodeDecodeError: If a text frame contains invalid UTF-8.
+ ConcurrencyError: If two coroutines run :meth:`get` or
+ :meth:`get_iter` concurrently.
+
+ """
+ if self.get_in_progress:
+ raise ConcurrencyError("get() or get_iter() is already running")
+ self.get_in_progress = True
+
+ # Locking with get_in_progress prevents concurrent execution
+ # until get_iter() fetches a complete message or is cancelled.
+
+ # If get_iter() raises an exception e.g. in decoder.decode(),
+ # get_in_progress remains set and the connection becomes unusable.
+
+ # First frame
+ try:
+ frame = await self.frames.get(not self.closed)
+ except asyncio.CancelledError:
+ self.get_in_progress = False
+ raise
+ self.maybe_resume()
+ assert frame.opcode is OP_TEXT or frame.opcode is OP_BINARY
+ if decode is None:
+ decode = frame.opcode is OP_TEXT
+ if decode:
+ decoder = UTF8Decoder()
+ yield decoder.decode(frame.data, frame.fin)
+ else:
+ yield frame.data
+
+ # Following frames, for fragmented messages
+ while not frame.fin:
+ # We cannot handle asyncio.CancelledError because we don't buffer
+ # previous fragments — we're streaming them. Canceling get_iter()
+ # here will leave the assembler in a stuck state. Future calls to
+ # get() or get_iter() will raise ConcurrencyError.
+ frame = await self.frames.get(not self.closed)
+ self.maybe_resume()
+ assert frame.opcode is OP_CONT
+ if decode:
+ yield decoder.decode(frame.data, frame.fin)
+ else:
+ yield frame.data
+
+ self.get_in_progress = False
+
+ def put(self, frame: Frame) -> None:
+ """
+ Add ``frame`` to the next message.
+
+ Raises:
+ EOFError: If the stream of frames has ended.
+
+ """
+ if self.closed:
+ raise EOFError("stream of frames ended")
+
+ self.frames.put(frame)
+ self.maybe_pause()
+
+ def maybe_pause(self) -> None:
+ """Pause the writer if queue is above the high water mark."""
+ # Skip if flow control is disabled
+ if self.high is None:
+ return
+
+ # Check for "> high" to support high = 0
+ if len(self.frames) > self.high and not self.paused:
+ self.paused = True
+ self.pause()
+
+ def maybe_resume(self) -> None:
+ """Resume the writer if queue is below the low water mark."""
+ # Skip if flow control is disabled
+ if self.low is None:
+ return
+
+ # Check for "<= low" to support low = 0
+ if len(self.frames) <= self.low and self.paused:
+ self.paused = False
+ self.resume()
+
+ def close(self) -> None:
+ """
+ End the stream of frames.
+
+ Calling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
+ or :meth:`put` is safe. They will raise :exc:`EOFError`.
+
+ """
+ if self.closed:
+ return
+
+ self.closed = True
+
+ # Unblock get() or get_iter().
+ self.frames.abort()
diff --git a/.venv/lib/python3.12/site-packages/websockets/asyncio/server.py b/.venv/lib/python3.12/site-packages/websockets/asyncio/server.py
new file mode 100644
index 00000000..ebe45c2a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/websockets/asyncio/server.py
@@ -0,0 +1,978 @@
+from __future__ import annotations
+
+import asyncio
+import hmac
+import http
+import logging
+import re
+import socket
+import sys
+from collections.abc import Awaitable, Generator, Iterable, Sequence
+from types import TracebackType
+from typing import Any, Callable, cast
+
+from ..exceptions import InvalidHeader
+from ..extensions.base import ServerExtensionFactory
+from ..extensions.permessage_deflate import enable_server_permessage_deflate
+from ..frames import CloseCode
+from ..headers import (
+ build_www_authenticate_basic,
+ parse_authorization_basic,
+ validate_subprotocols,
+)
+from ..http11 import SERVER, Request, Response
+from ..protocol import CONNECTING, OPEN, Event
+from ..server import ServerProtocol
+from ..typing import LoggerLike, Origin, StatusLike, Subprotocol
+from .compatibility import asyncio_timeout
+from .connection import Connection, broadcast
+
+
+__all__ = [
+ "broadcast",
+ "serve",
+ "unix_serve",
+ "ServerConnection",
+ "Server",
+ "basic_auth",
+]
+
+
+class ServerConnection(Connection):
+ """
+ :mod:`asyncio` implementation of a WebSocket server connection.
+
+ :class:`ServerConnection` provides :meth:`recv` and :meth:`send` methods for
+ receiving and sending messages.
+
+ It supports asynchronous iteration to receive messages::
+
+ async for message in websocket:
+ await process(message)
+
+ The iterator exits normally when the connection is closed with close code
+ 1000 (OK) or 1001 (going away) or without a close code. It raises a
+ :exc:`~websockets.exceptions.ConnectionClosedError` when the connection is
+ closed with any other code.
+
+ The ``ping_interval``, ``ping_timeout``, ``close_timeout``, ``max_queue``,
+ and ``write_limit`` arguments have the same meaning as in :func:`serve`.
+
+ Args:
+ protocol: Sans-I/O connection.
+ server: Server that manages this connection.
+
+ """
+
+ def __init__(
+ self,
+ protocol: ServerProtocol,
+ server: Server,
+ *,
+ ping_interval: float | None = 20,
+ ping_timeout: float | None = 20,
+ close_timeout: float | None = 10,
+ max_queue: int | None | tuple[int | None, int | None] = 16,
+ write_limit: int | tuple[int, int | None] = 2**15,
+ ) -> None:
+ self.protocol: ServerProtocol
+ super().__init__(
+ protocol,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_queue=max_queue,
+ write_limit=write_limit,
+ )
+ self.server = server
+ self.request_rcvd: asyncio.Future[None] = self.loop.create_future()
+ self.username: str # see basic_auth()
+
+ def respond(self, status: StatusLike, text: str) -> Response:
+ """
+ Create a plain text HTTP response.
+
+ ``process_request`` and ``process_response`` may call this method to
+ return an HTTP response instead of performing the WebSocket opening
+ handshake.
+
+ You can modify the response before returning it, for example by changing
+ HTTP headers.
+
+ Args:
+ status: HTTP status code.
+ text: HTTP response body; it will be encoded to UTF-8.
+
+ Returns:
+ HTTP response to send to the client.
+
+ """
+ return self.protocol.reject(status, text)
+
+ async def handshake(
+ self,
+ process_request: (
+ Callable[
+ [ServerConnection, Request],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ process_response: (
+ Callable[
+ [ServerConnection, Request, Response],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ server_header: str | None = SERVER,
+ ) -> None:
+ """
+ Perform the opening handshake.
+
+ """
+ await asyncio.wait(
+ [self.request_rcvd, self.connection_lost_waiter],
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if self.request is not None:
+ async with self.send_context(expected_state=CONNECTING):
+ response = None
+
+ if process_request is not None:
+ try:
+ response = process_request(self, self.request)
+ if isinstance(response, Awaitable):
+ response = await response
+ except Exception as exc:
+ self.protocol.handshake_exc = exc
+ response = self.protocol.reject(
+ http.HTTPStatus.INTERNAL_SERVER_ERROR,
+ (
+ "Failed to open a WebSocket connection.\n"
+ "See server log for more information.\n"
+ ),
+ )
+
+ if response is None:
+ if self.server.is_serving():
+ self.response = self.protocol.accept(self.request)
+ else:
+ self.response = self.protocol.reject(
+ http.HTTPStatus.SERVICE_UNAVAILABLE,
+ "Server is shutting down.\n",
+ )
+ else:
+ assert isinstance(response, Response) # help mypy
+ self.response = response
+
+ if server_header:
+ self.response.headers["Server"] = server_header
+
+ response = None
+
+ if process_response is not None:
+ try:
+ response = process_response(self, self.request, self.response)
+ if isinstance(response, Awaitable):
+ response = await response
+ except Exception as exc:
+ self.protocol.handshake_exc = exc
+ response = self.protocol.reject(
+ http.HTTPStatus.INTERNAL_SERVER_ERROR,
+ (
+ "Failed to open a WebSocket connection.\n"
+ "See server log for more information.\n"
+ ),
+ )
+
+ if response is not None:
+ assert isinstance(response, Response) # help mypy
+ self.response = response
+
+ self.protocol.send_response(self.response)
+
+ # self.protocol.handshake_exc is set when the connection is lost before
+ # receiving a request, when the request cannot be parsed, or when the
+ # handshake fails, including when process_request or process_response
+ # raises an exception.
+
+ # It isn't set when process_request or process_response sends an HTTP
+ # response that rejects the handshake.
+
+ if self.protocol.handshake_exc is not None:
+ raise self.protocol.handshake_exc
+
+ def process_event(self, event: Event) -> None:
+ """
+ Process one incoming event.
+
+ """
+ # First event - handshake request.
+ if self.request is None:
+ assert isinstance(event, Request)
+ self.request = event
+ self.request_rcvd.set_result(None)
+ # Later events - frames.
+ else:
+ super().process_event(event)
+
+ def connection_made(self, transport: asyncio.BaseTransport) -> None:
+ super().connection_made(transport)
+ self.server.start_connection_handler(self)
+
+
+class Server:
+ """
+ WebSocket server returned by :func:`serve`.
+
+ This class mirrors the API of :class:`asyncio.Server`.
+
+ It keeps track of WebSocket connections in order to close them properly
+ when shutting down.
+
+ Args:
+ handler: Connection handler. It receives the WebSocket connection,
+ which is a :class:`ServerConnection`, in argument.
+ process_request: Intercept the request during the opening handshake.
+ Return an HTTP response to force the response. Return :obj:`None` to
+ continue normally. When you force an HTTP 101 Continue response, the
+ handshake is successful. Else, the connection is aborted.
+ ``process_request`` may be a function or a coroutine.
+ process_response: Intercept the response during the opening handshake.
+ Modify the response or return a new HTTP response to force the
+ response. Return :obj:`None` to continue normally. When you force an
+ HTTP 101 Continue response, the handshake is successful. Else, the
+ connection is aborted. ``process_response`` may be a function or a
+ coroutine.
+ server_header: Value of the ``Server`` response header.
+ It defaults to ``"Python/x.y.z websockets/X.Y"``. Setting it to
+ :obj:`None` removes the header.
+ open_timeout: Timeout for opening connections in seconds.
+ :obj:`None` disables the timeout.
+ logger: Logger for this server.
+ It defaults to ``logging.getLogger("websockets.server")``.
+ See the :doc:`logging guide <../../topics/logging>` for details.
+
+ """
+
+ def __init__(
+ self,
+ handler: Callable[[ServerConnection], Awaitable[None]],
+ *,
+ process_request: (
+ Callable[
+ [ServerConnection, Request],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ process_response: (
+ Callable[
+ [ServerConnection, Request, Response],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ server_header: str | None = SERVER,
+ open_timeout: float | None = 10,
+ logger: LoggerLike | None = None,
+ ) -> None:
+ self.loop = asyncio.get_running_loop()
+ self.handler = handler
+ self.process_request = process_request
+ self.process_response = process_response
+ self.server_header = server_header
+ self.open_timeout = open_timeout
+ if logger is None:
+ logger = logging.getLogger("websockets.server")
+ self.logger = logger
+
+ # Keep track of active connections.
+ self.handlers: dict[ServerConnection, asyncio.Task[None]] = {}
+
+ # Task responsible for closing the server and terminating connections.
+ self.close_task: asyncio.Task[None] | None = None
+
+ # Completed when the server is closed and connections are terminated.
+ self.closed_waiter: asyncio.Future[None] = self.loop.create_future()
+
+ @property
+ def connections(self) -> set[ServerConnection]:
+ """
+ Set of active connections.
+
+ This property contains all connections that completed the opening
+ handshake successfully and didn't start the closing handshake yet.
+ It can be useful in combination with :func:`~broadcast`.
+
+ """
+ return {connection for connection in self.handlers if connection.state is OPEN}
+
+ def wrap(self, server: asyncio.Server) -> None:
+ """
+ Attach to a given :class:`asyncio.Server`.
+
+ Since :meth:`~asyncio.loop.create_server` doesn't support injecting a
+ custom ``Server`` class, the easiest solution that doesn't rely on
+ private :mod:`asyncio` APIs is to:
+
+ - instantiate a :class:`Server`
+ - give the protocol factory a reference to that instance
+ - call :meth:`~asyncio.loop.create_server` with the factory
+ - attach the resulting :class:`asyncio.Server` with this method
+
+ """
+ self.server = server
+ for sock in server.sockets:
+ if sock.family == socket.AF_INET:
+ name = "%s:%d" % sock.getsockname()
+ elif sock.family == socket.AF_INET6:
+ name = "[%s]:%d" % sock.getsockname()[:2]
+ elif sock.family == socket.AF_UNIX:
+ name = sock.getsockname()
+ # In the unlikely event that someone runs websockets over a
+ # protocol other than IP or Unix sockets, avoid crashing.
+ else: # pragma: no cover
+ name = str(sock.getsockname())
+ self.logger.info("server listening on %s", name)
+
+ async def conn_handler(self, connection: ServerConnection) -> None:
+ """
+ Handle the lifecycle of a WebSocket connection.
+
+ Since this method doesn't have a caller that can handle exceptions,
+ it attempts to log relevant ones.
+
+ It guarantees that the TCP connection is closed before exiting.
+
+ """
+ try:
+ async with asyncio_timeout(self.open_timeout):
+ try:
+ await connection.handshake(
+ self.process_request,
+ self.process_response,
+ self.server_header,
+ )
+ except asyncio.CancelledError:
+ connection.transport.abort()
+ raise
+ except Exception:
+ connection.logger.error("opening handshake failed", exc_info=True)
+ connection.transport.abort()
+ return
+
+ if connection.protocol.state is not OPEN:
+ # process_request or process_response rejected the handshake.
+ connection.transport.abort()
+ return
+
+ try:
+ connection.start_keepalive()
+ await self.handler(connection)
+ except Exception:
+ connection.logger.error("connection handler failed", exc_info=True)
+ await connection.close(CloseCode.INTERNAL_ERROR)
+ else:
+ await connection.close()
+
+ except TimeoutError:
+ # When the opening handshake times out, there's nothing to log.
+ pass
+
+ except Exception: # pragma: no cover
+ # Don't leak connections on unexpected errors.
+ connection.transport.abort()
+
+ finally:
+ # Registration is tied to the lifecycle of conn_handler() because
+ # the server waits for connection handlers to terminate, even if
+ # all connections are already closed.
+ del self.handlers[connection]
+
+ def start_connection_handler(self, connection: ServerConnection) -> None:
+ """
+ Register a connection with this server.
+
+ """
+ # The connection must be registered in self.handlers immediately.
+ # If it was registered in conn_handler(), a race condition could
+ # happen when closing the server after scheduling conn_handler()
+ # but before it starts executing.
+ self.handlers[connection] = self.loop.create_task(self.conn_handler(connection))
+
+ def close(self, close_connections: bool = True) -> None:
+ """
+ Close the server.
+
+ * Close the underlying :class:`asyncio.Server`.
+ * When ``close_connections`` is :obj:`True`, which is the default,
+ close existing connections. Specifically:
+
+ * Reject opening WebSocket connections with an HTTP 503 (service
+ unavailable) error. This happens when the server accepted the TCP
+ connection but didn't complete the opening handshake before closing.
+ * Close open WebSocket connections with close code 1001 (going away).
+
+ * Wait until all connection handlers terminate.
+
+ :meth:`close` is idempotent.
+
+ """
+ if self.close_task is None:
+ self.close_task = self.get_loop().create_task(
+ self._close(close_connections)
+ )
+
+ async def _close(self, close_connections: bool) -> None:
+ """
+ Implementation of :meth:`close`.
+
+ This calls :meth:`~asyncio.Server.close` on the underlying
+ :class:`asyncio.Server` object to stop accepting new connections and
+ then closes open connections with close code 1001.
+
+ """
+ self.logger.info("server closing")
+
+ # Stop accepting new connections.
+ self.server.close()
+
+ # Wait until all accepted connections reach connection_made() and call
+ # register(). See https://github.com/python/cpython/issues/79033 for
+ # details. This workaround can be removed when dropping Python < 3.11.
+ await asyncio.sleep(0)
+
+ if close_connections:
+ # Close OPEN connections with close code 1001. After server.close(),
+ # handshake() closes OPENING connections with an HTTP 503 error.
+ close_tasks = [
+ asyncio.create_task(connection.close(1001))
+ for connection in self.handlers
+ if connection.protocol.state is not CONNECTING
+ ]
+ # asyncio.wait doesn't accept an empty first argument.
+ if close_tasks:
+ await asyncio.wait(close_tasks)
+
+ # Wait until all TCP connections are closed.
+ await self.server.wait_closed()
+
+ # Wait until all connection handlers terminate.
+ # asyncio.wait doesn't accept an empty first argument.
+ if self.handlers:
+ await asyncio.wait(self.handlers.values())
+
+ # Tell wait_closed() to return.
+ self.closed_waiter.set_result(None)
+
+ self.logger.info("server closed")
+
+ async def wait_closed(self) -> None:
+ """
+ Wait until the server is closed.
+
+ When :meth:`wait_closed` returns, all TCP connections are closed and
+ all connection handlers have returned.
+
+ To ensure a fast shutdown, a connection handler should always be
+ awaiting at least one of:
+
+ * :meth:`~ServerConnection.recv`: when the connection is closed,
+ it raises :exc:`~websockets.exceptions.ConnectionClosedOK`;
+ * :meth:`~ServerConnection.wait_closed`: when the connection is
+ closed, it returns.
+
+ Then the connection handler is immediately notified of the shutdown;
+ it can clean up and exit.
+
+ """
+ await asyncio.shield(self.closed_waiter)
+
+ def get_loop(self) -> asyncio.AbstractEventLoop:
+ """
+ See :meth:`asyncio.Server.get_loop`.
+
+ """
+ return self.server.get_loop()
+
+ def is_serving(self) -> bool: # pragma: no cover
+ """
+ See :meth:`asyncio.Server.is_serving`.
+
+ """
+ return self.server.is_serving()
+
+ async def start_serving(self) -> None: # pragma: no cover
+ """
+ See :meth:`asyncio.Server.start_serving`.
+
+ Typical use::
+
+ server = await serve(..., start_serving=False)
+ # perform additional setup here...
+ # ... then start the server
+ await server.start_serving()
+
+ """
+ await self.server.start_serving()
+
+ async def serve_forever(self) -> None: # pragma: no cover
+ """
+ See :meth:`asyncio.Server.serve_forever`.
+
+ Typical use::
+
+ server = await serve(...)
+ # this coroutine doesn't return
+ # canceling it stops the server
+ await server.serve_forever()
+
+ This is an alternative to using :func:`serve` as an asynchronous context
+ manager. Shutdown is triggered by canceling :meth:`serve_forever`
+ instead of exiting a :func:`serve` context.
+
+ """
+ await self.server.serve_forever()
+
+ @property
+ def sockets(self) -> Iterable[socket.socket]:
+ """
+ See :attr:`asyncio.Server.sockets`.
+
+ """
+ return self.server.sockets
+
+ async def __aenter__(self) -> Server: # pragma: no cover
+ return self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None: # pragma: no cover
+ self.close()
+ await self.wait_closed()
+
+
+# This is spelled in lower case because it's exposed as a callable in the API.
+class serve:
+ """
+ Create a WebSocket server listening on ``host`` and ``port``.
+
+ Whenever a client connects, the server creates a :class:`ServerConnection`,
+ performs the opening handshake, and delegates to the ``handler`` coroutine.
+
+ The handler receives the :class:`ServerConnection` instance, which you can
+ use to send and receive messages.
+
+ Once the handler completes, either normally or with an exception, the server
+ performs the closing handshake and closes the connection.
+
+ This coroutine returns a :class:`Server` whose API mirrors
+ :class:`asyncio.Server`. Treat it as an asynchronous context manager to
+ ensure that the server will be closed::
+
+ from websockets.asyncio.server import serve
+
+ def handler(websocket):
+ ...
+
+ # set this future to exit the server
+ stop = asyncio.get_running_loop().create_future()
+
+ async with serve(handler, host, port):
+ await stop
+
+ Alternatively, call :meth:`~Server.serve_forever` to serve requests and
+ cancel it to stop the server::
+
+ server = await serve(handler, host, port)
+ await server.serve_forever()
+
+ Args:
+ handler: Connection handler. It receives the WebSocket connection,
+ which is a :class:`ServerConnection`, in argument.
+ host: Network interfaces the server binds to.
+ See :meth:`~asyncio.loop.create_server` for details.
+ port: TCP port the server listens on.
+ See :meth:`~asyncio.loop.create_server` for details.
+ origins: Acceptable values of the ``Origin`` header, for defending
+ against Cross-Site WebSocket Hijacking attacks. Values can be
+ :class:`str` to test for an exact match or regular expressions
+ compiled by :func:`re.compile` to test against a pattern. Include
+ :obj:`None` in the list if the lack of an origin is acceptable.
+ extensions: List of supported extensions, in order in which they
+ should be negotiated and run.
+ subprotocols: List of supported subprotocols, in order of decreasing
+ preference.
+ select_subprotocol: Callback for selecting a subprotocol among
+ those supported by the client and the server. It receives a
+ :class:`ServerConnection` (not a
+ :class:`~websockets.server.ServerProtocol`!) instance and a list of
+ subprotocols offered by the client. Other than the first argument,
+ it has the same behavior as the
+ :meth:`ServerProtocol.select_subprotocol
+ <websockets.server.ServerProtocol.select_subprotocol>` method.
+ process_request: Intercept the request during the opening handshake.
+ Return an HTTP response to force the response or :obj:`None` to
+ continue normally. When you force an HTTP 101 Continue response, the
+ handshake is successful. Else, the connection is aborted.
+ ``process_request`` may be a function or a coroutine.
+ process_response: Intercept the response during the opening handshake.
+ Return an HTTP response to force the response or :obj:`None` to
+ continue normally. When you force an HTTP 101 Continue response, the
+ handshake is successful. Else, the connection is aborted.
+ ``process_response`` may be a function or a coroutine.
+ server_header: Value of the ``Server`` response header.
+ It defaults to ``"Python/x.y.z websockets/X.Y"``. Setting it to
+ :obj:`None` removes the header.
+ compression: The "permessage-deflate" extension is enabled by default.
+ Set ``compression`` to :obj:`None` to disable it. See the
+ :doc:`compression guide <../../topics/compression>` for details.
+ open_timeout: Timeout for opening connections in seconds.
+ :obj:`None` disables the timeout.
+ ping_interval: Interval between keepalive pings in seconds.
+ :obj:`None` disables keepalive.
+ ping_timeout: Timeout for keepalive pings in seconds.
+ :obj:`None` disables timeouts.
+ close_timeout: Timeout for closing connections in seconds.
+ :obj:`None` disables the timeout.
+ max_size: Maximum size of incoming messages in bytes.
+ :obj:`None` disables the limit.
+ max_queue: High-water mark of the buffer where frames are received.
+ It defaults to 16 frames. The low-water mark defaults to ``max_queue
+ // 4``. You may pass a ``(high, low)`` tuple to set the high-water
+ and low-water marks. If you want to disable flow control entirely,
+ you may set it to ``None``, although that's a bad idea.
+ write_limit: High-water mark of write buffer in bytes. It is passed to
+ :meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
+ to 32 KiB. You may pass a ``(high, low)`` tuple to set the
+ high-water and low-water marks.
+ logger: Logger for this server.
+ It defaults to ``logging.getLogger("websockets.server")``. See the
+ :doc:`logging guide <../../topics/logging>` for details.
+ create_connection: Factory for the :class:`ServerConnection` managing
+ the connection. Set it to a wrapper or a subclass to customize
+ connection handling.
+
+ Any other keyword arguments are passed to the event loop's
+ :meth:`~asyncio.loop.create_server` method.
+
+ For example:
+
+ * You can set ``ssl`` to a :class:`~ssl.SSLContext` to enable TLS.
+
+ * You can set ``sock`` to provide a preexisting TCP socket. You may call
+ :func:`socket.create_server` (not to be confused with the event loop's
+ :meth:`~asyncio.loop.create_server` method) to create a suitable server
+ socket and customize it.
+
+ * You can set ``start_serving`` to ``False`` to start accepting connections
+ only after you call :meth:`~Server.start_serving()` or
+ :meth:`~Server.serve_forever()`.
+
+ """
+
+ def __init__(
+ self,
+ handler: Callable[[ServerConnection], Awaitable[None]],
+ host: str | None = None,
+ port: int | None = None,
+ *,
+ # WebSocket
+ origins: Sequence[Origin | re.Pattern[str] | None] | None = None,
+ extensions: Sequence[ServerExtensionFactory] | None = None,
+ subprotocols: Sequence[Subprotocol] | None = None,
+ select_subprotocol: (
+ Callable[
+ [ServerConnection, Sequence[Subprotocol]],
+ Subprotocol | None,
+ ]
+ | None
+ ) = None,
+ process_request: (
+ Callable[
+ [ServerConnection, Request],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ process_response: (
+ Callable[
+ [ServerConnection, Request, Response],
+ Awaitable[Response | None] | Response | None,
+ ]
+ | None
+ ) = None,
+ server_header: str | None = SERVER,
+ compression: str | None = "deflate",
+ # Timeouts
+ open_timeout: float | None = 10,
+ ping_interval: float | None = 20,
+ ping_timeout: float | None = 20,
+ close_timeout: float | None = 10,
+ # Limits
+ max_size: int | None = 2**20,
+ max_queue: int | None | tuple[int | None, int | None] = 16,
+ write_limit: int | tuple[int, int | None] = 2**15,
+ # Logging
+ logger: LoggerLike | None = None,
+ # Escape hatch for advanced customization
+ create_connection: type[ServerConnection] | None = None,
+ # Other keyword arguments are passed to loop.create_server
+ **kwargs: Any,
+ ) -> None:
+ if subprotocols is not None:
+ validate_subprotocols(subprotocols)
+
+ if compression == "deflate":
+ extensions = enable_server_permessage_deflate(extensions)
+ elif compression is not None:
+ raise ValueError(f"unsupported compression: {compression}")
+
+ if create_connection is None:
+ create_connection = ServerConnection
+
+ self.server = Server(
+ handler,
+ process_request=process_request,
+ process_response=process_response,
+ server_header=server_header,
+ open_timeout=open_timeout,
+ logger=logger,
+ )
+
+ if kwargs.get("ssl") is not None:
+ kwargs.setdefault("ssl_handshake_timeout", open_timeout)
+ if sys.version_info[:2] >= (3, 11): # pragma: no branch
+ kwargs.setdefault("ssl_shutdown_timeout", close_timeout)
+
+ def factory() -> ServerConnection:
+ """
+ Create an asyncio protocol for managing a WebSocket connection.
+
+ """
+ # Create a closure to give select_subprotocol access to connection.
+ protocol_select_subprotocol: (
+ Callable[
+ [ServerProtocol, Sequence[Subprotocol]],
+ Subprotocol | None,
+ ]
+ | None
+ ) = None
+ if select_subprotocol is not None:
+
+ def protocol_select_subprotocol(
+ protocol: ServerProtocol,
+ subprotocols: Sequence[Subprotocol],
+ ) -> Subprotocol | None:
+ # mypy doesn't know that select_subprotocol is immutable.
+ assert select_subprotocol is not None
+ # Ensure this function is only used in the intended context.
+ assert protocol is connection.protocol
+ return select_subprotocol(connection, subprotocols)
+
+ # This is a protocol in the Sans-I/O implementation of websockets.
+ protocol = ServerProtocol(
+ origins=origins,
+ extensions=extensions,
+ subprotocols=subprotocols,
+ select_subprotocol=protocol_select_subprotocol,
+ max_size=max_size,
+ logger=logger,
+ )
+ # This is a connection in websockets and a protocol in asyncio.
+ connection = create_connection(
+ protocol,
+ self.server,
+ ping_interval=ping_interval,
+ ping_timeout=ping_timeout,
+ close_timeout=close_timeout,
+ max_queue=max_queue,
+ write_limit=write_limit,
+ )
+ return connection
+
+ loop = asyncio.get_running_loop()
+ if kwargs.pop("unix", False):
+ self.create_server = loop.create_unix_server(factory, **kwargs)
+ else:
+ # mypy cannot tell that kwargs must provide sock when port is None.
+ self.create_server = loop.create_server(factory, host, port, **kwargs) # type: ignore[arg-type]
+
+ # async with serve(...) as ...: ...
+
+ async def __aenter__(self) -> Server:
+ return await self
+
+ async def __aexit__(
+ self,
+ exc_type: type[BaseException] | None,
+ exc_value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ self.server.close()
+ await self.server.wait_closed()
+
+ # ... = await serve(...)
+
+ def __await__(self) -> Generator[Any, None, Server]:
+ # Create a suitable iterator by calling __await__ on a coroutine.
+ return self.__await_impl__().__await__()
+
+ async def __await_impl__(self) -> Server:
+ server = await self.create_server
+ self.server.wrap(server)
+ return self.server
+
+ # ... = yield from serve(...) - remove when dropping Python < 3.10
+
+ __iter__ = __await__
+
+
+def unix_serve(
+ handler: Callable[[ServerConnection], Awaitable[None]],
+ path: str | None = None,
+ **kwargs: Any,
+) -> Awaitable[Server]:
+ """
+ Create a WebSocket server listening on a Unix socket.
+
+ This function is identical to :func:`serve`, except the ``host`` and
+ ``port`` arguments are replaced by ``path``. It's only available on Unix.
+
+ It's useful for deploying a server behind a reverse proxy such as nginx.
+
+ Args:
+ handler: Connection handler. It receives the WebSocket connection,
+ which is a :class:`ServerConnection`, in argument.
+ path: File system path to the Unix socket.
+
+ """
+ return serve(handler, unix=True, path=path, **kwargs)
+
+
+def is_credentials(credentials: Any) -> bool:
+ try:
+ username, password = credentials
+ except (TypeError, ValueError):
+ return False
+ else:
+ return isinstance(username, str) and isinstance(password, str)
+
+
+def basic_auth(
+ realm: str = "",
+ credentials: tuple[str, str] | Iterable[tuple[str, str]] | None = None,
+ check_credentials: Callable[[str, str], Awaitable[bool] | bool] | None = None,
+) -> Callable[[ServerConnection, Request], Awaitable[Response | None]]:
+ """
+ Factory for ``process_request`` to enforce HTTP Basic Authentication.
+
+ :func:`basic_auth` is designed to integrate with :func:`serve` as follows::
+
+ from websockets.asyncio.server import basic_auth, serve
+
+ async with serve(
+ ...,
+ process_request=basic_auth(
+ realm="my dev server",
+ credentials=("hello", "iloveyou"),
+ ),
+ ):
+
+ If authentication succeeds, the connection's ``username`` attribute is set.
+ If it fails, the server responds with an HTTP 401 Unauthorized status.
+
+ One of ``credentials`` or ``check_credentials`` must be provided; not both.
+
+ Args:
+ realm: Scope of protection. It should contain only ASCII characters
+ because the encoding of non-ASCII characters is undefined. Refer to
+ section 2.2 of :rfc:`7235` for details.
+ credentials: Hard coded authorized credentials. It can be a
+ ``(username, password)`` pair or a list of such pairs.
+ check_credentials: Function or coroutine that verifies credentials.
+ It receives ``username`` and ``password`` arguments and returns
+ whether they're valid.
+ Raises:
+ TypeError: If ``credentials`` or ``check_credentials`` is wrong.
+ ValueError: If ``credentials`` and ``check_credentials`` are both
+ provided or both not provided.
+
+ """
+ if (credentials is None) == (check_credentials is None):
+ raise ValueError("provide either credentials or check_credentials")
+
+ if credentials is not None:
+ if is_credentials(credentials):
+ credentials_list = [cast(tuple[str, str], credentials)]
+ elif isinstance(credentials, Iterable):
+ credentials_list = list(cast(Iterable[tuple[str, str]], credentials))
+ if not all(is_credentials(item) for item in credentials_list):
+ raise TypeError(f"invalid credentials argument: {credentials}")
+ else:
+ raise TypeError(f"invalid credentials argument: {credentials}")
+
+ credentials_dict = dict(credentials_list)
+
+ def check_credentials(username: str, password: str) -> bool:
+ try:
+ expected_password = credentials_dict[username]
+ except KeyError:
+ return False
+ return hmac.compare_digest(expected_password, password)
+
+ assert check_credentials is not None # help mypy
+
+ async def process_request(
+ connection: ServerConnection,
+ request: Request,
+ ) -> Response | None:
+ """
+ Perform HTTP Basic Authentication.
+
+ If it succeeds, set the connection's ``username`` attribute and return
+ :obj:`None`. If it fails, return an HTTP 401 Unauthorized responss.
+
+ """
+ try:
+ authorization = request.headers["Authorization"]
+ except KeyError:
+ response = connection.respond(
+ http.HTTPStatus.UNAUTHORIZED,
+ "Missing credentials\n",
+ )
+ response.headers["WWW-Authenticate"] = build_www_authenticate_basic(realm)
+ return response
+
+ try:
+ username, password = parse_authorization_basic(authorization)
+ except InvalidHeader:
+ response = connection.respond(
+ http.HTTPStatus.UNAUTHORIZED,
+ "Unsupported credentials\n",
+ )
+ response.headers["WWW-Authenticate"] = build_www_authenticate_basic(realm)
+ return response
+
+ valid_credentials = check_credentials(username, password)
+ if isinstance(valid_credentials, Awaitable):
+ valid_credentials = await valid_credentials
+
+ if not valid_credentials:
+ response = connection.respond(
+ http.HTTPStatus.UNAUTHORIZED,
+ "Invalid credentials\n",
+ )
+ response.headers["WWW-Authenticate"] = build_www_authenticate_basic(realm)
+ return response
+
+ connection.username = username
+ return None
+
+ return process_request