about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/httpcore/_backends
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/httpcore/_backends')
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/anyio.py146
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/auto.py52
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/base.py101
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/mock.py143
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/sync.py241
-rw-r--r--.venv/lib/python3.12/site-packages/httpcore/_backends/trio.py159
7 files changed, 842 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/__init__.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/anyio.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/anyio.py
new file mode 100644
index 00000000..a140095e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/anyio.py
@@ -0,0 +1,146 @@
+from __future__ import annotations
+
+import ssl
+import typing
+
+import anyio
+
+from .._exceptions import (
+    ConnectError,
+    ConnectTimeout,
+    ReadError,
+    ReadTimeout,
+    WriteError,
+    WriteTimeout,
+    map_exceptions,
+)
+from .._utils import is_socket_readable
+from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
+
+
+class AnyIOStream(AsyncNetworkStream):
+    def __init__(self, stream: anyio.abc.ByteStream) -> None:
+        self._stream = stream
+
+    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        exc_map = {
+            TimeoutError: ReadTimeout,
+            anyio.BrokenResourceError: ReadError,
+            anyio.ClosedResourceError: ReadError,
+            anyio.EndOfStream: ReadError,
+        }
+        with map_exceptions(exc_map):
+            with anyio.fail_after(timeout):
+                try:
+                    return await self._stream.receive(max_bytes=max_bytes)
+                except anyio.EndOfStream:  # pragma: nocover
+                    return b""
+
+    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        if not buffer:
+            return
+
+        exc_map = {
+            TimeoutError: WriteTimeout,
+            anyio.BrokenResourceError: WriteError,
+            anyio.ClosedResourceError: WriteError,
+        }
+        with map_exceptions(exc_map):
+            with anyio.fail_after(timeout):
+                await self._stream.send(item=buffer)
+
+    async def aclose(self) -> None:
+        await self._stream.aclose()
+
+    async def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> AsyncNetworkStream:
+        exc_map = {
+            TimeoutError: ConnectTimeout,
+            anyio.BrokenResourceError: ConnectError,
+            anyio.EndOfStream: ConnectError,
+            ssl.SSLError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            try:
+                with anyio.fail_after(timeout):
+                    ssl_stream = await anyio.streams.tls.TLSStream.wrap(
+                        self._stream,
+                        ssl_context=ssl_context,
+                        hostname=server_hostname,
+                        standard_compatible=False,
+                        server_side=False,
+                    )
+            except Exception as exc:  # pragma: nocover
+                await self.aclose()
+                raise exc
+        return AnyIOStream(ssl_stream)
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        if info == "ssl_object":
+            return self._stream.extra(anyio.streams.tls.TLSAttribute.ssl_object, None)
+        if info == "client_addr":
+            return self._stream.extra(anyio.abc.SocketAttribute.local_address, None)
+        if info == "server_addr":
+            return self._stream.extra(anyio.abc.SocketAttribute.remote_address, None)
+        if info == "socket":
+            return self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
+        if info == "is_readable":
+            sock = self._stream.extra(anyio.abc.SocketAttribute.raw_socket, None)
+            return is_socket_readable(sock)
+        return None
+
+
+class AnyIOBackend(AsyncNetworkBackend):
+    async def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:  # pragma: nocover
+        if socket_options is None:
+            socket_options = []
+        exc_map = {
+            TimeoutError: ConnectTimeout,
+            OSError: ConnectError,
+            anyio.BrokenResourceError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            with anyio.fail_after(timeout):
+                stream: anyio.abc.ByteStream = await anyio.connect_tcp(
+                    remote_host=host,
+                    remote_port=port,
+                    local_host=local_address,
+                )
+                # By default TCP sockets opened in `asyncio` include TCP_NODELAY.
+                for option in socket_options:
+                    stream._raw_socket.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
+        return AnyIOStream(stream)
+
+    async def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:  # pragma: nocover
+        if socket_options is None:
+            socket_options = []
+        exc_map = {
+            TimeoutError: ConnectTimeout,
+            OSError: ConnectError,
+            anyio.BrokenResourceError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            with anyio.fail_after(timeout):
+                stream: anyio.abc.ByteStream = await anyio.connect_unix(path)
+                for option in socket_options:
+                    stream._raw_socket.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
+        return AnyIOStream(stream)
+
+    async def sleep(self, seconds: float) -> None:
+        await anyio.sleep(seconds)  # pragma: nocover
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/auto.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/auto.py
new file mode 100644
index 00000000..49f0e698
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/auto.py
@@ -0,0 +1,52 @@
+from __future__ import annotations
+
+import typing
+
+from .._synchronization import current_async_library
+from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
+
+
+class AutoBackend(AsyncNetworkBackend):
+    async def _init_backend(self) -> None:
+        if not (hasattr(self, "_backend")):
+            backend = current_async_library()
+            if backend == "trio":
+                from .trio import TrioBackend
+
+                self._backend: AsyncNetworkBackend = TrioBackend()
+            else:
+                from .anyio import AnyIOBackend
+
+                self._backend = AnyIOBackend()
+
+    async def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        await self._init_backend()
+        return await self._backend.connect_tcp(
+            host,
+            port,
+            timeout=timeout,
+            local_address=local_address,
+            socket_options=socket_options,
+        )
+
+    async def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:  # pragma: nocover
+        await self._init_backend()
+        return await self._backend.connect_unix_socket(
+            path, timeout=timeout, socket_options=socket_options
+        )
+
+    async def sleep(self, seconds: float) -> None:  # pragma: nocover
+        await self._init_backend()
+        return await self._backend.sleep(seconds)
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/base.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/base.py
new file mode 100644
index 00000000..cf55c8b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/base.py
@@ -0,0 +1,101 @@
+from __future__ import annotations
+
+import ssl
+import time
+import typing
+
+SOCKET_OPTION = typing.Union[
+    typing.Tuple[int, int, int],
+    typing.Tuple[int, int, typing.Union[bytes, bytearray]],
+    typing.Tuple[int, int, None, int],
+]
+
+
+class NetworkStream:
+    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        raise NotImplementedError()  # pragma: nocover
+
+    def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        raise NotImplementedError()  # pragma: nocover
+
+    def close(self) -> None:
+        raise NotImplementedError()  # pragma: nocover
+
+    def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> NetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        return None  # pragma: nocover
+
+
+class NetworkBackend:
+    def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    def sleep(self, seconds: float) -> None:
+        time.sleep(seconds)  # pragma: nocover
+
+
+class AsyncNetworkStream:
+    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        raise NotImplementedError()  # pragma: nocover
+
+    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        raise NotImplementedError()  # pragma: nocover
+
+    async def aclose(self) -> None:
+        raise NotImplementedError()  # pragma: nocover
+
+    async def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> AsyncNetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        return None  # pragma: nocover
+
+
+class AsyncNetworkBackend:
+    async def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    async def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        raise NotImplementedError()  # pragma: nocover
+
+    async def sleep(self, seconds: float) -> None:
+        raise NotImplementedError()  # pragma: nocover
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/mock.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/mock.py
new file mode 100644
index 00000000..9b6edca0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/mock.py
@@ -0,0 +1,143 @@
+from __future__ import annotations
+
+import ssl
+import typing
+
+from .._exceptions import ReadError
+from .base import (
+    SOCKET_OPTION,
+    AsyncNetworkBackend,
+    AsyncNetworkStream,
+    NetworkBackend,
+    NetworkStream,
+)
+
+
+class MockSSLObject:
+    def __init__(self, http2: bool):
+        self._http2 = http2
+
+    def selected_alpn_protocol(self) -> str:
+        return "h2" if self._http2 else "http/1.1"
+
+
+class MockStream(NetworkStream):
+    def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
+        self._buffer = buffer
+        self._http2 = http2
+        self._closed = False
+
+    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        if self._closed:
+            raise ReadError("Connection closed")
+        if not self._buffer:
+            return b""
+        return self._buffer.pop(0)
+
+    def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        pass
+
+    def close(self) -> None:
+        self._closed = True
+
+    def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> NetworkStream:
+        return self
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        return MockSSLObject(http2=self._http2) if info == "ssl_object" else None
+
+    def __repr__(self) -> str:
+        return "<httpcore.MockStream>"
+
+
+class MockBackend(NetworkBackend):
+    def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
+        self._buffer = buffer
+        self._http2 = http2
+
+    def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:
+        return MockStream(list(self._buffer), http2=self._http2)
+
+    def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:
+        return MockStream(list(self._buffer), http2=self._http2)
+
+    def sleep(self, seconds: float) -> None:
+        pass
+
+
+class AsyncMockStream(AsyncNetworkStream):
+    def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
+        self._buffer = buffer
+        self._http2 = http2
+        self._closed = False
+
+    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        if self._closed:
+            raise ReadError("Connection closed")
+        if not self._buffer:
+            return b""
+        return self._buffer.pop(0)
+
+    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        pass
+
+    async def aclose(self) -> None:
+        self._closed = True
+
+    async def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> AsyncNetworkStream:
+        return self
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        return MockSSLObject(http2=self._http2) if info == "ssl_object" else None
+
+    def __repr__(self) -> str:
+        return "<httpcore.AsyncMockStream>"
+
+
+class AsyncMockBackend(AsyncNetworkBackend):
+    def __init__(self, buffer: list[bytes], http2: bool = False) -> None:
+        self._buffer = buffer
+        self._http2 = http2
+
+    async def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        return AsyncMockStream(list(self._buffer), http2=self._http2)
+
+    async def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        return AsyncMockStream(list(self._buffer), http2=self._http2)
+
+    async def sleep(self, seconds: float) -> None:
+        pass
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/sync.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/sync.py
new file mode 100644
index 00000000..4018a09c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/sync.py
@@ -0,0 +1,241 @@
+from __future__ import annotations
+
+import functools
+import socket
+import ssl
+import sys
+import typing
+
+from .._exceptions import (
+    ConnectError,
+    ConnectTimeout,
+    ExceptionMapping,
+    ReadError,
+    ReadTimeout,
+    WriteError,
+    WriteTimeout,
+    map_exceptions,
+)
+from .._utils import is_socket_readable
+from .base import SOCKET_OPTION, NetworkBackend, NetworkStream
+
+
+class TLSinTLSStream(NetworkStream):  # pragma: no cover
+    """
+    Because the standard `SSLContext.wrap_socket` method does
+    not work for `SSLSocket` objects, we need this class
+    to implement TLS stream using an underlying `SSLObject`
+    instance in order to support TLS on top of TLS.
+    """
+
+    # Defined in RFC 8449
+    TLS_RECORD_SIZE = 16384
+
+    def __init__(
+        self,
+        sock: socket.socket,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ):
+        self._sock = sock
+        self._incoming = ssl.MemoryBIO()
+        self._outgoing = ssl.MemoryBIO()
+
+        self.ssl_obj = ssl_context.wrap_bio(
+            incoming=self._incoming,
+            outgoing=self._outgoing,
+            server_hostname=server_hostname,
+        )
+
+        self._sock.settimeout(timeout)
+        self._perform_io(self.ssl_obj.do_handshake)
+
+    def _perform_io(
+        self,
+        func: typing.Callable[..., typing.Any],
+    ) -> typing.Any:
+        ret = None
+
+        while True:
+            errno = None
+            try:
+                ret = func()
+            except (ssl.SSLWantReadError, ssl.SSLWantWriteError) as e:
+                errno = e.errno
+
+            self._sock.sendall(self._outgoing.read())
+
+            if errno == ssl.SSL_ERROR_WANT_READ:
+                buf = self._sock.recv(self.TLS_RECORD_SIZE)
+
+                if buf:
+                    self._incoming.write(buf)
+                else:
+                    self._incoming.write_eof()
+            if errno is None:
+                return ret
+
+    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError}
+        with map_exceptions(exc_map):
+            self._sock.settimeout(timeout)
+            return typing.cast(
+                bytes, self._perform_io(functools.partial(self.ssl_obj.read, max_bytes))
+            )
+
+    def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError}
+        with map_exceptions(exc_map):
+            self._sock.settimeout(timeout)
+            while buffer:
+                nsent = self._perform_io(functools.partial(self.ssl_obj.write, buffer))
+                buffer = buffer[nsent:]
+
+    def close(self) -> None:
+        self._sock.close()
+
+    def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> NetworkStream:
+        raise NotImplementedError()
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        if info == "ssl_object":
+            return self.ssl_obj
+        if info == "client_addr":
+            return self._sock.getsockname()
+        if info == "server_addr":
+            return self._sock.getpeername()
+        if info == "socket":
+            return self._sock
+        if info == "is_readable":
+            return is_socket_readable(self._sock)
+        return None
+
+
+class SyncStream(NetworkStream):
+    def __init__(self, sock: socket.socket) -> None:
+        self._sock = sock
+
+    def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        exc_map: ExceptionMapping = {socket.timeout: ReadTimeout, OSError: ReadError}
+        with map_exceptions(exc_map):
+            self._sock.settimeout(timeout)
+            return self._sock.recv(max_bytes)
+
+    def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        if not buffer:
+            return
+
+        exc_map: ExceptionMapping = {socket.timeout: WriteTimeout, OSError: WriteError}
+        with map_exceptions(exc_map):
+            while buffer:
+                self._sock.settimeout(timeout)
+                n = self._sock.send(buffer)
+                buffer = buffer[n:]
+
+    def close(self) -> None:
+        self._sock.close()
+
+    def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> NetworkStream:
+        exc_map: ExceptionMapping = {
+            socket.timeout: ConnectTimeout,
+            OSError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            try:
+                if isinstance(self._sock, ssl.SSLSocket):  # pragma: no cover
+                    # If the underlying socket has already been upgraded
+                    # to the TLS layer (i.e. is an instance of SSLSocket),
+                    # we need some additional smarts to support TLS-in-TLS.
+                    return TLSinTLSStream(
+                        self._sock, ssl_context, server_hostname, timeout
+                    )
+                else:
+                    self._sock.settimeout(timeout)
+                    sock = ssl_context.wrap_socket(
+                        self._sock, server_hostname=server_hostname
+                    )
+            except Exception as exc:  # pragma: nocover
+                self.close()
+                raise exc
+        return SyncStream(sock)
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        if info == "ssl_object" and isinstance(self._sock, ssl.SSLSocket):
+            return self._sock._sslobj  # type: ignore
+        if info == "client_addr":
+            return self._sock.getsockname()
+        if info == "server_addr":
+            return self._sock.getpeername()
+        if info == "socket":
+            return self._sock
+        if info == "is_readable":
+            return is_socket_readable(self._sock)
+        return None
+
+
+class SyncBackend(NetworkBackend):
+    def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:
+        # Note that we automatically include `TCP_NODELAY`
+        # in addition to any other custom socket options.
+        if socket_options is None:
+            socket_options = []  # pragma: no cover
+        address = (host, port)
+        source_address = None if local_address is None else (local_address, 0)
+        exc_map: ExceptionMapping = {
+            socket.timeout: ConnectTimeout,
+            OSError: ConnectError,
+        }
+
+        with map_exceptions(exc_map):
+            sock = socket.create_connection(
+                address,
+                timeout,
+                source_address=source_address,
+            )
+            for option in socket_options:
+                sock.setsockopt(*option)  # pragma: no cover
+            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
+        return SyncStream(sock)
+
+    def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> NetworkStream:  # pragma: nocover
+        if sys.platform == "win32":
+            raise RuntimeError(
+                "Attempted to connect to a UNIX socket on a Windows system."
+            )
+        if socket_options is None:
+            socket_options = []
+
+        exc_map: ExceptionMapping = {
+            socket.timeout: ConnectTimeout,
+            OSError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+            for option in socket_options:
+                sock.setsockopt(*option)
+            sock.settimeout(timeout)
+            sock.connect(path)
+        return SyncStream(sock)
diff --git a/.venv/lib/python3.12/site-packages/httpcore/_backends/trio.py b/.venv/lib/python3.12/site-packages/httpcore/_backends/trio.py
new file mode 100644
index 00000000..6f53f5f2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/httpcore/_backends/trio.py
@@ -0,0 +1,159 @@
+from __future__ import annotations
+
+import ssl
+import typing
+
+import trio
+
+from .._exceptions import (
+    ConnectError,
+    ConnectTimeout,
+    ExceptionMapping,
+    ReadError,
+    ReadTimeout,
+    WriteError,
+    WriteTimeout,
+    map_exceptions,
+)
+from .base import SOCKET_OPTION, AsyncNetworkBackend, AsyncNetworkStream
+
+
+class TrioStream(AsyncNetworkStream):
+    def __init__(self, stream: trio.abc.Stream) -> None:
+        self._stream = stream
+
+    async def read(self, max_bytes: int, timeout: float | None = None) -> bytes:
+        timeout_or_inf = float("inf") if timeout is None else timeout
+        exc_map: ExceptionMapping = {
+            trio.TooSlowError: ReadTimeout,
+            trio.BrokenResourceError: ReadError,
+            trio.ClosedResourceError: ReadError,
+        }
+        with map_exceptions(exc_map):
+            with trio.fail_after(timeout_or_inf):
+                data: bytes = await self._stream.receive_some(max_bytes=max_bytes)
+                return data
+
+    async def write(self, buffer: bytes, timeout: float | None = None) -> None:
+        if not buffer:
+            return
+
+        timeout_or_inf = float("inf") if timeout is None else timeout
+        exc_map: ExceptionMapping = {
+            trio.TooSlowError: WriteTimeout,
+            trio.BrokenResourceError: WriteError,
+            trio.ClosedResourceError: WriteError,
+        }
+        with map_exceptions(exc_map):
+            with trio.fail_after(timeout_or_inf):
+                await self._stream.send_all(data=buffer)
+
+    async def aclose(self) -> None:
+        await self._stream.aclose()
+
+    async def start_tls(
+        self,
+        ssl_context: ssl.SSLContext,
+        server_hostname: str | None = None,
+        timeout: float | None = None,
+    ) -> AsyncNetworkStream:
+        timeout_or_inf = float("inf") if timeout is None else timeout
+        exc_map: ExceptionMapping = {
+            trio.TooSlowError: ConnectTimeout,
+            trio.BrokenResourceError: ConnectError,
+        }
+        ssl_stream = trio.SSLStream(
+            self._stream,
+            ssl_context=ssl_context,
+            server_hostname=server_hostname,
+            https_compatible=True,
+            server_side=False,
+        )
+        with map_exceptions(exc_map):
+            try:
+                with trio.fail_after(timeout_or_inf):
+                    await ssl_stream.do_handshake()
+            except Exception as exc:  # pragma: nocover
+                await self.aclose()
+                raise exc
+        return TrioStream(ssl_stream)
+
+    def get_extra_info(self, info: str) -> typing.Any:
+        if info == "ssl_object" and isinstance(self._stream, trio.SSLStream):
+            # Type checkers cannot see `_ssl_object` attribute because trio._ssl.SSLStream uses __getattr__/__setattr__.
+            # Tracked at https://github.com/python-trio/trio/issues/542
+            return self._stream._ssl_object  # type: ignore[attr-defined]
+        if info == "client_addr":
+            return self._get_socket_stream().socket.getsockname()
+        if info == "server_addr":
+            return self._get_socket_stream().socket.getpeername()
+        if info == "socket":
+            stream = self._stream
+            while isinstance(stream, trio.SSLStream):
+                stream = stream.transport_stream
+            assert isinstance(stream, trio.SocketStream)
+            return stream.socket
+        if info == "is_readable":
+            socket = self.get_extra_info("socket")
+            return socket.is_readable()
+        return None
+
+    def _get_socket_stream(self) -> trio.SocketStream:
+        stream = self._stream
+        while isinstance(stream, trio.SSLStream):
+            stream = stream.transport_stream
+        assert isinstance(stream, trio.SocketStream)
+        return stream
+
+
+class TrioBackend(AsyncNetworkBackend):
+    async def connect_tcp(
+        self,
+        host: str,
+        port: int,
+        timeout: float | None = None,
+        local_address: str | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:
+        # By default for TCP sockets, trio enables TCP_NODELAY.
+        # https://trio.readthedocs.io/en/stable/reference-io.html#trio.SocketStream
+        if socket_options is None:
+            socket_options = []  # pragma: no cover
+        timeout_or_inf = float("inf") if timeout is None else timeout
+        exc_map: ExceptionMapping = {
+            trio.TooSlowError: ConnectTimeout,
+            trio.BrokenResourceError: ConnectError,
+            OSError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            with trio.fail_after(timeout_or_inf):
+                stream: trio.abc.Stream = await trio.open_tcp_stream(
+                    host=host, port=port, local_address=local_address
+                )
+                for option in socket_options:
+                    stream.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
+        return TrioStream(stream)
+
+    async def connect_unix_socket(
+        self,
+        path: str,
+        timeout: float | None = None,
+        socket_options: typing.Iterable[SOCKET_OPTION] | None = None,
+    ) -> AsyncNetworkStream:  # pragma: nocover
+        if socket_options is None:
+            socket_options = []
+        timeout_or_inf = float("inf") if timeout is None else timeout
+        exc_map: ExceptionMapping = {
+            trio.TooSlowError: ConnectTimeout,
+            trio.BrokenResourceError: ConnectError,
+            OSError: ConnectError,
+        }
+        with map_exceptions(exc_map):
+            with trio.fail_after(timeout_or_inf):
+                stream: trio.abc.Stream = await trio.open_unix_socket(path)
+                for option in socket_options:
+                    stream.setsockopt(*option)  # type: ignore[attr-defined] # pragma: no cover
+        return TrioStream(stream)
+
+    async def sleep(self, seconds: float) -> None:
+        await trio.sleep(seconds)  # pragma: nocover