aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/anyio/_core/_sockets.py
blob: 054bcddaa57adbd848f475b9e85513ba927069a1 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
from __future__ import annotations

import errno
import os
import socket
import ssl
import stat
import sys
from collections.abc import Awaitable
from ipaddress import IPv6Address, ip_address
from os import PathLike, chmod
from socket import AddressFamily, SocketKind
from typing import TYPE_CHECKING, Any, Literal, cast, overload

from .. import to_thread
from ..abc import (
    ConnectedUDPSocket,
    ConnectedUNIXDatagramSocket,
    IPAddressType,
    IPSockAddrType,
    SocketListener,
    SocketStream,
    UDPSocket,
    UNIXDatagramSocket,
    UNIXSocketStream,
)
from ..streams.stapled import MultiListener
from ..streams.tls import TLSStream
from ._eventloop import get_async_backend
from ._resources import aclose_forcefully
from ._synchronization import Event
from ._tasks import create_task_group, move_on_after

if TYPE_CHECKING:
    from _typeshed import FileDescriptorLike
else:
    FileDescriptorLike = object

if sys.version_info < (3, 11):
    from exceptiongroup import ExceptionGroup

if sys.version_info < (3, 13):
    from typing_extensions import deprecated
else:
    from warnings import deprecated

# Protocol level used for the IPV6_V6ONLY socket option. Falls back to the numeric
# value 41 when the socket module does not expose the constant (see the linked issue).
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)  # https://bugs.python.org/issue29515

# Address families accepted by the public API, including "unspecified"
AnyIPAddressFamily = Literal[
    AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
]
# Concrete IP address families only (IPv4 or IPv6)
IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]


# Typing overloads for connect_tcp(): they only describe how the TLS related
# arguments select the return type; the runtime implementation follows them.

# Overload: ``tls_hostname`` given (required keyword) -> TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``ssl_context`` given (required keyword) -> TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``tls=True`` -> TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[True],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``tls=False`` -> plain SocketStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[False],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


# Overload: no TLS arguments at all -> plain SocketStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = None,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_standard_compatible: bool = True,
    tls_hostname: str | None = None,
    happy_eyeballs_delay: float = 0.25,
) -> SocketStream | TLSStream:
    """
    Connect to a host using the TCP protocol.

    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
    each one is tried until one connection attempt succeeds. If the first attempt does
    not connect within ``happy_eyeballs_delay`` seconds (250 milliseconds by default),
    a second attempt is started using the next address in the list, and so on. On IPv6
    enabled systems, an IPv6 address (if available) is tried first.

    When the connection has been established, a TLS handshake will be done if either
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

    :param remote_host: the IP address or host name to connect to
    :param remote_port: port on the target host to connect to
    :param local_host: the interface address or name to bind the socket to before
        connecting
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
        :class:`~anyio.streams.tls.TLSStream` instead
    :param ssl_context: the SSL context object to use (if omitted, a default context is
        created)
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
        before closing the stream and requires that the server does this as well.
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
        Some protocols, such as HTTP, require this option to be ``False``.
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
    :param tls_hostname: host name to check the server certificate against (defaults to
        the value of ``remote_host``)
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
        attempt
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
    :raises OSError: if the connection attempt fails

    """
    # Placed here due to https://github.com/python/mypy/issues/7057
    connected_stream: SocketStream | None = None

    async def try_connect(remote_host: str, event: Event) -> None:
        nonlocal connected_stream
        try:
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
        except OSError as exc:
            oserrors.append(exc)
            return
        else:
            if connected_stream is None:
                # First successful attempt wins; cancel all the other attempts
                connected_stream = stream
                tg.cancel_scope.cancel()
            else:
                # Another attempt already succeeded, so this connection is surplus
                await stream.aclose()
        finally:
            # Let the launcher loop proceed without waiting out the full delay
            event.set()

    asynclib = get_async_backend()
    local_address: IPSockAddrType | None = None
    family = socket.AF_UNSPEC
    if local_host:
        # The local address also decides the address family used for the lookup of
        # the remote host below
        gai_res = await getaddrinfo(str(local_host), None)
        family, *_, local_address = gai_res[0]

    target_host = str(remote_host)
    try:
        addr_obj = ip_address(remote_host)
    except ValueError:
        addr_obj = None

    if addr_obj is not None:
        # A literal IP address needs no name resolution
        if isinstance(addr_obj, IPv6Address):
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
        else:
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
    else:
        # getaddrinfo() will raise an exception if name resolution fails
        gai_res = await getaddrinfo(
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
        )

        # Organize the list so that the first address is an IPv6 address (if available)
        # and the second one is an IPv4 addresses. The rest can be in whatever order.
        v6_found = v4_found = False
        target_addrs = []
        for af, *_, sa in gai_res:
            if af == socket.AF_INET6 and not v6_found:
                v6_found = True
                target_addrs.insert(0, (af, sa[0]))
            elif af == socket.AF_INET and not v4_found and v6_found:
                v4_found = True
                target_addrs.insert(1, (af, sa[0]))
            else:
                target_addrs.append((af, sa[0]))

    oserrors: list[OSError] = []
    try:
        async with create_task_group() as tg:
            for _, addr in target_addrs:
                event = Event()
                tg.start_soon(try_connect, addr, event)
                # Start the next attempt when this one finishes or the delay expires,
                # whichever happens first
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            if not oserrors:
                # No address was attempted at all. ExceptionGroup() rejects an empty
                # sequence, so raise the documented OSError without a cause instead.
                raise OSError("All connection attempts failed")

            cause = (
                oserrors[0]
                if len(oserrors) == 1
                else ExceptionGroup("multiple connection attempts failed", oserrors)
            )
            raise OSError("All connection attempts failed") from cause
    finally:
        # Drop the references to the collected exceptions
        oserrors.clear()

    if tls or tls_hostname or ssl_context:
        try:
            return await TLSStream.wrap(
                connected_stream,
                server_side=False,
                hostname=tls_hostname or str(remote_host),
                ssl_context=ssl_context,
                standard_compatible=tls_standard_compatible,
            )
        except BaseException:
            # Don't leak the TCP connection if the TLS handshake fails
            await aclose_forcefully(connected_stream)
            raise

    return connected_stream


