about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/_ddr.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/_ddr.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/_ddr.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/_ddr.py154
1 files changed, 154 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/_ddr.py b/.venv/lib/python3.12/site-packages/dns/_ddr.py
new file mode 100644
index 00000000..bf5c11eb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/_ddr.py
@@ -0,0 +1,154 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+#
+# Support for Discovery of Designated Resolvers
+
+import socket
+import time
+from urllib.parse import urlparse
+
+import dns.asyncbackend
+import dns.inet
+import dns.name
+import dns.nameserver
+import dns.query
+import dns.rdtypes.svcbbase
+
+# The special name of the local resolver when using DDR
+_local_resolver_name = dns.name.from_text("_dns.resolver.arpa")
+
+
+#
+# Processing is split up into I/O independent and I/O dependent parts to
+# make supporting sync and async versions easy.
+#
+
+
+class _SVCBInfo:
+    def __init__(self, bootstrap_address, port, hostname, nameservers):
+        self.bootstrap_address = bootstrap_address
+        self.port = port
+        self.hostname = hostname
+        self.nameservers = nameservers
+
+    def ddr_check_certificate(self, cert):
+        """Verify that the _SVCBInfo's address is in the cert's subjectAltName (SAN)"""
+        for name, value in cert["subjectAltName"]:
+            if name == "IP Address" and value == self.bootstrap_address:
+                return True
+        return False
+
+    def make_tls_context(self):
+        ssl = dns.query.ssl
+        ctx = ssl.create_default_context()
+        ctx.minimum_version = ssl.TLSVersion.TLSv1_2
+        return ctx
+
+    def ddr_tls_check_sync(self, lifetime):
+        ctx = self.make_tls_context()
+        expiration = time.time() + lifetime
+        with socket.create_connection(
+            (self.bootstrap_address, self.port), lifetime
+        ) as s:
+            with ctx.wrap_socket(s, server_hostname=self.hostname) as ts:
+                ts.settimeout(dns.query._remaining(expiration))
+                ts.do_handshake()
+                cert = ts.getpeercert()
+                return self.ddr_check_certificate(cert)
+
+    async def ddr_tls_check_async(self, lifetime, backend=None):
+        if backend is None:
+            backend = dns.asyncbackend.get_default_backend()
+        ctx = self.make_tls_context()
+        expiration = time.time() + lifetime
+        async with await backend.make_socket(
+            dns.inet.af_for_address(self.bootstrap_address),
+            socket.SOCK_STREAM,
+            0,
+            None,
+            (self.bootstrap_address, self.port),
+            lifetime,
+            ctx,
+            self.hostname,
+        ) as ts:
+            cert = await ts.getpeercert(dns.query._remaining(expiration))
+            return self.ddr_check_certificate(cert)
+
+
+def _extract_nameservers_from_svcb(answer):
+    bootstrap_address = answer.nameserver
+    if not dns.inet.is_address(bootstrap_address):
+        return []
+    infos = []
+    for rr in answer.rrset.processing_order():
+        nameservers = []
+        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.ALPN)
+        if param is None:
+            continue
+        alpns = set(param.ids)
+        host = rr.target.to_text(omit_final_dot=True)
+        port = None
+        param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.PORT)
+        if param is not None:
+            port = param.port
+        # For now we ignore address hints and address resolution and always use the
+        # bootstrap address
+        if b"h2" in alpns:
+            param = rr.params.get(dns.rdtypes.svcbbase.ParamKey.DOHPATH)
+            if param is None or not param.value.endswith(b"{?dns}"):
+                continue
+            path = param.value[:-6].decode()
+            if not path.startswith("/"):
+                path = "/" + path
+            if port is None:
+                port = 443
+            url = f"https://{host}:{port}{path}"
+            # check the URL
+            try:
+                urlparse(url)
+                nameservers.append(dns.nameserver.DoHNameserver(url, bootstrap_address))
+            except Exception:
+                # continue processing other ALPN types
+                pass
+        if b"dot" in alpns:
+            if port is None:
+                port = 853
+            nameservers.append(
+                dns.nameserver.DoTNameserver(bootstrap_address, port, host)
+            )
+        if b"doq" in alpns:
+            if port is None:
+                port = 853
+            nameservers.append(
+                dns.nameserver.DoQNameserver(bootstrap_address, port, True, host)
+            )
+        if len(nameservers) > 0:
+            infos.append(_SVCBInfo(bootstrap_address, port, host, nameservers))
+    return infos
+
+
+def _get_nameservers_sync(answer, lifetime):
+    """Return a list of TLS-validated resolver nameservers extracted from an SVCB
+    answer."""
+    nameservers = []
+    infos = _extract_nameservers_from_svcb(answer)
+    for info in infos:
+        try:
+            if info.ddr_tls_check_sync(lifetime):
+                nameservers.extend(info.nameservers)
+        except Exception:
+            pass
+    return nameservers
+
+
+async def _get_nameservers_async(answer, lifetime):
+    """Return a list of TLS-validated resolver nameservers extracted from an SVCB
+    answer."""
+    nameservers = []
+    infos = _extract_nameservers_from_svcb(answer)
+    for info in infos:
+        try:
+            if await info.ddr_tls_check_async(lifetime):
+                nameservers.extend(info.nameservers)
+        except Exception:
+            pass
+    return nameservers