aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/dns/rdata.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/rdata.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/rdata.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/rdata.py911
1 files changed, 911 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/rdata.py b/.venv/lib/python3.12/site-packages/dns/rdata.py
new file mode 100644
index 00000000..8099c26a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/rdata.py
@@ -0,0 +1,911 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2001-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS rdata."""
+
+import base64
+import binascii
+import inspect
+import io
+import itertools
+import random
+from importlib import import_module
+from typing import Any, Dict, Optional, Tuple, Union
+
+import dns.exception
+import dns.immutable
+import dns.ipv4
+import dns.ipv6
+import dns.name
+import dns.rdataclass
+import dns.rdatatype
+import dns.tokenizer
+import dns.ttl
+import dns.wire
+
+_chunksize = 32
+
+# We currently allow comparisons for rdata with relative names for backwards
+# compatibility, but in the future we will not, as these kinds of comparisons
+# can lead to subtle bugs if code is not carefully written.
+#
+# This switch allows the future behavior to be turned on so code can be
+# tested with it.
+_allow_relative_comparisons = True
+
+
+class NoRelativeRdataOrdering(dns.exception.DNSException):
+ """An attempt was made to do an ordered comparison of one or more
+ rdata with relative names. The only reliable way of sorting rdata
+ is to use non-relativized rdata.
+
+ """
+
+
+def _wordbreak(data, chunksize=_chunksize, separator=b" "):
+ """Break a binary string into chunks of chunksize characters separated by
+ a space.
+ """
+
+ if not chunksize:
+ return data.decode()
+ return separator.join(
+ [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
+ ).decode()
+
+
+# pylint: disable=unused-argument
+
+
+def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
+ """Convert a binary string into its hex encoding, broken up into chunks
+ of chunksize characters separated by a separator.
+ """
+
+ return _wordbreak(binascii.hexlify(data), chunksize, separator)
+
+
+def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
+ """Convert a binary string into its base64 encoding, broken up into chunks
+ of chunksize characters separated by a separator.
+ """
+
+ return _wordbreak(base64.b64encode(data), chunksize, separator)
+
+
+# pylint: enable=unused-argument
+
+__escaped = b'"\\'
+
+
+def _escapify(qstring):
+ """Escape the characters in a quoted string which need it."""
+
+ if isinstance(qstring, str):
+ qstring = qstring.encode()
+ if not isinstance(qstring, bytearray):
+ qstring = bytearray(qstring)
+
+ text = ""
+ for c in qstring:
+ if c in __escaped:
+ text += "\\" + chr(c)
+ elif c >= 0x20 and c < 0x7F:
+ text += chr(c)
+ else:
+ text += "\\%03d" % c
+ return text
+
+
+def _truncate_bitmap(what):
+ """Determine the index of greatest byte that isn't all zeros, and
+ return the bitmap that contains all the bytes less than that index.
+ """
+
+ for i in range(len(what) - 1, -1, -1):
+ if what[i] != 0:
+ return what[0 : i + 1]
+ return what[0:1]
+
+
+# So we don't have to edit all the rdata classes...
+_constify = dns.immutable.constify
+
+
+@dns.immutable.immutable
+class Rdata:
+ """Base class for all DNS rdata types."""
+
+ __slots__ = ["rdclass", "rdtype", "rdcomment"]
+
+ def __init__(self, rdclass, rdtype):
+ """Initialize an rdata.
+
+ *rdclass*, an ``int`` is the rdataclass of the Rdata.
+
+ *rdtype*, an ``int`` is the rdatatype of the Rdata.
+ """
+
+ self.rdclass = self._as_rdataclass(rdclass)
+ self.rdtype = self._as_rdatatype(rdtype)
+ self.rdcomment = None
+
+ def _get_all_slots(self):
+ return itertools.chain.from_iterable(
+ getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
+ )
+
+ def __getstate__(self):
+ # We used to try to do a tuple of all slots here, but it
+ # doesn't work as self._all_slots isn't available at
+ # __setstate__() time. Before that we tried to store a tuple
+ # of __slots__, but that didn't work as it didn't store the
+ # slots defined by ancestors. This older way didn't fail
+ # outright, but ended up with partially broken objects, e.g.
+ # if you unpickled an A RR it wouldn't have rdclass and rdtype
+ # attributes, and would compare badly.
+ state = {}
+ for slot in self._get_all_slots():
+ state[slot] = getattr(self, slot)
+ return state
+
+ def __setstate__(self, state):
+ for slot, val in state.items():
+ object.__setattr__(self, slot, val)
+ if not hasattr(self, "rdcomment"):
+ # Pickled rdata from 2.0.x might not have a rdcomment, so add
+ # it if needed.
+ object.__setattr__(self, "rdcomment", None)
+
+ def covers(self) -> dns.rdatatype.RdataType:
+ """Return the type a Rdata covers.
+
+ DNS SIG/RRSIG rdatas apply to a specific type; this type is
+ returned by the covers() function. If the rdata type is not
+ SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when
+ creating rdatasets, allowing the rdataset to contain only RRSIGs
+ of a particular type, e.g. RRSIG(NS).
+
+ Returns a ``dns.rdatatype.RdataType``.
+ """
+
+ return dns.rdatatype.NONE
+
+ def extended_rdatatype(self) -> int:
+ """Return a 32-bit type value, the least significant 16 bits of
+ which are the ordinary DNS type, and the upper 16 bits of which are
+ the "covered" type, if any.
+
+ Returns an ``int``.
+ """
+
+ return self.covers() << 16 | self.rdtype
+
+ def to_text(
+ self,
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = True,
+ **kw: Dict[str, Any],
+ ) -> str:
+ """Convert an rdata to text format.
+
+ Returns a ``str``.
+ """
+
+ raise NotImplementedError # pragma: no cover
+
+ def _to_wire(
+ self,
+ file: Optional[Any],
+ compress: Optional[dns.name.CompressType] = None,
+ origin: Optional[dns.name.Name] = None,
+ canonicalize: bool = False,
+ ) -> None:
+ raise NotImplementedError # pragma: no cover
+
+ def to_wire(
+ self,
+ file: Optional[Any] = None,
+ compress: Optional[dns.name.CompressType] = None,
+ origin: Optional[dns.name.Name] = None,
+ canonicalize: bool = False,
+ ) -> Optional[bytes]:
+ """Convert an rdata to wire format.
+
+ Returns a ``bytes`` if no output file was specified, or ``None`` otherwise.
+ """
+
+ if file:
+ # We call _to_wire() and then return None explicitly instead of
+ # of just returning the None from _to_wire() as mypy's func-returns-value
+ # unhelpfully errors out with "error: "_to_wire" of "Rdata" does not return
+ # a value (it only ever returns None)"
+ self._to_wire(file, compress, origin, canonicalize)
+ return None
+ else:
+ f = io.BytesIO()
+ self._to_wire(f, compress, origin, canonicalize)
+ return f.getvalue()
+
+ def to_generic(
+ self, origin: Optional[dns.name.Name] = None
+ ) -> "dns.rdata.GenericRdata":
+ """Creates a dns.rdata.GenericRdata equivalent of this rdata.
+
+ Returns a ``dns.rdata.GenericRdata``.
+ """
+ return dns.rdata.GenericRdata(
+ self.rdclass, self.rdtype, self.to_wire(origin=origin)
+ )
+
+ def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
+ """Convert rdata to a format suitable for digesting in hashes. This
+ is also the DNSSEC canonical form.
+
+ Returns a ``bytes``.
+ """
+ wire = self.to_wire(origin=origin, canonicalize=True)
+ assert wire is not None # for mypy
+ return wire
+
+ def __repr__(self):
+ covers = self.covers()
+ if covers == dns.rdatatype.NONE:
+ ctext = ""
+ else:
+ ctext = "(" + dns.rdatatype.to_text(covers) + ")"
+ return (
+ "<DNS "
+ + dns.rdataclass.to_text(self.rdclass)
+ + " "
+ + dns.rdatatype.to_text(self.rdtype)
+ + ctext
+ + " rdata: "
+ + str(self)
+ + ">"
+ )
+
+ def __str__(self):
+ return self.to_text()
+
+ def _cmp(self, other):
+ """Compare an rdata with another rdata of the same rdtype and
+ rdclass.
+
+ For rdata with only absolute names:
+ Return < 0 if self < other in the DNSSEC ordering, 0 if self
+ == other, and > 0 if self > other.
+ For rdata with at least one relative names:
+ The rdata sorts before any rdata with only absolute names.
+ When compared with another relative rdata, all names are
+ made absolute as if they were relative to the root, as the
+ proper origin is not available. While this creates a stable
+ ordering, it is NOT guaranteed to be the DNSSEC ordering.
+ In the future, all ordering comparisons for rdata with
+ relative names will be disallowed.
+ """
+ try:
+ our = self.to_digestable()
+ our_relative = False
+ except dns.name.NeedAbsoluteNameOrOrigin:
+ if _allow_relative_comparisons:
+ our = self.to_digestable(dns.name.root)
+ our_relative = True
+ try:
+ their = other.to_digestable()
+ their_relative = False
+ except dns.name.NeedAbsoluteNameOrOrigin:
+ if _allow_relative_comparisons:
+ their = other.to_digestable(dns.name.root)
+ their_relative = True
+ if _allow_relative_comparisons:
+ if our_relative != their_relative:
+ # For the purpose of comparison, all rdata with at least one
+ # relative name is less than an rdata with only absolute names.
+ if our_relative:
+ return -1
+ else:
+ return 1
+ elif our_relative or their_relative:
+ raise NoRelativeRdataOrdering
+ if our == their:
+ return 0
+ elif our > their:
+ return 1
+ else:
+ return -1
+
+ def __eq__(self, other):
+ if not isinstance(other, Rdata):
+ return False
+ if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
+ return False
+ our_relative = False
+ their_relative = False
+ try:
+ our = self.to_digestable()
+ except dns.name.NeedAbsoluteNameOrOrigin:
+ our = self.to_digestable(dns.name.root)
+ our_relative = True
+ try:
+ their = other.to_digestable()
+ except dns.name.NeedAbsoluteNameOrOrigin:
+ their = other.to_digestable(dns.name.root)
+ their_relative = True
+ if our_relative != their_relative:
+ return False
+ return our == their
+
+ def __ne__(self, other):
+ if not isinstance(other, Rdata):
+ return True
+ if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
+ return True
+ return not self.__eq__(other)
+
+ def __lt__(self, other):
+ if (
+ not isinstance(other, Rdata)
+ or self.rdclass != other.rdclass
+ or self.rdtype != other.rdtype
+ ):
+ return NotImplemented
+ return self._cmp(other) < 0
+
+ def __le__(self, other):
+ if (
+ not isinstance(other, Rdata)
+ or self.rdclass != other.rdclass
+ or self.rdtype != other.rdtype
+ ):
+ return NotImplemented
+ return self._cmp(other) <= 0
+
+ def __ge__(self, other):
+ if (
+ not isinstance(other, Rdata)
+ or self.rdclass != other.rdclass
+ or self.rdtype != other.rdtype
+ ):
+ return NotImplemented
+ return self._cmp(other) >= 0
+
+ def __gt__(self, other):
+ if (
+ not isinstance(other, Rdata)
+ or self.rdclass != other.rdclass
+ or self.rdtype != other.rdtype
+ ):
+ return NotImplemented
+ return self._cmp(other) > 0
+
+ def __hash__(self):
+ return hash(self.to_digestable(dns.name.root))
+
+ @classmethod
+ def from_text(
+ cls,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ tok: dns.tokenizer.Tokenizer,
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = True,
+ relativize_to: Optional[dns.name.Name] = None,
+ ) -> "Rdata":
+ raise NotImplementedError # pragma: no cover
+
+ @classmethod
+ def from_wire_parser(
+ cls,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ parser: dns.wire.Parser,
+ origin: Optional[dns.name.Name] = None,
+ ) -> "Rdata":
+ raise NotImplementedError # pragma: no cover
+
+ def replace(self, **kwargs: Any) -> "Rdata":
+ """
+ Create a new Rdata instance based on the instance replace was
+ invoked on. It is possible to pass different parameters to
+ override the corresponding properties of the base Rdata.
+
+ Any field specific to the Rdata type can be replaced, but the
+ *rdtype* and *rdclass* fields cannot.
+
+ Returns an instance of the same Rdata subclass as *self*.
+ """
+
+ # Get the constructor parameters.
+ parameters = inspect.signature(self.__init__).parameters # type: ignore
+
+ # Ensure that all of the arguments correspond to valid fields.
+ # Don't allow rdclass or rdtype to be changed, though.
+ for key in kwargs:
+ if key == "rdcomment":
+ continue
+ if key not in parameters:
+ raise AttributeError(
+ f"'{self.__class__.__name__}' object has no attribute '{key}'"
+ )
+ if key in ("rdclass", "rdtype"):
+ raise AttributeError(
+ f"Cannot overwrite '{self.__class__.__name__}' attribute '{key}'"
+ )
+
+ # Construct the parameter list. For each field, use the value in
+ # kwargs if present, and the current value otherwise.
+ args = (kwargs.get(key, getattr(self, key)) for key in parameters)
+
+ # Create, validate, and return the new object.
+ rd = self.__class__(*args)
+ # The comment is not set in the constructor, so give it special
+ # handling.
+ rdcomment = kwargs.get("rdcomment", self.rdcomment)
+ if rdcomment is not None:
+ object.__setattr__(rd, "rdcomment", rdcomment)
+ return rd
+
+ # Type checking and conversion helpers. These are class methods as
+ # they don't touch object state and may be useful to others.
+
+ @classmethod
+ def _as_rdataclass(cls, value):
+ return dns.rdataclass.RdataClass.make(value)
+
+ @classmethod
+ def _as_rdatatype(cls, value):
+ return dns.rdatatype.RdataType.make(value)
+
+ @classmethod
+ def _as_bytes(
+ cls,
+ value: Any,
+ encode: bool = False,
+ max_length: Optional[int] = None,
+ empty_ok: bool = True,
+ ) -> bytes:
+ if encode and isinstance(value, str):
+ bvalue = value.encode()
+ elif isinstance(value, bytearray):
+ bvalue = bytes(value)
+ elif isinstance(value, bytes):
+ bvalue = value
+ else:
+ raise ValueError("not bytes")
+ if max_length is not None and len(bvalue) > max_length:
+ raise ValueError("too long")
+ if not empty_ok and len(bvalue) == 0:
+ raise ValueError("empty bytes not allowed")
+ return bvalue
+
+ @classmethod
+ def _as_name(cls, value):
+ # Note that proper name conversion (e.g. with origin and IDNA
+ # awareness) is expected to be done via from_text. This is just
+ # a simple thing for people invoking the constructor directly.
+ if isinstance(value, str):
+ return dns.name.from_text(value)
+ elif not isinstance(value, dns.name.Name):
+ raise ValueError("not a name")
+ return value
+
+ @classmethod
+ def _as_uint8(cls, value):
+ if not isinstance(value, int):
+ raise ValueError("not an integer")
+ if value < 0 or value > 255:
+ raise ValueError("not a uint8")
+ return value
+
+ @classmethod
+ def _as_uint16(cls, value):
+ if not isinstance(value, int):
+ raise ValueError("not an integer")
+ if value < 0 or value > 65535:
+ raise ValueError("not a uint16")
+ return value
+
+ @classmethod
+ def _as_uint32(cls, value):
+ if not isinstance(value, int):
+ raise ValueError("not an integer")
+ if value < 0 or value > 4294967295:
+ raise ValueError("not a uint32")
+ return value
+
+ @classmethod
+ def _as_uint48(cls, value):
+ if not isinstance(value, int):
+ raise ValueError("not an integer")
+ if value < 0 or value > 281474976710655:
+ raise ValueError("not a uint48")
+ return value
+
+ @classmethod
+ def _as_int(cls, value, low=None, high=None):
+ if not isinstance(value, int):
+ raise ValueError("not an integer")
+ if low is not None and value < low:
+ raise ValueError("value too small")
+ if high is not None and value > high:
+ raise ValueError("value too large")
+ return value
+
+ @classmethod
+ def _as_ipv4_address(cls, value):
+ if isinstance(value, str):
+ return dns.ipv4.canonicalize(value)
+ elif isinstance(value, bytes):
+ return dns.ipv4.inet_ntoa(value)
+ else:
+ raise ValueError("not an IPv4 address")
+
+ @classmethod
+ def _as_ipv6_address(cls, value):
+ if isinstance(value, str):
+ return dns.ipv6.canonicalize(value)
+ elif isinstance(value, bytes):
+ return dns.ipv6.inet_ntoa(value)
+ else:
+ raise ValueError("not an IPv6 address")
+
+ @classmethod
+ def _as_bool(cls, value):
+ if isinstance(value, bool):
+ return value
+ else:
+ raise ValueError("not a boolean")
+
+ @classmethod
+ def _as_ttl(cls, value):
+ if isinstance(value, int):
+ return cls._as_int(value, 0, dns.ttl.MAX_TTL)
+ elif isinstance(value, str):
+ return dns.ttl.from_text(value)
+ else:
+ raise ValueError("not a TTL")
+
+ @classmethod
+ def _as_tuple(cls, value, as_value):
+ try:
+ # For user convenience, if value is a singleton of the list
+ # element type, wrap it in a tuple.
+ return (as_value(value),)
+ except Exception:
+ # Otherwise, check each element of the iterable *value*
+ # against *as_value*.
+ return tuple(as_value(v) for v in value)
+
+ # Processing order
+
+ @classmethod
+ def _processing_order(cls, iterable):
+ items = list(iterable)
+ random.shuffle(items)
+ return items
+
+
+@dns.immutable.immutable
+class GenericRdata(Rdata):
+ """Generic Rdata Class
+
+ This class is used for rdata types for which we have no better
+ implementation. It implements the DNS "unknown RRs" scheme.
+ """
+
+ __slots__ = ["data"]
+
+ def __init__(self, rdclass, rdtype, data):
+ super().__init__(rdclass, rdtype)
+ self.data = data
+
+ def to_text(
+ self,
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = True,
+ **kw: Dict[str, Any],
+ ) -> str:
+ return r"\# %d " % len(self.data) + _hexify(self.data, **kw)
+
+ @classmethod
+ def from_text(
+ cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
+ ):
+ token = tok.get()
+ if not token.is_identifier() or token.value != r"\#":
+ raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
+ length = tok.get_int()
+ hex = tok.concatenate_remaining_identifiers(True).encode()
+ data = binascii.unhexlify(hex)
+ if len(data) != length:
+ raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
+ return cls(rdclass, rdtype, data)
+
+ def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
+ file.write(self.data)
+
+ @classmethod
+ def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
+ return cls(rdclass, rdtype, parser.get_remaining())
+
+
+_rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = (
+ {}
+)
+_module_prefix = "dns.rdtypes"
+_dynamic_load_allowed = True
+
+
+def get_rdata_class(rdclass, rdtype, use_generic=True):
+ cls = _rdata_classes.get((rdclass, rdtype))
+ if not cls:
+ cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype))
+ if not cls and _dynamic_load_allowed:
+ rdclass_text = dns.rdataclass.to_text(rdclass)
+ rdtype_text = dns.rdatatype.to_text(rdtype)
+ rdtype_text = rdtype_text.replace("-", "_")
+ try:
+ mod = import_module(
+ ".".join([_module_prefix, rdclass_text, rdtype_text])
+ )
+ cls = getattr(mod, rdtype_text)
+ _rdata_classes[(rdclass, rdtype)] = cls
+ except ImportError:
+ try:
+ mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
+ cls = getattr(mod, rdtype_text)
+ _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
+ _rdata_classes[(rdclass, rdtype)] = cls
+ except ImportError:
+ pass
+ if not cls and use_generic:
+ cls = GenericRdata
+ _rdata_classes[(rdclass, rdtype)] = cls
+ return cls
+
+
+def load_all_types(disable_dynamic_load=True):
+ """Load all rdata types for which dnspython has a non-generic implementation.
+
+ Normally dnspython loads DNS rdatatype implementations on demand, but in some
+ specialized cases loading all types at an application-controlled time is preferred.
+
+ If *disable_dynamic_load*, a ``bool``, is ``True`` then dnspython will not attempt
+ to use its dynamic loading mechanism if an unknown type is subsequently encountered,
+ and will simply use the ``GenericRdata`` class.
+ """
+ # Load class IN and ANY types.
+ for rdtype in dns.rdatatype.RdataType:
+ get_rdata_class(dns.rdataclass.IN, rdtype, False)
+ # Load the one non-ANY implementation we have in CH. Everything
+ # else in CH is an ANY type, and we'll discover those on demand but won't
+ # have to import anything.
+ get_rdata_class(dns.rdataclass.CH, dns.rdatatype.A, False)
+ if disable_dynamic_load:
+ # Now disable dynamic loading so any subsequent unknown type immediately becomes
+ # GenericRdata without a load attempt.
+ global _dynamic_load_allowed
+ _dynamic_load_allowed = False
+
+
+def from_text(
+ rdclass: Union[dns.rdataclass.RdataClass, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ tok: Union[dns.tokenizer.Tokenizer, str],
+ origin: Optional[dns.name.Name] = None,
+ relativize: bool = True,
+ relativize_to: Optional[dns.name.Name] = None,
+ idna_codec: Optional[dns.name.IDNACodec] = None,
+) -> Rdata:
+ """Build an rdata object from text format.
+
+ This function attempts to dynamically load a class which
+ implements the specified rdata class and type. If there is no
+ class-and-type-specific implementation, the GenericRdata class
+ is used.
+
+ Once a class is chosen, its from_text() class method is called
+ with the parameters to this function.
+
+ If *tok* is a ``str``, then a tokenizer is created and the string
+ is used as its input.
+
+ *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
+
+ *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.
+
+ *origin*, a ``dns.name.Name`` (or ``None``), the
+ origin to use for relative names.
+
+ *relativize*, a ``bool``. If true, name will be relativized.
+
+ *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
+ when relativizing names. If not set, the *origin* value will be used.
+
+ *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+ encoder/decoder to use if a tokenizer needs to be created. If
+ ``None``, the default IDNA 2003 encoder/decoder is used. If a
+ tokenizer is not created, then the codec associated with the tokenizer
+ is the one that is used.
+
+ Returns an instance of the chosen Rdata subclass.
+
+ """
+ if isinstance(tok, str):
+ tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
+ rdclass = dns.rdataclass.RdataClass.make(rdclass)
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ cls = get_rdata_class(rdclass, rdtype)
+ with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
+ rdata = None
+ if cls != GenericRdata:
+ # peek at first token
+ token = tok.get()
+ tok.unget(token)
+ if token.is_identifier() and token.value == r"\#":
+ #
+ # Known type using the generic syntax. Extract the
+ # wire form from the generic syntax, and then run
+ # from_wire on it.
+ #
+ grdata = GenericRdata.from_text(
+ rdclass, rdtype, tok, origin, relativize, relativize_to
+ )
+ rdata = from_wire(
+ rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
+ )
+ #
+ # If this comparison isn't equal, then there must have been
+ # compressed names in the wire format, which is an error,
+ # there being no reasonable context to decompress with.
+ #
+ rwire = rdata.to_wire()
+ if rwire != grdata.data:
+ raise dns.exception.SyntaxError(
+ "compressed data in "
+ "generic syntax form "
+ "of known rdatatype"
+ )
+ if rdata is None:
+ rdata = cls.from_text(
+ rdclass, rdtype, tok, origin, relativize, relativize_to
+ )
+ token = tok.get_eol_as_token()
+ if token.comment is not None:
+ object.__setattr__(rdata, "rdcomment", token.comment)
+ return rdata
+
+
+def from_wire_parser(
+ rdclass: Union[dns.rdataclass.RdataClass, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ parser: dns.wire.Parser,
+ origin: Optional[dns.name.Name] = None,
+) -> Rdata:
+ """Build an rdata object from wire format
+
+ This function attempts to dynamically load a class which
+ implements the specified rdata class and type. If there is no
+ class-and-type-specific implementation, the GenericRdata class
+ is used.
+
+ Once a class is chosen, its from_wire() class method is called
+ with the parameters to this function.
+
+ *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
+
+ *parser*, a ``dns.wire.Parser``, the parser, which should be
+ restricted to the rdata length.
+
+ *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
+ then names will be relativized to this origin.
+
+ Returns an instance of the chosen Rdata subclass.
+ """
+
+ rdclass = dns.rdataclass.RdataClass.make(rdclass)
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ cls = get_rdata_class(rdclass, rdtype)
+ with dns.exception.ExceptionWrapper(dns.exception.FormError):
+ return cls.from_wire_parser(rdclass, rdtype, parser, origin)
+
+
+def from_wire(
+ rdclass: Union[dns.rdataclass.RdataClass, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ wire: bytes,
+ current: int,
+ rdlen: int,
+ origin: Optional[dns.name.Name] = None,
+) -> Rdata:
+ """Build an rdata object from wire format
+
+ This function attempts to dynamically load a class which
+ implements the specified rdata class and type. If there is no
+ class-and-type-specific implementation, the GenericRdata class
+ is used.
+
+ Once a class is chosen, its from_wire() class method is called
+ with the parameters to this function.
+
+ *rdclass*, an ``int``, the rdataclass.
+
+ *rdtype*, an ``int``, the rdatatype.
+
+ *wire*, a ``bytes``, the wire-format message.
+
+ *current*, an ``int``, the offset in wire of the beginning of
+ the rdata.
+
+ *rdlen*, an ``int``, the length of the wire-format rdata
+
+ *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
+ then names will be relativized to this origin.
+
+ Returns an instance of the chosen Rdata subclass.
+ """
+ parser = dns.wire.Parser(wire, current)
+ with parser.restrict_to(rdlen):
+ return from_wire_parser(rdclass, rdtype, parser, origin)
+
+
+class RdatatypeExists(dns.exception.DNSException):
+ """DNS rdatatype already exists."""
+
+ supp_kwargs = {"rdclass", "rdtype"}
+ fmt = (
+ "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
+ + "already exists."
+ )
+
+
+def register_type(
+ implementation: Any,
+ rdtype: int,
+ rdtype_text: str,
+ is_singleton: bool = False,
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+) -> None:
+ """Dynamically register a module to handle an rdatatype.
+
+ *implementation*, a module implementing the type in the usual dnspython
+ way.
+
+ *rdtype*, an ``int``, the rdatatype to register.
+
+ *rdtype_text*, a ``str``, the textual form of the rdatatype.
+
+ *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
+ RRsets of the type can have only one member.)
+
+ *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
+ it applies to all classes.
+ """
+
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ existing_cls = get_rdata_class(rdclass, rdtype)
+ if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
+ raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
+ _rdata_classes[(rdclass, rdtype)] = getattr(
+ implementation, rdtype_text.replace("-", "_")
+ )
+ dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)