async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
    """
    Open a stream connection to a UNIX socket.

    Not available on Windows.

    :param path: file system path of the socket to connect to
    :return: a socket stream object

    """
    # Normalize path-like objects to str/bytes before handing off to the backend
    socket_path = os.fspath(path)
    backend = get_async_backend()
    return await backend.connect_unix(socket_path)


async def create_tcp_listener(
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
    backlog: int = 65536,
    reuse_port: bool = False,
) -> MultiListener[SocketStream]:
    """
    Create a TCP socket listener.

    :param local_port: port number to listen on
    :param local_host: IP address of the interface to listen on. If omitted, listen on
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
    :param family: address family (used if ``local_host`` was omitted)
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a list of listener objects

    """
    asynclib = get_async_backend()
    backlog = min(backlog, 65536)
    local_host = str(local_host) if local_host is not None else None
    gai_res = await getaddrinfo(
        local_host,
        local_port,
        family=family,
        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
    )
    listeners: list[SocketListener] = []
    try:
        # The set() is here to work around a glibc bug:
        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
        sockaddr: tuple[str, int] | tuple[str, int, int, int]
        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
            # Workaround for an uvloop bug where we don't get the correct scope ID for
            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
                continue

            raw_socket = socket.socket(fam)
            try:
                raw_socket.setblocking(False)

                # For Windows, enable exclusive address use. For others, enable
                # address reuse.
                if sys.platform == "win32":
                    raw_socket.setsockopt(
                        socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1
                    )
                else:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                if reuse_port:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

                # Disable dual stack operation on IPv6 sockets
                if fam == socket.AF_INET6:
                    raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

                    # Workaround for #554
                    if "%" in sockaddr[0]:
                        addr, scope_id = sockaddr[0].split("%", 1)
                        sockaddr = (addr, sockaddr[1], 0, int(scope_id))

                raw_socket.bind(sockaddr)
                raw_socket.listen(backlog)
                listener = asynclib.create_tcp_listener(raw_socket)
            except BaseException:
                # The socket has not been handed over to a listener yet, so it must
                # be closed here or it would leak
                raw_socket.close()
                raise

            listeners.append(listener)
    except BaseException:
        # Close the listeners that were already set up before the failure
        for listener in listeners:
            await listener.aclose()

        raise

    return MultiListener(listeners)


