about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/anyio/streams/stapled.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/anyio/streams/stapled.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/streams/stapled.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/streams/stapled.py141
1 files changed, 141 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/streams/stapled.py b/.venv/lib/python3.12/site-packages/anyio/streams/stapled.py
new file mode 100644
index 00000000..80f64a2e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/streams/stapled.py
@@ -0,0 +1,141 @@
+from __future__ import annotations
+
+from collections.abc import Callable, Mapping, Sequence
+from dataclasses import dataclass
+from typing import Any, Generic, TypeVar
+
+from ..abc import (
+    ByteReceiveStream,
+    ByteSendStream,
+    ByteStream,
+    Listener,
+    ObjectReceiveStream,
+    ObjectSendStream,
+    ObjectStream,
+    TaskGroup,
+)
+
+T_Item = TypeVar("T_Item")
+T_Stream = TypeVar("T_Stream")
+
+
+@dataclass(eq=False)
+class StapledByteStream(ByteStream):
+    """
+    Combines two byte streams into a single, bidirectional byte stream.
+
+    Extra attributes will be provided from both streams, with the receive stream
+    providing the values in case of a conflict.
+
+    :param ByteSendStream send_stream: the sending byte stream
+    :param ByteReceiveStream receive_stream: the receiving byte stream
+    """
+
+    send_stream: ByteSendStream
+    receive_stream: ByteReceiveStream
+
+    async def receive(self, max_bytes: int = 65536) -> bytes:
+        return await self.receive_stream.receive(max_bytes)
+
+    async def send(self, item: bytes) -> None:
+        await self.send_stream.send(item)
+
+    async def send_eof(self) -> None:
+        await self.send_stream.aclose()
+
+    async def aclose(self) -> None:
+        await self.send_stream.aclose()
+        await self.receive_stream.aclose()
+
+    @property
+    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
+        return {
+            **self.send_stream.extra_attributes,
+            **self.receive_stream.extra_attributes,
+        }
+
+
+@dataclass(eq=False)
+class StapledObjectStream(Generic[T_Item], ObjectStream[T_Item]):
+    """
+    Combines two object streams into a single, bidirectional object stream.
+
+    Extra attributes will be provided from both streams, with the receive stream
+    providing the values in case of a conflict.
+
+    :param ObjectSendStream send_stream: the sending object stream
+    :param ObjectReceiveStream receive_stream: the receiving object stream
+    """
+
+    send_stream: ObjectSendStream[T_Item]
+    receive_stream: ObjectReceiveStream[T_Item]
+
+    async def receive(self) -> T_Item:
+        return await self.receive_stream.receive()
+
+    async def send(self, item: T_Item) -> None:
+        await self.send_stream.send(item)
+
+    async def send_eof(self) -> None:
+        await self.send_stream.aclose()
+
+    async def aclose(self) -> None:
+        await self.send_stream.aclose()
+        await self.receive_stream.aclose()
+
+    @property
+    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
+        return {
+            **self.send_stream.extra_attributes,
+            **self.receive_stream.extra_attributes,
+        }
+
+
+@dataclass(eq=False)
+class MultiListener(Generic[T_Stream], Listener[T_Stream]):
+    """
+    Combines multiple listeners into one, serving connections from all of them at once.
+
+    Any MultiListeners in the given collection of listeners will have their listeners
+    moved into this one.
+
+    Extra attributes are provided from each listener, with each successive listener
+    overriding any conflicting attributes from the previous one.
+
+    :param listeners: listeners to serve
+    :type listeners: Sequence[Listener[T_Stream]]
+    """
+
+    listeners: Sequence[Listener[T_Stream]]
+
+    def __post_init__(self) -> None:
+        listeners: list[Listener[T_Stream]] = []
+        for listener in self.listeners:
+            if isinstance(listener, MultiListener):
+                listeners.extend(listener.listeners)
+                del listener.listeners[:]  # type: ignore[attr-defined]
+            else:
+                listeners.append(listener)
+
+        self.listeners = listeners
+
+    async def serve(
+        self, handler: Callable[[T_Stream], Any], task_group: TaskGroup | None = None
+    ) -> None:
+        from .. import create_task_group
+
+        async with create_task_group() as tg:
+            for listener in self.listeners:
+                tg.start_soon(listener.serve, handler, task_group)
+
+    async def aclose(self) -> None:
+        for listener in self.listeners:
+            await listener.aclose()
+
+    @property
+    def extra_attributes(self) -> Mapping[Any, Callable[[], Any]]:
+        attributes: dict = {}
+        for listener in self.listeners:
+            attributes.update(listener.extra_attributes)
+
+        return attributes