aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/dns/asyncresolver.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/asyncresolver.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/asyncresolver.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/asyncresolver.py475
1 files changed, 475 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/asyncresolver.py b/.venv/lib/python3.12/site-packages/dns/asyncresolver.py
new file mode 100644
index 00000000..8f5e062a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/asyncresolver.py
@@ -0,0 +1,475 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Asynchronous DNS stub resolver."""
+
+import socket
+import time
+from typing import Any, Dict, List, Optional, Union
+
+import dns._ddr
+import dns.asyncbackend
+import dns.asyncquery
+import dns.exception
+import dns.name
+import dns.query
+import dns.rdataclass
+import dns.rdatatype
+import dns.resolver # lgtm[py/import-and-import-from]
+
+# import some resolver symbols for brevity
+from dns.resolver import NXDOMAIN, NoAnswer, NoRootSOA, NotAbsolute
+
+# for indentation purposes below
+_udp = dns.asyncquery.udp
+_tcp = dns.asyncquery.tcp
+
+
+class Resolver(dns.resolver.BaseResolver):
+ """Asynchronous DNS stub resolver."""
+
+ async def resolve(
+ self,
+ qname: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
+ rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ tcp: bool = False,
+ source: Optional[str] = None,
+ raise_on_no_answer: bool = True,
+ source_port: int = 0,
+ lifetime: Optional[float] = None,
+ search: Optional[bool] = None,
+ backend: Optional[dns.asyncbackend.Backend] = None,
+ ) -> dns.resolver.Answer:
+ """Query nameservers asynchronously to find the answer to the question.
+
+ *backend*, a ``dns.asyncbackend.Backend``, or ``None``. If ``None``,
+ the default, then dnspython will use the default backend.
+
+ See :py:func:`dns.resolver.Resolver.resolve()` for the
+ documentation of the other parameters, exceptions, and return
+ type of this method.
+ """
+
+ resolution = dns.resolver._Resolution(
+ self, qname, rdtype, rdclass, tcp, raise_on_no_answer, search
+ )
+ if not backend:
+ backend = dns.asyncbackend.get_default_backend()
+ start = time.time()
+ while True:
+ (request, answer) = resolution.next_request()
+ # Note we need to say "if answer is not None" and not just
+ # "if answer" because answer implements __len__, and python
+ # will call that. We want to return if we have an answer
+ # object, including in cases where its length is 0.
+ if answer is not None:
+ # cache hit!
+ return answer
+ assert request is not None # needed for type checking
+ done = False
+ while not done:
+ (nameserver, tcp, backoff) = resolution.next_nameserver()
+ if backoff:
+ await backend.sleep(backoff)
+ timeout = self._compute_timeout(start, lifetime, resolution.errors)
+ try:
+ response = await nameserver.async_query(
+ request,
+ timeout=timeout,
+ source=source,
+ source_port=source_port,
+ max_size=tcp,
+ backend=backend,
+ )
+ except Exception as ex:
+ (_, done) = resolution.query_result(None, ex)
+ continue
+ (answer, done) = resolution.query_result(response, None)
+ # Note we need to say "if answer is not None" and not just
+ # "if answer" because answer implements __len__, and python
+ # will call that. We want to return if we have an answer
+ # object, including in cases where its length is 0.
+ if answer is not None:
+ return answer
+
+ async def resolve_address(
+ self, ipaddr: str, *args: Any, **kwargs: Any
+ ) -> dns.resolver.Answer:
+ """Use an asynchronous resolver to run a reverse query for PTR
+ records.
+
+ This utilizes the resolve() method to perform a PTR lookup on the
+ specified IP address.
+
+ *ipaddr*, a ``str``, the IPv4 or IPv6 address you want to get
+ the PTR record for.
+
+ All other arguments that can be passed to the resolve() function
+ except for rdtype and rdclass are also supported by this
+ function.
+
+ """
+ # We make a modified kwargs for type checking happiness, as otherwise
+ # we get a legit warning about possibly having rdtype and rdclass
+ # in the kwargs more than once.
+ modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs.update(kwargs)
+ modified_kwargs["rdtype"] = dns.rdatatype.PTR
+ modified_kwargs["rdclass"] = dns.rdataclass.IN
+ return await self.resolve(
+ dns.reversename.from_address(ipaddr), *args, **modified_kwargs
+ )
+
+ async def resolve_name(
+ self,
+ name: Union[dns.name.Name, str],
+ family: int = socket.AF_UNSPEC,
+ **kwargs: Any,
+ ) -> dns.resolver.HostAnswers:
+ """Use an asynchronous resolver to query for address records.
+
+ This utilizes the resolve() method to perform A and/or AAAA lookups on
+ the specified name.
+
+ *qname*, a ``dns.name.Name`` or ``str``, the name to resolve.
+
+ *family*, an ``int``, the address family. If socket.AF_UNSPEC
+ (the default), both A and AAAA records will be retrieved.
+
+ All other arguments that can be passed to the resolve() function
+ except for rdtype and rdclass are also supported by this
+ function.
+ """
+ # We make a modified kwargs for type checking happiness, as otherwise
+ # we get a legit warning about possibly having rdtype and rdclass
+ # in the kwargs more than once.
+ modified_kwargs: Dict[str, Any] = {}
+ modified_kwargs.update(kwargs)
+ modified_kwargs.pop("rdtype", None)
+ modified_kwargs["rdclass"] = dns.rdataclass.IN
+
+ if family == socket.AF_INET:
+ v4 = await self.resolve(name, dns.rdatatype.A, **modified_kwargs)
+ return dns.resolver.HostAnswers.make(v4=v4)
+ elif family == socket.AF_INET6:
+ v6 = await self.resolve(name, dns.rdatatype.AAAA, **modified_kwargs)
+ return dns.resolver.HostAnswers.make(v6=v6)
+ elif family != socket.AF_UNSPEC:
+ raise NotImplementedError(f"unknown address family {family}")
+
+ raise_on_no_answer = modified_kwargs.pop("raise_on_no_answer", True)
+ lifetime = modified_kwargs.pop("lifetime", None)
+ start = time.time()
+ v6 = await self.resolve(
+ name,
+ dns.rdatatype.AAAA,
+ raise_on_no_answer=False,
+ lifetime=self._compute_timeout(start, lifetime),
+ **modified_kwargs,
+ )
+ # Note that setting name ensures we query the same name
+ # for A as we did for AAAA. (This is just in case search lists
+ # are active by default in the resolver configuration and
+ # we might be talking to a server that says NXDOMAIN when it
+ # wants to say NOERROR no data.
+ name = v6.qname
+ v4 = await self.resolve(
+ name,
+ dns.rdatatype.A,
+ raise_on_no_answer=False,
+ lifetime=self._compute_timeout(start, lifetime),
+ **modified_kwargs,
+ )
+ answers = dns.resolver.HostAnswers.make(
+ v6=v6, v4=v4, add_empty=not raise_on_no_answer
+ )
+ if not answers:
+ raise NoAnswer(response=v6.response)
+ return answers
+
+ # pylint: disable=redefined-outer-name
+
+ async def canonical_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
+ """Determine the canonical name of *name*.
+
+ The canonical name is the name the resolver uses for queries
+ after all CNAME and DNAME renamings have been applied.
+
+ *name*, a ``dns.name.Name`` or ``str``, the query name.
+
+ This method can raise any exception that ``resolve()`` can
+ raise, other than ``dns.resolver.NoAnswer`` and
+ ``dns.resolver.NXDOMAIN``.
+
+ Returns a ``dns.name.Name``.
+ """
+ try:
+ answer = await self.resolve(name, raise_on_no_answer=False)
+ canonical_name = answer.canonical_name
+ except dns.resolver.NXDOMAIN as e:
+ canonical_name = e.canonical_name
+ return canonical_name
+
+ async def try_ddr(self, lifetime: float = 5.0) -> None:
+ """Try to update the resolver's nameservers using Discovery of Designated
+ Resolvers (DDR). If successful, the resolver will subsequently use
+ DNS-over-HTTPS or DNS-over-TLS for future queries.
+
+ *lifetime*, a float, is the maximum time to spend attempting DDR. The default
+ is 5 seconds.
+
+ If the SVCB query is successful and results in a non-empty list of nameservers,
+ then the resolver's nameservers are set to the returned servers in priority
+ order.
+
+ The current implementation does not use any address hints from the SVCB record,
+ nor does it resolve addresses for the SCVB target name, rather it assumes that
+ the bootstrap nameserver will always be one of the addresses and uses it.
+ A future revision to the code may offer fuller support. The code verifies that
+ the bootstrap nameserver is in the Subject Alternative Name field of the
+ TLS certficate.
+ """
+ try:
+ expiration = time.time() + lifetime
+ answer = await self.resolve(
+ dns._ddr._local_resolver_name, "svcb", lifetime=lifetime
+ )
+ timeout = dns.query._remaining(expiration)
+ nameservers = await dns._ddr._get_nameservers_async(answer, timeout)
+ if len(nameservers) > 0:
+ self.nameservers = nameservers
+ except Exception:
+ pass
+
+
+default_resolver = None
+
+
+def get_default_resolver() -> Resolver:
+ """Get the default asynchronous resolver, initializing it if necessary."""
+ if default_resolver is None:
+ reset_default_resolver()
+ assert default_resolver is not None
+ return default_resolver
+
+
+def reset_default_resolver() -> None:
+ """Re-initialize default asynchronous resolver.
+
+ Note that the resolver configuration (i.e. /etc/resolv.conf on UNIX
+ systems) will be re-read immediately.
+ """
+
+ global default_resolver
+ default_resolver = Resolver()
+
+
+async def resolve(
+ qname: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
+ rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ tcp: bool = False,
+ source: Optional[str] = None,
+ raise_on_no_answer: bool = True,
+ source_port: int = 0,
+ lifetime: Optional[float] = None,
+ search: Optional[bool] = None,
+ backend: Optional[dns.asyncbackend.Backend] = None,
+) -> dns.resolver.Answer:
+ """Query nameservers asynchronously to find the answer to the question.
+
+ This is a convenience function that uses the default resolver
+ object to make the query.
+
+ See :py:func:`dns.asyncresolver.Resolver.resolve` for more
+ information on the parameters.
+ """
+
+ return await get_default_resolver().resolve(
+ qname,
+ rdtype,
+ rdclass,
+ tcp,
+ source,
+ raise_on_no_answer,
+ source_port,
+ lifetime,
+ search,
+ backend,
+ )
+
+
+async def resolve_address(
+ ipaddr: str, *args: Any, **kwargs: Any
+) -> dns.resolver.Answer:
+ """Use a resolver to run a reverse query for PTR records.
+
+ See :py:func:`dns.asyncresolver.Resolver.resolve_address` for more
+ information on the parameters.
+ """
+
+ return await get_default_resolver().resolve_address(ipaddr, *args, **kwargs)
+
+
+async def resolve_name(
+ name: Union[dns.name.Name, str], family: int = socket.AF_UNSPEC, **kwargs: Any
+) -> dns.resolver.HostAnswers:
+ """Use a resolver to asynchronously query for address records.
+
+ See :py:func:`dns.asyncresolver.Resolver.resolve_name` for more
+ information on the parameters.
+ """
+
+ return await get_default_resolver().resolve_name(name, family, **kwargs)
+
+
+async def canonical_name(name: Union[dns.name.Name, str]) -> dns.name.Name:
+ """Determine the canonical name of *name*.
+
+ See :py:func:`dns.resolver.Resolver.canonical_name` for more
+ information on the parameters and possible exceptions.
+ """
+
+ return await get_default_resolver().canonical_name(name)
+
+
+async def try_ddr(timeout: float = 5.0) -> None:
+ """Try to update the default resolver's nameservers using Discovery of Designated
+ Resolvers (DDR). If successful, the resolver will subsequently use
+ DNS-over-HTTPS or DNS-over-TLS for future queries.
+
+ See :py:func:`dns.resolver.Resolver.try_ddr` for more information.
+ """
+ return await get_default_resolver().try_ddr(timeout)
+
+
+async def zone_for_name(
+ name: Union[dns.name.Name, str],
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+ tcp: bool = False,
+ resolver: Optional[Resolver] = None,
+ backend: Optional[dns.asyncbackend.Backend] = None,
+) -> dns.name.Name:
+ """Find the name of the zone which contains the specified name.
+
+ See :py:func:`dns.resolver.Resolver.zone_for_name` for more
+ information on the parameters and possible exceptions.
+ """
+
+ if isinstance(name, str):
+ name = dns.name.from_text(name, dns.name.root)
+ if resolver is None:
+ resolver = get_default_resolver()
+ if not name.is_absolute():
+ raise NotAbsolute(name)
+ while True:
+ try:
+ answer = await resolver.resolve(
+ name, dns.rdatatype.SOA, rdclass, tcp, backend=backend
+ )
+ assert answer.rrset is not None
+ if answer.rrset.name == name:
+ return name
+ # otherwise we were CNAMEd or DNAMEd and need to look higher
+ except (NXDOMAIN, NoAnswer):
+ pass
+ try:
+ name = name.parent()
+ except dns.name.NoParent: # pragma: no cover
+ raise NoRootSOA
+
+
+async def make_resolver_at(
+ where: Union[dns.name.Name, str],
+ port: int = 53,
+ family: int = socket.AF_UNSPEC,
+ resolver: Optional[Resolver] = None,
+) -> Resolver:
+ """Make a stub resolver using the specified destination as the full resolver.
+
+ *where*, a ``dns.name.Name`` or ``str`` the domain name or IP address of the
+ full resolver.
+
+ *port*, an ``int``, the port to use. If not specified, the default is 53.
+
+ *family*, an ``int``, the address family to use. This parameter is used if
+ *where* is not an address. The default is ``socket.AF_UNSPEC`` in which case
+ the first address returned by ``resolve_name()`` will be used, otherwise the
+ first address of the specified family will be used.
+
+ *resolver*, a ``dns.asyncresolver.Resolver`` or ``None``, the resolver to use for
+ resolution of hostnames. If not specified, the default resolver will be used.
+
+ Returns a ``dns.resolver.Resolver`` or raises an exception.
+ """
+ if resolver is None:
+ resolver = get_default_resolver()
+ nameservers: List[Union[str, dns.nameserver.Nameserver]] = []
+ if isinstance(where, str) and dns.inet.is_address(where):
+ nameservers.append(dns.nameserver.Do53Nameserver(where, port))
+ else:
+ answers = await resolver.resolve_name(where, family)
+ for address in answers.addresses():
+ nameservers.append(dns.nameserver.Do53Nameserver(address, port))
+ res = dns.asyncresolver.Resolver(configure=False)
+ res.nameservers = nameservers
+ return res
+
+
+async def resolve_at(
+ where: Union[dns.name.Name, str],
+ qname: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.A,
+ rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+ tcp: bool = False,
+ source: Optional[str] = None,
+ raise_on_no_answer: bool = True,
+ source_port: int = 0,
+ lifetime: Optional[float] = None,
+ search: Optional[bool] = None,
+ backend: Optional[dns.asyncbackend.Backend] = None,
+ port: int = 53,
+ family: int = socket.AF_UNSPEC,
+ resolver: Optional[Resolver] = None,
+) -> dns.resolver.Answer:
+ """Query nameservers to find the answer to the question.
+
+ This is a convenience function that calls ``dns.asyncresolver.make_resolver_at()``
+ to make a resolver, and then uses it to resolve the query.
+
+ See ``dns.asyncresolver.Resolver.resolve`` for more information on the resolution
+ parameters, and ``dns.asyncresolver.make_resolver_at`` for information about the
+ resolver parameters *where*, *port*, *family*, and *resolver*.
+
+ If making more than one query, it is more efficient to call
+ ``dns.asyncresolver.make_resolver_at()`` and then use that resolver for the queries
+ instead of calling ``resolve_at()`` multiple times.
+ """
+ res = await make_resolver_at(where, port, family, resolver)
+ return await res.resolve(
+ qname,
+ rdtype,
+ rdclass,
+ tcp,
+ source,
+ raise_on_no_answer,
+ source_port,
+ lifetime,
+ search,
+ backend,
+ )