async def create_unix_listener(
    path: str | bytes | PathLike[Any],
    *,
    mode: int | None = None,
    backlog: int = 65536,
) -> SocketListener:
    """
    Create a listener bound to a UNIX socket path.

    Not available on Windows.

    :param path: path of the socket
    :param mode: permissions to set on the socket
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :return: a listener object

    .. versionchanged:: 3.0
        If a socket already exists on the file system in the given path, it will be
        removed first.

    """
    # Clamp the backlog to the documented maximum
    effective_backlog = min(backlog, 65536)
    sock = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
    try:
        sock.listen(effective_backlog)
        listener = get_async_backend().create_unix_listener(sock)
    except BaseException:
        # Don't leave the bound socket open if the listener could not be set up
        sock.close()
        raise

    return listener


async def create_udp_socket(
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> UDPSocket:
    """
    Create a UDP socket.

    If ``local_port`` has been given, the socket will be bound to this port on the
    local machine, making this socket suitable for providing UDP based services.

    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a UDP socket
    :raises ValueError: if neither ``family`` nor ``local_host`` was given

    """
    if family is AddressFamily.AF_UNSPEC and not local_host:
        raise ValueError('Either "family" or "local_host" must be given')

    if local_host:
        gai_res = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # Use the family and address of the first lookup result
        family = cast(AnyIPAddressFamily, gai_res[0][0])
        local_address = gai_res[0][-1]
    elif family is AddressFamily.AF_INET6:
        # No local host given: bind to the IPv6 wildcard address, honoring local_port
        local_address = ("::", local_port)
    else:
        # No local host given: bind to the IPv4 wildcard address, honoring local_port
        local_address = ("0.0.0.0", local_port)

    sock = await get_async_backend().create_udp_socket(
        family, local_address, None, reuse_port
    )
    return cast(UDPSocket, sock)


async def create_connected_udp_socket(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> ConnectedUDPSocket:
    """
    Create a connected UDP socket.

    Connected UDP sockets can only communicate with the specified remote host/port, and
    any packets sent from other sources are dropped.

    :param remote_host: remote host to set as the default target
    :param remote_port: port on the remote host to set as the default target
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` or ``remote_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a connected UDP socket

    """
    local_address = None
    if local_host:
        local_infos = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # The first local result fixes the family used for the remote lookup
        local_family, *_, local_address = local_infos[0]
        family = cast(AnyIPAddressFamily, local_family)

    remote_infos = await getaddrinfo(
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
    )
    remote_family, *_, remote_address = remote_infos[0]
    family = cast(AnyIPAddressFamily, remote_family)

    backend = get_async_backend()
    sock = await backend.create_udp_socket(
        family, local_address, remote_address, reuse_port
    )
    return cast(ConnectedUDPSocket, sock)


async def create_unix_datagram_socket(
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> UNIXDatagramSocket:
    """
    Create an unconnected UNIX datagram socket.

    Not available on Windows.

    When ``local_path`` is given, the socket is bound to that path, which makes it
    suitable for receiving datagrams from other processes. Other processes can send
    datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a UNIX datagram socket

    """
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    # No remote path: the resulting socket is not connected to any peer
    return await backend.create_unix_datagram_socket(sock, None)


async def create_connected_unix_datagram_socket(
    remote_path: str | bytes | PathLike[Any],
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> ConnectedUNIXDatagramSocket:
    """
    Create a UNIX datagram socket connected to a remote path.

    Connected datagram sockets can only communicate with the specified remote path.

    When ``local_path`` is given, the socket is bound to that path, which makes it
    suitable for receiving datagrams from other processes. Other processes can send
    datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param remote_path: the path to set as the default target
    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a connected UNIX datagram socket

    """
    # Normalize the peer path before creating the local socket
    peer_path = os.fspath(remote_path)
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    return await backend.create_unix_datagram_socket(sock, peer_path)


async def getaddrinfo(
    host: bytes | str | None,
    port: str | int | None,
    *,
    family: int | AddressFamily = 0,
    type: int | SocketKind = 0,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
    """
    Resolve a host name into a list of numeric socket addresses.

    Internationalized domain names are translated according to the (non-transitional)
    IDNA 2008 standard.

    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
        (host, port), unlike what :func:`socket.getaddrinfo` does.

    :param host: host name
    :param port: port number
    :param family: socket family (``AF_INET``, ...)
    :param type: socket type (``SOCK_STREAM``, ...)
    :param proto: protocol number
    :param flags: flags to pass to upstream ``getaddrinfo()``
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)

    .. seealso:: :func:`socket.getaddrinfo`

    """
    encoded_host: bytes | None
    if not isinstance(host, str):
        # bytes and None are passed through untouched
        encoded_host = host
    else:
        try:
            encoded_host = host.encode("ascii")
        except UnicodeEncodeError:
            # Non-ASCII host name: fall back to IDNA encoding
            import idna

            encoded_host = idna.encode(host, uts46=True)

    backend = get_async_backend()
    raw_results = await backend.getaddrinfo(
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
    )

    results = []
    for res_family, res_type, res_proto, canonname, sockaddr in raw_results:
        # filter out IPv6 results when IPv6 is disabled
        if isinstance(sockaddr[0], int):
            continue

        results.append(
            (res_family, res_type, res_proto, canonname, convert_ipv6_sockaddr(sockaddr))
        )

    return results


def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
    """
    Resolve a socket address into a host name and a service name.

    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
    :param flags: flags to pass to upstream ``getnameinfo()``
    :return: a tuple of (host name, service name)

    .. seealso:: :func:`socket.getnameinfo`

    """
    backend = get_async_backend()
    return backend.getnameinfo(sockaddr, flags)


@deprecated("This function is deprecated; use `wait_readable` instead")
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_readable` instead.

    Wait until there is data to be read from the given socket.

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become readable

    """
    backend = get_async_backend()
    # The backend works on the socket's file descriptor
    return backend.wait_readable(sock.fileno())


@deprecated("This function is deprecated; use `wait_writable` instead")
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_writable` instead.

    Wait until the given socket is ready for writing.

    This does **NOT** work on Windows when using the asyncio backend with a proactor
    event loop (default on py3.8+).

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become writable

    """
    backend = get_async_backend()
    # The backend works on the socket's file descriptor
    return backend.wait_writable(sock.fileno())


def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until there is data to be read from the given object.

    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
    object with a ``.fileno()`` method which returns an integer file descriptor. Any
    kind of file descriptor can be passed, though the exact semantics will depend on
    your kernel. For example, this probably won't do anything useful for on-disk files.

    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
    descriptors aren't supported, and neither are handles that refer to anything besides
    a ``SOCKET``.

    On backends where this functionality is not natively provided (asyncio
    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
    which is set to shut down when the interpreter shuts down.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become readable

    """
    backend = get_async_backend()
    return backend.wait_readable(obj)


def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until the given object is ready for writing.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become writable

    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
       ``obj`` and notes on backend compatibility.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    """
    backend = get_async_backend()
    return backend.wait_writable(obj)


#
# Private API
#


def convert_ipv6_sockaddr(
    sockaddr: tuple[str, int, int, int] | tuple[str, int],
) -> tuple[str, int]:
    """
    Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.

    If the scope ID is nonzero, it is added to the address, separated with ``%``.
    Otherwise the flow id and scope id are simply cut off from the tuple.
    Any other kinds of socket addresses are returned as-is.

    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
    :return: the converted socket address

    """
    # This is more complicated than it should be because of MyPy
    if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
        host, port, flowinfo, scope_id = sockaddr
        if scope_id:
            # PyPy (as of v7.3.11) leaves the interface name in the result, so
            # we discard it and only get the scope ID from the end
            # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
            host = host.split("%")[0]

            # Add scope_id to the address
            return f"{host}%{scope_id}", port
        else:
            return host, port
    else:
        return sockaddr


async def setup_unix_local_socket(
    path: None | str | bytes | PathLike[Any],
    mode: int | None,
    socktype: int,
) -> socket.socket:
    """
    Create a UNIX local socket object, deleting the socket at the given path if it
    exists.

    Not available on Windows.

    :param path: path of the socket
    :param mode: permissions to set on the socket
    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM

    """
    path_str: str | None
    if path is not None:
        path_str = os.fsdecode(path)

        # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
        if not path_str.startswith("\0"):
            # Copied from pathlib...
            try:
                stat_result = os.stat(path)
            except OSError as e:
                if e.errno not in (
                    errno.ENOENT,
                    errno.ENOTDIR,
                    errno.EBADF,
                    errno.ELOOP,
                ):
                    raise
            else:
                if stat.S_ISSOCK(stat_result.st_mode):
                    os.unlink(path)
    else:
        path_str = None

    raw_socket = socket.socket(socket.AF_UNIX, socktype)
    raw_socket.setblocking(False)

    if path_str is not None:
        try:
            await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
            if mode is not None:
                await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
        except BaseException:
            raw_socket.close()
            raise

    return raw_socket