aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py')
-rw-r--r--.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py792
1 files changed, 792 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py b/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
new file mode 100644
index 00000000..054bcdda
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
@@ -0,0 +1,792 @@
+from __future__ import annotations
+
+import errno
+import os
+import socket
+import ssl
+import stat
+import sys
+from collections.abc import Awaitable
+from ipaddress import IPv6Address, ip_address
+from os import PathLike, chmod
+from socket import AddressFamily, SocketKind
+from typing import TYPE_CHECKING, Any, Literal, cast, overload
+
+from .. import to_thread
+from ..abc import (
+ ConnectedUDPSocket,
+ ConnectedUNIXDatagramSocket,
+ IPAddressType,
+ IPSockAddrType,
+ SocketListener,
+ SocketStream,
+ UDPSocket,
+ UNIXDatagramSocket,
+ UNIXSocketStream,
+)
+from ..streams.stapled import MultiListener
+from ..streams.tls import TLSStream
+from ._eventloop import get_async_backend
+from ._resources import aclose_forcefully
+from ._synchronization import Event
+from ._tasks import create_task_group, move_on_after
+
+if TYPE_CHECKING:
+ from _typeshed import FileDescriptorLike
+else:
+ FileDescriptorLike = object
+
+if sys.version_info < (3, 11):
+ from exceptiongroup import ExceptionGroup
+
+if sys.version_info < (3, 13):
+ from typing_extensions import deprecated
+else:
+ from warnings import deprecated
+
+IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41) # https://bugs.python.org/issue29515
+
+AnyIPAddressFamily = Literal[
+ AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
+]
+IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
+
+
+# tls_hostname given
+@overload
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = ...,
+ ssl_context: ssl.SSLContext | None = ...,
+ tls_standard_compatible: bool = ...,
+ tls_hostname: str,
+ happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# ssl_context given
+@overload
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = ...,
+ ssl_context: ssl.SSLContext,
+ tls_standard_compatible: bool = ...,
+ tls_hostname: str | None = ...,
+ happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# tls=True
+@overload
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = ...,
+ tls: Literal[True],
+ ssl_context: ssl.SSLContext | None = ...,
+ tls_standard_compatible: bool = ...,
+ tls_hostname: str | None = ...,
+ happy_eyeballs_delay: float = ...,
+) -> TLSStream: ...
+
+
+# tls=False
+@overload
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = ...,
+ tls: Literal[False],
+ ssl_context: ssl.SSLContext | None = ...,
+ tls_standard_compatible: bool = ...,
+ tls_hostname: str | None = ...,
+ happy_eyeballs_delay: float = ...,
+) -> SocketStream: ...
+
+
+# No TLS arguments
+@overload
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = ...,
+ happy_eyeballs_delay: float = ...,
+) -> SocketStream: ...
+
+
+async def connect_tcp(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ local_host: IPAddressType | None = None,
+ tls: bool = False,
+ ssl_context: ssl.SSLContext | None = None,
+ tls_standard_compatible: bool = True,
+ tls_hostname: str | None = None,
+ happy_eyeballs_delay: float = 0.25,
+) -> SocketStream | TLSStream:
+ """
+ Connect to a host using the TCP protocol.
+
+ This function implements the stateless version of the Happy Eyeballs algorithm (RFC
+ 6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
+ each one is tried until one connection attempt succeeds. If the first attempt does
+ not connected within 250 milliseconds, a second attempt is started using the next
+ address in the list, and so on. On IPv6 enabled systems, an IPv6 address (if
+ available) is tried first.
+
+ When the connection has been established, a TLS handshake will be done if either
+ ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.
+
+ :param remote_host: the IP address or host name to connect to
+ :param remote_port: port on the target host to connect to
+ :param local_host: the interface address or name to bind the socket to before
+ connecting
+ :param tls: ``True`` to do a TLS handshake with the connected stream and return a
+ :class:`~anyio.streams.tls.TLSStream` instead
+ :param ssl_context: the SSL context object to use (if omitted, a default context is
+ created)
+ :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
+ before closing the stream and requires that the server does this as well.
+ Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
+ Some protocols, such as HTTP, require this option to be ``False``.
+ See :meth:`~ssl.SSLContext.wrap_socket` for details.
+ :param tls_hostname: host name to check the server certificate against (defaults to
+ the value of ``remote_host``)
+ :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
+ attempt
+ :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
+ :raises OSError: if the connection attempt fails
+
+ """
+ # Placed here due to https://github.com/python/mypy/issues/7057
+ connected_stream: SocketStream | None = None
+
+ async def try_connect(remote_host: str, event: Event) -> None:
+ nonlocal connected_stream
+ try:
+ stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
+ except OSError as exc:
+ oserrors.append(exc)
+ return
+ else:
+ if connected_stream is None:
+ connected_stream = stream
+ tg.cancel_scope.cancel()
+ else:
+ await stream.aclose()
+ finally:
+ event.set()
+
+ asynclib = get_async_backend()
+ local_address: IPSockAddrType | None = None
+ family = socket.AF_UNSPEC
+ if local_host:
+ gai_res = await getaddrinfo(str(local_host), None)
+ family, *_, local_address = gai_res[0]
+
+ target_host = str(remote_host)
+ try:
+ addr_obj = ip_address(remote_host)
+ except ValueError:
+ addr_obj = None
+
+ if addr_obj is not None:
+ if isinstance(addr_obj, IPv6Address):
+ target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
+ else:
+ target_addrs = [(socket.AF_INET, addr_obj.compressed)]
+ else:
+ # getaddrinfo() will raise an exception if name resolution fails
+ gai_res = await getaddrinfo(
+ target_host, remote_port, family=family, type=socket.SOCK_STREAM
+ )
+
+ # Organize the list so that the first address is an IPv6 address (if available)
+ # and the second one is an IPv4 addresses. The rest can be in whatever order.
+ v6_found = v4_found = False
+ target_addrs = []
+ for af, *rest, sa in gai_res:
+ if af == socket.AF_INET6 and not v6_found:
+ v6_found = True
+ target_addrs.insert(0, (af, sa[0]))
+ elif af == socket.AF_INET and not v4_found and v6_found:
+ v4_found = True
+ target_addrs.insert(1, (af, sa[0]))
+ else:
+ target_addrs.append((af, sa[0]))
+
+ oserrors: list[OSError] = []
+ try:
+ async with create_task_group() as tg:
+ for i, (af, addr) in enumerate(target_addrs):
+ event = Event()
+ tg.start_soon(try_connect, addr, event)
+ with move_on_after(happy_eyeballs_delay):
+ await event.wait()
+
+ if connected_stream is None:
+ cause = (
+ oserrors[0]
+ if len(oserrors) == 1
+ else ExceptionGroup("multiple connection attempts failed", oserrors)
+ )
+ raise OSError("All connection attempts failed") from cause
+ finally:
+ oserrors.clear()
+
+ if tls or tls_hostname or ssl_context:
+ try:
+ return await TLSStream.wrap(
+ connected_stream,
+ server_side=False,
+ hostname=tls_hostname or str(remote_host),
+ ssl_context=ssl_context,
+ standard_compatible=tls_standard_compatible,
+ )
+ except BaseException:
+ await aclose_forcefully(connected_stream)
+ raise
+
+ return connected_stream
+
+
+async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
+ """
+ Connect to the given UNIX socket.
+
+ Not available on Windows.
+
+ :param path: path to the socket
+ :return: a socket stream object
+
+ """
+ path = os.fspath(path)
+ return await get_async_backend().connect_unix(path)
+
+
+async def create_tcp_listener(
+ *,
+ local_host: IPAddressType | None = None,
+ local_port: int = 0,
+ family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
+ backlog: int = 65536,
+ reuse_port: bool = False,
+) -> MultiListener[SocketStream]:
+ """
+ Create a TCP socket listener.
+
+ :param local_port: port number to listen on
+ :param local_host: IP address of the interface to listen on. If omitted, listen on
+ all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
+ family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
+ :param family: address family (used if ``local_host`` was omitted)
+ :param backlog: maximum number of queued incoming connections (up to a maximum of
+ 2**16, or 65536)
+ :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+ address/port (not supported on Windows)
+ :return: a list of listener objects
+
+ """
+ asynclib = get_async_backend()
+ backlog = min(backlog, 65536)
+ local_host = str(local_host) if local_host is not None else None
+ gai_res = await getaddrinfo(
+ local_host,
+ local_port,
+ family=family,
+ type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
+ flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+ )
+ listeners: list[SocketListener] = []
+ try:
+ # The set() is here to work around a glibc bug:
+ # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
+ sockaddr: tuple[str, int] | tuple[str, int, int, int]
+ for fam, kind, *_, sockaddr in sorted(set(gai_res)):
+ # Workaround for an uvloop bug where we don't get the correct scope ID for
+ # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
+ # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
+ if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
+ continue
+
+ raw_socket = socket.socket(fam)
+ raw_socket.setblocking(False)
+
+ # For Windows, enable exclusive address use. For others, enable address
+ # reuse.
+ if sys.platform == "win32":
+ raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+ else:
+ raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ if reuse_port:
+ raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
+
+ # If only IPv6 was requested, disable dual stack operation
+ if fam == socket.AF_INET6:
+ raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
+
+ # Workaround for #554
+ if "%" in sockaddr[0]:
+ addr, scope_id = sockaddr[0].split("%", 1)
+ sockaddr = (addr, sockaddr[1], 0, int(scope_id))
+
+ raw_socket.bind(sockaddr)
+ raw_socket.listen(backlog)
+ listener = asynclib.create_tcp_listener(raw_socket)
+ listeners.append(listener)
+ except BaseException:
+ for listener in listeners:
+ await listener.aclose()
+
+ raise
+
+ return MultiListener(listeners)
+
+
+async def create_unix_listener(
+ path: str | bytes | PathLike[Any],
+ *,
+ mode: int | None = None,
+ backlog: int = 65536,
+) -> SocketListener:
+ """
+ Create a UNIX socket listener.
+
+ Not available on Windows.
+
+ :param path: path of the socket
+ :param mode: permissions to set on the socket
+ :param backlog: maximum number of queued incoming connections (up to a maximum of
+ 2**16, or 65536)
+ :return: a listener object
+
+ .. versionchanged:: 3.0
+ If a socket already exists on the file system in the given path, it will be
+ removed first.
+
+ """
+ backlog = min(backlog, 65536)
+ raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
+ try:
+ raw_socket.listen(backlog)
+ return get_async_backend().create_unix_listener(raw_socket)
+ except BaseException:
+ raw_socket.close()
+ raise
+
+
+async def create_udp_socket(
+ family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
+ *,
+ local_host: IPAddressType | None = None,
+ local_port: int = 0,
+ reuse_port: bool = False,
+) -> UDPSocket:
+ """
+ Create a UDP socket.
+
+ If ``port`` has been given, the socket will be bound to this port on the local
+ machine, making this socket suitable for providing UDP based services.
+
+ :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
+ determined from ``local_host`` if omitted
+ :param local_host: IP address or host name of the local interface to bind to
+ :param local_port: local port to bind to
+ :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+ address/port (not supported on Windows)
+ :return: a UDP socket
+
+ """
+ if family is AddressFamily.AF_UNSPEC and not local_host:
+ raise ValueError('Either "family" or "local_host" must be given')
+
+ if local_host:
+ gai_res = await getaddrinfo(
+ str(local_host),
+ local_port,
+ family=family,
+ type=socket.SOCK_DGRAM,
+ flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+ )
+ family = cast(AnyIPAddressFamily, gai_res[0][0])
+ local_address = gai_res[0][-1]
+ elif family is AddressFamily.AF_INET6:
+ local_address = ("::", 0)
+ else:
+ local_address = ("0.0.0.0", 0)
+
+ sock = await get_async_backend().create_udp_socket(
+ family, local_address, None, reuse_port
+ )
+ return cast(UDPSocket, sock)
+
+
+async def create_connected_udp_socket(
+ remote_host: IPAddressType,
+ remote_port: int,
+ *,
+ family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
+ local_host: IPAddressType | None = None,
+ local_port: int = 0,
+ reuse_port: bool = False,
+) -> ConnectedUDPSocket:
+ """
+ Create a connected UDP socket.
+
+ Connected UDP sockets can only communicate with the specified remote host/port, an
+ any packets sent from other sources are dropped.
+
+ :param remote_host: remote host to set as the default target
+ :param remote_port: port on the remote host to set as the default target
+ :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
+ determined from ``local_host`` or ``remote_host`` if omitted
+ :param local_host: IP address or host name of the local interface to bind to
+ :param local_port: local port to bind to
+ :param reuse_port: ``True`` to allow multiple sockets to bind to the same
+ address/port (not supported on Windows)
+ :return: a connected UDP socket
+
+ """
+ local_address = None
+ if local_host:
+ gai_res = await getaddrinfo(
+ str(local_host),
+ local_port,
+ family=family,
+ type=socket.SOCK_DGRAM,
+ flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
+ )
+ family = cast(AnyIPAddressFamily, gai_res[0][0])
+ local_address = gai_res[0][-1]
+
+ gai_res = await getaddrinfo(
+ str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
+ )
+ family = cast(AnyIPAddressFamily, gai_res[0][0])
+ remote_address = gai_res[0][-1]
+
+ sock = await get_async_backend().create_udp_socket(
+ family, local_address, remote_address, reuse_port
+ )
+ return cast(ConnectedUDPSocket, sock)
+
+
+async def create_unix_datagram_socket(
+ *,
+ local_path: None | str | bytes | PathLike[Any] = None,
+ local_mode: int | None = None,
+) -> UNIXDatagramSocket:
+ """
+ Create a UNIX datagram socket.
+
+ Not available on Windows.
+
+ If ``local_path`` has been given, the socket will be bound to this path, making this
+ socket suitable for receiving datagrams from other processes. Other processes can
+ send datagrams to this socket only if ``local_path`` is set.
+
+ If a socket already exists on the file system in the ``local_path``, it will be
+ removed first.
+
+ :param local_path: the path on which to bind to
+ :param local_mode: permissions to set on the local socket
+ :return: a UNIX datagram socket
+
+ """
+ raw_socket = await setup_unix_local_socket(
+ local_path, local_mode, socket.SOCK_DGRAM
+ )
+ return await get_async_backend().create_unix_datagram_socket(raw_socket, None)
+
+
+async def create_connected_unix_datagram_socket(
+ remote_path: str | bytes | PathLike[Any],
+ *,
+ local_path: None | str | bytes | PathLike[Any] = None,
+ local_mode: int | None = None,
+) -> ConnectedUNIXDatagramSocket:
+ """
+ Create a connected UNIX datagram socket.
+
+ Connected datagram sockets can only communicate with the specified remote path.
+
+ If ``local_path`` has been given, the socket will be bound to this path, making
+ this socket suitable for receiving datagrams from other processes. Other processes
+ can send datagrams to this socket only if ``local_path`` is set.
+
+ If a socket already exists on the file system in the ``local_path``, it will be
+ removed first.
+
+ :param remote_path: the path to set as the default target
+ :param local_path: the path on which to bind to
+ :param local_mode: permissions to set on the local socket
+ :return: a connected UNIX datagram socket
+
+ """
+ remote_path = os.fspath(remote_path)
+ raw_socket = await setup_unix_local_socket(
+ local_path, local_mode, socket.SOCK_DGRAM
+ )
+ return await get_async_backend().create_unix_datagram_socket(
+ raw_socket, remote_path
+ )
+
+
+async def getaddrinfo(
+ host: bytes | str | None,
+ port: str | int | None,
+ *,
+ family: int | AddressFamily = 0,
+ type: int | SocketKind = 0,
+ proto: int = 0,
+ flags: int = 0,
+) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
+ """
+ Look up a numeric IP address given a host name.
+
+ Internationalized domain names are translated according to the (non-transitional)
+ IDNA 2008 standard.
+
+ .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
+ (host, port), unlike what :func:`socket.getaddrinfo` does.
+
+ :param host: host name
+ :param port: port number
+ :param family: socket family (`'AF_INET``, ...)
+ :param type: socket type (``SOCK_STREAM``, ...)
+ :param proto: protocol number
+ :param flags: flags to pass to upstream ``getaddrinfo()``
+ :return: list of tuples containing (family, type, proto, canonname, sockaddr)
+
+ .. seealso:: :func:`socket.getaddrinfo`
+
+ """
+ # Handle unicode hostnames
+ if isinstance(host, str):
+ try:
+ encoded_host: bytes | None = host.encode("ascii")
+ except UnicodeEncodeError:
+ import idna
+
+ encoded_host = idna.encode(host, uts46=True)
+ else:
+ encoded_host = host
+
+ gai_res = await get_async_backend().getaddrinfo(
+ encoded_host, port, family=family, type=type, proto=proto, flags=flags
+ )
+ return [
+ (family, type, proto, canonname, convert_ipv6_sockaddr(sockaddr))
+ for family, type, proto, canonname, sockaddr in gai_res
+ # filter out IPv6 results when IPv6 is disabled
+ if not isinstance(sockaddr[0], int)
+ ]
+
+
+def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
+ """
+ Look up the host name of an IP address.
+
+ :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
+ :param flags: flags to pass to upstream ``getnameinfo()``
+ :return: a tuple of (host name, service name)
+
+ .. seealso:: :func:`socket.getnameinfo`
+
+ """
+ return get_async_backend().getnameinfo(sockaddr, flags)
+
+
+@deprecated("This function is deprecated; use `wait_readable` instead")
+def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
+ """
+ .. deprecated:: 4.7.0
+ Use :func:`wait_readable` instead.
+
+ Wait until the given socket has data to be read.
+
+ .. warning:: Only use this on raw sockets that have not been wrapped by any higher
+ level constructs like socket streams!
+
+ :param sock: a socket object
+ :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
+ socket to become readable
+ :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
+ to become readable
+
+ """
+ return get_async_backend().wait_readable(sock.fileno())
+
+
+@deprecated("This function is deprecated; use `wait_writable` instead")
+def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
+ """
+ .. deprecated:: 4.7.0
+ Use :func:`wait_writable` instead.
+
+ Wait until the given socket can be written to.
+
+ This does **NOT** work on Windows when using the asyncio backend with a proactor
+ event loop (default on py3.8+).
+
+ .. warning:: Only use this on raw sockets that have not been wrapped by any higher
+ level constructs like socket streams!
+
+ :param sock: a socket object
+ :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
+ socket to become writable
+ :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
+ to become writable
+
+ """
+ return get_async_backend().wait_writable(sock.fileno())
+
+
+def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
+ """
+ Wait until the given object has data to be read.
+
+ On Unix systems, ``obj`` must either be an integer file descriptor, or else an
+ object with a ``.fileno()`` method which returns an integer file descriptor. Any
+ kind of file descriptor can be passed, though the exact semantics will depend on
+ your kernel. For example, this probably won't do anything useful for on-disk files.
+
+ On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
+ object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
+ descriptors aren't supported, and neither are handles that refer to anything besides
+ a ``SOCKET``.
+
+ On backends where this functionality is not natively provided (asyncio
+ ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
+ which is set to shut down when the interpreter shuts down.
+
+ .. warning:: Don't use this on raw sockets that have been wrapped by any higher
+ level constructs like socket streams!
+
+ :param obj: an object with a ``.fileno()`` method or an integer handle
+ :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
+ object to become readable
+ :raises ~anyio.BusyResourceError: if another task is already waiting for the object
+ to become readable
+
+ """
+ return get_async_backend().wait_readable(obj)
+
+
+def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
+ """
+ Wait until the given object can be written to.
+
+ :param obj: an object with a ``.fileno()`` method or an integer handle
+ :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
+ object to become writable
+ :raises ~anyio.BusyResourceError: if another task is already waiting for the object
+ to become writable
+
+ .. seealso:: See the documentation of :func:`wait_readable` for the definition of
+ ``obj`` and notes on backend compatibility.
+
+ .. warning:: Don't use this on raw sockets that have been wrapped by any higher
+ level constructs like socket streams!
+
+ """
+ return get_async_backend().wait_writable(obj)
+
+
+#
+# Private API
+#
+
+
+def convert_ipv6_sockaddr(
+ sockaddr: tuple[str, int, int, int] | tuple[str, int],
+) -> tuple[str, int]:
+ """
+ Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
+
+ If the scope ID is nonzero, it is added to the address, separated with ``%``.
+ Otherwise the flow id and scope id are simply cut off from the tuple.
+ Any other kinds of socket addresses are returned as-is.
+
+ :param sockaddr: the result of :meth:`~socket.socket.getsockname`
+ :return: the converted socket address
+
+ """
+ # This is more complicated than it should be because of MyPy
+ if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
+ host, port, flowinfo, scope_id = sockaddr
+ if scope_id:
+ # PyPy (as of v7.3.11) leaves the interface name in the result, so
+ # we discard it and only get the scope ID from the end
+ # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
+ host = host.split("%")[0]
+
+ # Add scope_id to the address
+ return f"{host}%{scope_id}", port
+ else:
+ return host, port
+ else:
+ return sockaddr
+
+
+async def setup_unix_local_socket(
+ path: None | str | bytes | PathLike[Any],
+ mode: int | None,
+ socktype: int,
+) -> socket.socket:
+ """
+ Create a UNIX local socket object, deleting the socket at the given path if it
+ exists.
+
+ Not available on Windows.
+
+ :param path: path of the socket
+ :param mode: permissions to set on the socket
+ :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
+
+ """
+ path_str: str | None
+ if path is not None:
+ path_str = os.fsdecode(path)
+
+ # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
+ if not path_str.startswith("\0"):
+ # Copied from pathlib...
+ try:
+ stat_result = os.stat(path)
+ except OSError as e:
+ if e.errno not in (
+ errno.ENOENT,
+ errno.ENOTDIR,
+ errno.EBADF,
+ errno.ELOOP,
+ ):
+ raise
+ else:
+ if stat.S_ISSOCK(stat_result.st_mode):
+ os.unlink(path)
+ else:
+ path_str = None
+
+ raw_socket = socket.socket(socket.AF_UNIX, socktype)
+ raw_socket.setblocking(False)
+
+ if path_str is not None:
+ try:
+ await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
+ if mode is not None:
+ await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
+ except BaseException:
+ raw_socket.close()
+ raise
+
+ return raw_socket