about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py792
1 files changed, 792 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py b/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
new file mode 100644
index 00000000..054bcdda
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
@@ -0,0 +1,792 @@
+from __future__ import annotations
+
+import errno
+import os
+import socket
+import ssl
+import stat
+import sys
+from collections.abc import Awaitable
+from ipaddress import IPv6Address, ip_address
+from os import PathLike, chmod
+from socket import AddressFamily, SocketKind
+from typing import TYPE_CHECKING, Any, Literal, cast, overload
+
+from .. import to_thread
+from ..abc import (
+    ConnectedUDPSocket,
+    ConnectedUNIXDatagramSocket,
+    IPAddressType,
+    IPSockAddrType,
+    SocketListener,
+    SocketStream,
+    UDPSocket,
+    UNIXDatagramSocket,
+    UNIXSocketStream,
+)
+from ..streams.stapled import MultiListener
+from ..streams.tls import TLSStream
+from ._eventloop import get_async_backend
+from ._resources import aclose_forcefully
+from ._synchronization import Event
+from ._tasks import create_task_group, move_on_after
+
+if TYPE_CHECKING:
+    from _typeshed import FileDescriptorLike
+else:
+    FileDescriptorLike = object
+
+if sys.version_info < (3, 11):
+    from exceptiongroup import ExceptionGroup
+
+if sys.version_info < (3, 13):
+    from typing_extensions import deprecated
+else:
+    from warnings import deprecated
+
+IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)  # https://bugs.python.org/issue29515
+
+AnyIPAddressFamily = Literal[
+    AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
+]
+IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
+
+
+# tls_hostname given
+@overload
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = ...,
+    ssl_context: ssl.SSLContext | None = ...,
+    tls_standard_compatible: bool = ...,
+    tls_hostname: str,
+    happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# ssl_context given
+@overload
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = ...,
+    ssl_context: ssl.SSLContext,
+    tls_standard_compatible: bool = ...,
+    tls_hostname: str | None = ...,
+    happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# tls=True
+@overload
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = ...,
+    tls: Literal[True],
+    ssl_context: ssl.SSLContext | None = ...,
+    tls_standard_compatible: bool = ...,
+    tls_hostname: str | None = ...,
+    happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# tls=False
+@overload
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = ...,
+    tls: Literal[False],
+    ssl_context: ssl.SSLContext | None = ...,
+    tls_standard_compatible: bool = ...,
+    tls_hostname: str | None = ...,
+    happy_eyeballs_delay: float = ...,
+) -> SocketStream: ...
+
+
+# No TLS arguments
+@overload
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = ...,
+    happy_eyeballs_delay: float = ...,
+) -> SocketStream: ...
+
+
+async def connect_tcp(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    local_host: IPAddressType | None = None,
+    tls: bool = False,
+    ssl_context: ssl.SSLContext | None = None,
+    tls_standard_compatible: bool = True,
+    tls_hostname: str | None = None,
+    happy_eyeballs_delay: float = 0.25,
+) -> SocketStream | TLSStream:
+    """
+    Connect to a host using the TCP protocol.
+
+    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
+    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
+    each one is tried until one connection attempt succeeds. If the first attempt does
+    not connected within 250 milliseconds, a second attempt is started using the next
+    address in the list, and so on. On IPv6 enabled systems, an IPv6 address (if
+    available) is tried first.
+
+    When the connection has been established, a TLS handshake will be done if either
+    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.
+
+    :param remote_host: the IP address or host name to connect to
+    :param remote_port: port on the target host to connect to
+    :param local_host: the interface address or name to bind the socket to before
+        connecting
+    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
+        :class:`~anyio.streams.tls.TLSStream` instead
+    :param ssl_context: the SSL context object to use (if omitted, a default context is
+        created)
+    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
+        before closing the stream and requires that the server does this as well.
+        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
+        Some protocols, such as HTTP, require this option to be ``False``.
+        See :meth:`~ssl.SSLContext.wrap_socket` for details.
+    :param tls_hostname: host name to check the server certificate against (defaults to
+        the value of ``remote_host``)
+    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
+        attempt
+    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
+    :raises OSError: if the connection attempt fails
+
+    """
+    # Placed here due to https://github.com/python/mypy/issues/7057
+    connected_stream: SocketStream | None = None
+
+    async def try_connect(remote_host: str, event: Event) -> None:
+        nonlocal connected_stream
+        try:
+            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
+        except OSError as exc:
+            oserrors.append(exc)
+            return
+        else:
+            if connected_stream is None:
+                connected_stream = stream
+                tg.cancel_scope.cancel()
+            else:
+                await stream.aclose()
+        finally:
+            event.set()
+
+    asynclib = get_async_backend()
+    local_address: IPSockAddrType | None = None
+    family = socket.AF_UNSPEC
+    if local_host:
+        gai_res = await getaddrinfo(str(local_host), None)
+        family, *_, local_address = gai_res[0]
+
+    target_host = str(remote_host)
+    try:
+        addr_obj = ip_address(remote_host)
+    except ValueError:
+        addr_obj = None
+
+    if addr_obj is not None:
+        if isinstance(addr_obj, IPv6Address):
+            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
+        else:
+            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
+    else:
+        # getaddrinfo() will raise an exception if name resolution fails
+        gai_res = await getaddrinfo(
+            target_host, remote_port, family=family, type=socket.SOCK_STREAM
+        )
+
+        # Organize the list so that the first address is an IPv6 address (if available)
+        # and the second one is an IPv4 addresses. The rest can be in whatever order.
+        v6_found = v4_found = False
+        target_addrs = []
+        for af, *rest, sa in gai_res:
+            if af == socket.AF_INET6 and not v6_found:
+                v6_found = True
+                target_addrs.insert(0, (af, sa[0]))
+            elif af == socket.AF_INET and not v4_found and v6_found:
+                v4_found = True
+                target_addrs.insert(1, (af, sa[0]))
+            else:
+                target_addrs.append((af, sa[0]))
+
+    oserrors: list[OSError] = []
+    try:
+        async with create_task_group() as tg:
+            for i, (af, addr) in enumerate(target_addrs):
+                event = Event()
+                tg.start_soon(try_connect, addr, event)
+                with move_on_after(happy_eyeballs_delay):
+                    await event.wait()
+
+        if connected_stream is None:
+            cause = (
+                oserrors[0]
+                if len(oserrors) == 1
+                else ExceptionGroup("multiple connection attempts failed", oserrors)
+            )
+            raise OSError("All connection attempts failed") from cause
+    finally:
+        oserrors.clear()
+
+    if tls or tls_hostname or ssl_context:
+        try:
+            return await TLSStream.wrap(
+                connected_stream,
+                server_side=False,
+                hostname=tls_hostname or str(remote_host),
+                ssl_context=ssl_context,
+                standard_compatible=tls_standard_compatible,
+            )
+        except BaseException:
+            await aclose_forcefully(connected_stream)
+            raise
+
+    return connected_stream
+
+
+async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
+    """
+    Connect to the given UNIX socket.
+
+    Not available on Windows.
+
+    :param path: path to the socket
+    :return: a socket stream object
+
+    """
+    path = os.fspath(path)
+    return await get_async_backend().connect_unix(path)
+
+
+async def create_tcp_listener(
+    *,
+    local_host: IPAddressType | None = None,
+    local_port: int = 0,
+    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
+    backlog: int = 65536,
+    reuse_port: bool = False,
+) -> MultiListener[SocketStream]:
+    """
+    Create a TCP socket listener.
+
+    :param local_port: port number to listen on
+    :param local_host: IP address of the interface to listen on. If omitted, listen on
+        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
+        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
+    :param family: address family (used if ``local_host`` was omitted)
+    :param backlog: maximum number of queued incoming connections (up to a maximum of
+        2**16, or 65536)
+    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+        address/port (not supported on Windows)
+    :return: a list of listener objects
+
+    """
+    asynclib = get_async_backend()
+    backlog = min(backlog, 65536)
+    local_host = str(local_host) if local_host is not None else None
+    gai_res = await getaddrinfo(
+        local_host,
+        local_port,
+        family=family,
+        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
+        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+    )
+    listeners: list[SocketListener] = []
+    try:
+        # The set() is here to work around a glibc bug:
+        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
+        sockaddr: tuple[str, int] | tuple[str, int, int, int]
+        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
+            # Workaround for an uvloop bug where we don't get the correct scope ID for
+            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
+            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
+            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
+                continue
+
+            raw_socket = socket.socket(fam)
+            raw_socket.setblocking(False)
+
+            # For Windows, enable exclusive address use. For others, enable address
+            # reuse.
+            if sys.platform == "win32":
+                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+            else:
+                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+            if reuse_port:
+                raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+
+            # If only IPv6 was requested, disable dual stack operation
+            if fam == socket.AF_INET6:
+                raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
+
+                # Workaround for #554
+                if "%" in sockaddr[0]:
+                    addr, scope_id = sockaddr[0].split("%", 1)
+                    sockaddr = (addr, sockaddr[1], 0, int(scope_id))
+
+            raw_socket.bind(sockaddr)
+            raw_socket.listen(backlog)
+            listener = asynclib.create_tcp_listener(raw_socket)
+            listeners.append(listener)
+    except BaseException:
+        for listener in listeners:
+            await listener.aclose()
+
+        raise
+
+    return MultiListener(listeners)
+
+
+async def create_unix_listener(
+    path: str | bytes | PathLike[Any],
+    *,
+    mode: int | None = None,
+    backlog: int = 65536,
+) -> SocketListener:
+    """
+    Create a UNIX socket listener.
+
+    Not available on Windows.
+
+    :param path: path of the socket
+    :param mode: permissions to set on the socket
+    :param backlog: maximum number of queued incoming connections (up to a maximum of
+        2**16, or 65536)
+    :return: a listener object
+
+    .. versionchanged:: 3.0
+        If a socket already exists on the file system in the given path, it will be
+        removed first.
+
+    """
+    backlog = min(backlog, 65536)
+    raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
+    try:
+        raw_socket.listen(backlog)
+        return get_async_backend().create_unix_listener(raw_socket)
+    except BaseException:
+        raw_socket.close()
+        raise
+
+
+async def create_udp_socket(
+    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
+    *,
+    local_host: IPAddressType | None = None,
+    local_port: int = 0,
+    reuse_port: bool = False,
+) -> UDPSocket:
+    """
+    Create a UDP socket.
+
+    If ``port`` has been given, the socket will be bound to this port on the local
+    machine, making this socket suitable for providing UDP based services.
+
+    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
+        determined from ``local_host`` if omitted
+    :param local_host: IP address or host name of the local interface to bind to
+    :param local_port: local port to bind to
+    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+        address/port (not supported on Windows)
+    :return: a UDP socket
+
+    """
+    if family is AddressFamily.AF_UNSPEC and not local_host:
+        raise ValueError('Either "family" or "local_host" must be given')
+
+    if local_host:
+        gai_res = await getaddrinfo(
+            str(local_host),
+            local_port,
+            family=family,
+            type=socket.SOCK_DGRAM,
+            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+        )
+        family = cast(AnyIPAddressFamily, gai_res[0][0])
+        local_address = gai_res[0][-1]
+    elif family is AddressFamily.AF_INET6:
+        local_address = ("::", 0)
+    else:
+        local_address = ("0.0.0.0", 0)
+
+    sock = await get_async_backend().create_udp_socket(
+        family, local_address, None, reuse_port
+    )
+    return cast(UDPSocket, sock)
+
+
+async def create_connected_udp_socket(
+    remote_host: IPAddressType,
+    remote_port: int,
+    *,
+    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
+    local_host: IPAddressType | None = None,
+    local_port: int = 0,
+    reuse_port: bool = False,
+) -> ConnectedUDPSocket:
+    """
+    Create a connected UDP socket.
+
+    Connected UDP sockets can only communicate with the specified remote host/port, an
+    any packets sent from other sources are dropped.
+
+    :param remote_host: remote host to set as the default target
+    :param remote_port: port on the remote host to set as the default target
+    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
+        determined from ``local_host`` or ``remote_host`` if omitted
+    :param local_host: IP address or host name of the local interface to bind to
+    :param local_port: local port to bind to
+    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+        address/port (not supported on Windows)
+    :return: a connected UDP socket
+
+    """
+    local_address = None
+    if local_host:
+        gai_res = await getaddrinfo(
+            str(local_host),
+            local_port,
+            family=family,
+            type=socket.SOCK_DGRAM,
+            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+        )
+        family = cast(AnyIPAddressFamily, gai_res[0][0])
+        local_address = gai_res[0][-1]
+
+    gai_res = await getaddrinfo(
+        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
+    )
+    family = cast(AnyIPAddressFamily, gai_res[0][0])
+    remote_address = gai_res[0][-1]
+
+    sock = await get_async_backend().create_udp_socket(
+        family, local_address, remote_address, reuse_port
+    )
+    return cast(ConnectedUDPSocket, sock)
+
+
+async def create_unix_datagram_socket(
+    *,
+    local_path: None | str | bytes | PathLike[Any] = None,
+    local_mode: int | None = None,
+) -> UNIXDatagramSocket:
+    """
+    Create a UNIX datagram socket.
+
+    Not available on Windows.
+
+    If ``local_path`` has been given, the socket will be bound to this path, making this
+    socket suitable for receiving datagrams from other processes. Other processes can
+    send datagrams to this socket only if ``local_path`` is set.
+
+    If a socket already exists on the file system in the ``local_path``, it will be
+    removed first.
+
+    :param local_path: the path on which to bind to
+    :param local_mode: permissions to set on the local socket
+    :return: a UNIX datagram socket
+
+    """
+    raw_socket = await setup_unix_local_socket(
+        local_path, local_mode, socket.SOCK_DGRAM
+    )
+    return await get_async_backend().create_unix_datagram_socket(raw_socket, None)
+
+
+async def create_connected_unix_datagram_socket(
+    remote_path: str | bytes | PathLike[Any],
+    *,
+    local_path: None | str | bytes | PathLike[Any] = None,
+    local_mode: int | None = None,
+) -> ConnectedUNIXDatagramSocket:
+    """
+    Create a connected UNIX datagram socket.
+
+    Connected datagram sockets can only communicate with the specified remote path.
+
+    If ``local_path`` has been given, the socket will be bound to this path, making
+    this socket suitable for receiving datagrams from other processes. Other processes
+    can send datagrams to this socket only if ``local_path`` is set.
+
+    If a socket already exists on the file system in the ``local_path``, it will be
+    removed first.
+
+    :param remote_path: the path to set as the default target
+    :param local_path: the path on which to bind to
+    :param local_mode: permissions to set on the local socket
+    :return: a connected UNIX datagram socket
+
+    """
+    remote_path = os.fspath(remote_path)
+    raw_socket = await setup_unix_local_socket(
+        local_path, local_mode, socket.SOCK_DGRAM
+    )
+    return await get_async_backend().create_unix_datagram_socket(
+        raw_socket, remote_path
+    )
+
+
+async def getaddrinfo(
+    host: bytes | str | None,
+    port: str | int | None,
+    *,
+    family: int | AddressFamily = 0,
+    type: int | SocketKind = 0,
+    proto: int = 0,
+    flags: int = 0,
+) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
+    """
+    Look up a numeric IP address given a host name.
+
+    Internationalized domain names are translated according to the (non-transitional)
+    IDNA 2008 standard.
+
+    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
+        (host, port), unlike what :func:`socket.getaddrinfo` does.
+
+    :param host: host name
+    :param port: port number
+    :param family: socket family (`'AF_INET``, ...)
+    :param type: socket type (``SOCK_STREAM``, ...)
+    :param proto: protocol number
+    :param flags: flags to pass to upstream ``getaddrinfo()``
+    :return: list of tuples containing (family, type, proto, canonname, sockaddr)
+
+    .. seealso:: :func:`socket.getaddrinfo`
+
+    """
+    # Handle unicode hostnames
+    if isinstance(host, str):
+        try:
+            encoded_host: bytes | None = host.encode("ascii")
+        except UnicodeEncodeError:
+            import idna
+
+            encoded_host = idna.encode(host, uts46=True)
+    else:
+        encoded_host = host
+
+    gai_res = await get_async_backend().getaddrinfo(
+        encoded_host, port, family=family, type=type, proto=proto, flags=flags
+    )
+    return [
+        (family, type, proto, canonname, convert_ipv6_sockaddr(sockaddr))
+        for family, type, proto, canonname, sockaddr in gai_res
+        # filter out IPv6 results when IPv6 is disabled
+        if not isinstance(sockaddr[0], int)
+    ]
+
+
+def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
+    """
+    Look up the host name of an IP address.
+
+    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
+    :param flags: flags to pass to upstream ``getnameinfo()``
+    :return: a tuple of (host name, service name)
+
+    .. seealso:: :func:`socket.getnameinfo`
+
+    """
+    return get_async_backend().getnameinfo(sockaddr, flags)
+
+
+@deprecated("This function is deprecated; use `wait_readable` instead")
+def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
+    """
+    .. deprecated:: 4.7.0
+       Use :func:`wait_readable` instead.
+
+    Wait until the given socket has data to be read.
+
+    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
+        level constructs like socket streams!
+
+    :param sock: a socket object
+    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
+        socket to become readable
+    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
+        to become readable
+
+    """
+    return get_async_backend().wait_readable(sock.fileno())
+
+
+@deprecated("This function is deprecated; use `wait_writable` instead")
+def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
+    """
+    .. deprecated:: 4.7.0
+       Use :func:`wait_writable` instead.
+
+    Wait until the given socket can be written to.
+
+    This does **NOT** work on Windows when using the asyncio backend with a proactor
+    event loop (default on py3.8+).
+
+    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
+        level constructs like socket streams!
+
+    :param sock: a socket object
+    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
+        socket to become writable
+    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
+        to become writable
+
+    """
+    return get_async_backend().wait_writable(sock.fileno())
+
+
+def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
+    """
+    Wait until the given object has data to be read.
+
+    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
+    object with a ``.fileno()`` method which returns an integer file descriptor. Any
+    kind of file descriptor can be passed, though the exact semantics will depend on
+    your kernel. For example, this probably won't do anything useful for on-disk files.
+
+    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
+    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
+    descriptors aren't supported, and neither are handles that refer to anything besides
+    a ``SOCKET``.
+
+    On backends where this functionality is not natively provided (asyncio
+    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
+    which is set to shut down when the interpreter shuts down.
+
+    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
+        level constructs like socket streams!
+
+    :param obj: an object with a ``.fileno()`` method or an integer handle
+    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
+        object to become readable
+    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
+        to become readable
+
+    """
+    return get_async_backend().wait_readable(obj)
+
+
+def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
+    """
+    Wait until the given object can be written to.
+
+    :param obj: an object with a ``.fileno()`` method or an integer handle
+    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
+        object to become writable
+    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
+        to become writable
+
+    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
+       ``obj`` and notes on backend compatibility.
+
+    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
+        level constructs like socket streams!
+
+    """
+    return get_async_backend().wait_writable(obj)
+
+
+#
+# Private API
+#
+
+
+def convert_ipv6_sockaddr(
+    sockaddr: tuple[str, int, int, int] | tuple[str, int],
+) -> tuple[str, int]:
+    """
+    Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
+
+    If the scope ID is nonzero, it is added to the address, separated with ``%``.
+    Otherwise the flow id and scope id are simply cut off from the tuple.
+    Any other kinds of socket addresses are returned as-is.
+
+    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
+    :return: the converted socket address
+
+    """
+    # This is more complicated than it should be because of MyPy
+    if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
+        host, port, flowinfo, scope_id = sockaddr
+        if scope_id:
+            # PyPy (as of v7.3.11) leaves the interface name in the result, so
+            # we discard it and only get the scope ID from the end
+            # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
+            host = host.split("%")[0]
+
+            # Add scope_id to the address
+            return f"{host}%{scope_id}", port
+        else:
+            return host, port
+    else:
+        return sockaddr
+
+
+async def setup_unix_local_socket(
+    path: None | str | bytes | PathLike[Any],
+    mode: int | None,
+    socktype: int,
+) -> socket.socket:
+    """
+    Create a UNIX local socket object, deleting the socket at the given path if it
+    exists.
+
+    Not available on Windows.
+
+    :param path: path of the socket
+    :param mode: permissions to set on the socket
+    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
+
+    """
+    path_str: str | None
+    if path is not None:
+        path_str = os.fsdecode(path)
+
+        # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
+        if not path_str.startswith("\0"):
+            # Copied from pathlib...
+            try:
+                stat_result = os.stat(path)
+            except OSError as e:
+                if e.errno not in (
+                    errno.ENOENT,
+                    errno.ENOTDIR,
+                    errno.EBADF,
+                    errno.ELOOP,
+                ):
+                    raise
+            else:
+                if stat.S_ISSOCK(stat_result.st_mode):
+                    os.unlink(path)
+    else:
+        path_str = None
+
+    raw_socket = socket.socket(socket.AF_UNIX, socktype)
+    raw_socket.setblocking(False)
+
+    if path_str is not None:
+        try:
+            await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
+            if mode is not None:
+                await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
+        except BaseException:
+            raw_socket.close()
+            raise
+
+    return raw_socket