about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py97
1 files changed, 97 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py b/.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py
new file mode 100644
index 00000000..ea29adb9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohappyeyeballs/utils.py
@@ -0,0 +1,97 @@
+"""Utility functions for aiohappyeyeballs."""
+
+import ipaddress
+import socket
+from typing import Dict, List, Optional, Tuple, Union
+
+from .types import AddrInfoType
+
+
+def addr_to_addr_infos(
+    addr: Optional[
+        Union[Tuple[str, int, int, int], Tuple[str, int, int], Tuple[str, int]]
+    ],
+) -> Optional[List[AddrInfoType]]:
+    """Convert an address tuple to a list of addr_info tuples."""
+    if addr is None:
+        return None
+    host = addr[0]
+    port = addr[1]
+    is_ipv6 = ":" in host
+    if is_ipv6:
+        flowinfo = 0
+        scopeid = 0
+        addr_len = len(addr)
+        if addr_len >= 4:
+            scopeid = addr[3]  # type: ignore[misc]
+        if addr_len >= 3:
+            flowinfo = addr[2]  # type: ignore[misc]
+        addr = (host, port, flowinfo, scopeid)
+        family = socket.AF_INET6
+    else:
+        addr = (host, port)
+        family = socket.AF_INET
+    return [(family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)]
+
+
+def pop_addr_infos_interleave(
+    addr_infos: List[AddrInfoType], interleave: Optional[int] = None
+) -> None:
+    """
+    Pop addr_info from the list of addr_infos by family up to interleave times.
+
+    The interleave parameter is used to know how many addr_infos for
+    each family should be popped of the top of the list.
+    """
+    seen: Dict[int, int] = {}
+    if interleave is None:
+        interleave = 1
+    to_remove: List[AddrInfoType] = []
+    for addr_info in addr_infos:
+        family = addr_info[0]
+        if family not in seen:
+            seen[family] = 0
+        if seen[family] < interleave:
+            to_remove.append(addr_info)
+        seen[family] += 1
+    for addr_info in to_remove:
+        addr_infos.remove(addr_info)
+
+
+def _addr_tuple_to_ip_address(
+    addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
+) -> Union[
+    Tuple[ipaddress.IPv4Address, int], Tuple[ipaddress.IPv6Address, int, int, int]
+]:
+    """Convert an address tuple to an IPv4Address."""
+    return (ipaddress.ip_address(addr[0]), *addr[1:])
+
+
+def remove_addr_infos(
+    addr_infos: List[AddrInfoType],
+    addr: Union[Tuple[str, int], Tuple[str, int, int, int]],
+) -> None:
+    """
+    Remove an address from the list of addr_infos.
+
+    The addr value is typically the return value of
+    sock.getpeername().
+    """
+    bad_addrs_infos: List[AddrInfoType] = []
+    for addr_info in addr_infos:
+        if addr_info[-1] == addr:
+            bad_addrs_infos.append(addr_info)
+    if bad_addrs_infos:
+        for bad_addr_info in bad_addrs_infos:
+            addr_infos.remove(bad_addr_info)
+        return
+    # Slow path in case addr is formatted differently
+    match_addr = _addr_tuple_to_ip_address(addr)
+    for addr_info in addr_infos:
+        if match_addr == _addr_tuple_to_ip_address(addr_info[-1]):
+            bad_addrs_infos.append(addr_info)
+    if bad_addrs_infos:
+        for bad_addr_info in bad_addrs_infos:
+            addr_infos.remove(bad_addr_info)
+        return
+    raise ValueError(f"Address {addr} not found in addr_infos")