about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/rdata.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/rdata.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/rdata.py911
1 files changed, 911 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/rdata.py b/.venv/lib/python3.12/site-packages/dns/rdata.py
new file mode 100644
index 00000000..8099c26a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/rdata.py
@@ -0,0 +1,911 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2001-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS rdata."""
+
+import base64
+import binascii
+import inspect
+import io
+import itertools
+import random
+from importlib import import_module
+from typing import Any, Dict, Optional, Tuple, Union
+
+import dns.exception
+import dns.immutable
+import dns.ipv4
+import dns.ipv6
+import dns.name
+import dns.rdataclass
+import dns.rdatatype
+import dns.tokenizer
+import dns.ttl
+import dns.wire
+
+_chunksize = 32
+
+# We currently allow comparisons for rdata with relative names for backwards
+# compatibility, but in the future we will not, as these kinds of comparisons
+# can lead to subtle bugs if code is not carefully written.
+#
+# This switch allows the future behavior to be turned on so code can be
+# tested with it.
+_allow_relative_comparisons = True
+
+
+class NoRelativeRdataOrdering(dns.exception.DNSException):
+    """An attempt was made to do an ordered comparison of one or more
+    rdata with relative names.  The only reliable way of sorting rdata
+    is to use non-relativized rdata.
+
+    """
+
+
+def _wordbreak(data, chunksize=_chunksize, separator=b" "):
+    """Break a binary string into chunks of chunksize characters separated by
+    a space.
+    """
+
+    if not chunksize:
+        return data.decode()
+    return separator.join(
+        [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
+    ).decode()
+
+
+# pylint: disable=unused-argument
+
+
+def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
+    """Convert a binary string into its hex encoding, broken up into chunks
+    of chunksize characters separated by a separator.
+    """
+
+    return _wordbreak(binascii.hexlify(data), chunksize, separator)
+
+
+def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
+    """Convert a binary string into its base64 encoding, broken up into chunks
+    of chunksize characters separated by a separator.
+    """
+
+    return _wordbreak(base64.b64encode(data), chunksize, separator)
+
+
+# pylint: enable=unused-argument
+
+__escaped = b'"\\'
+
+
+def _escapify(qstring):
+    """Escape the characters in a quoted string which need it."""
+
+    if isinstance(qstring, str):
+        qstring = qstring.encode()
+    if not isinstance(qstring, bytearray):
+        qstring = bytearray(qstring)
+
+    text = ""
+    for c in qstring:
+        if c in __escaped:
+            text += "\\" + chr(c)
+        elif c >= 0x20 and c < 0x7F:
+            text += chr(c)
+        else:
+            text += "\\%03d" % c
+    return text
+
+
+def _truncate_bitmap(what):
+    """Determine the index of greatest byte that isn't all zeros, and
+    return the bitmap that contains all the bytes less than that index.
+    """
+
+    for i in range(len(what) - 1, -1, -1):
+        if what[i] != 0:
+            return what[0 : i + 1]
+    return what[0:1]
+
+
+# So we don't have to edit all the rdata classes...
+_constify = dns.immutable.constify
+
+
+@dns.immutable.immutable
+class Rdata:
+    """Base class for all DNS rdata types."""
+
+    __slots__ = ["rdclass", "rdtype", "rdcomment"]
+
+    def __init__(self, rdclass, rdtype):
+        """Initialize an rdata.
+
+        *rdclass*, an ``int`` is the rdataclass of the Rdata.
+
+        *rdtype*, an ``int`` is the rdatatype of the Rdata.
+        """
+
+        self.rdclass = self._as_rdataclass(rdclass)
+        self.rdtype = self._as_rdatatype(rdtype)
+        self.rdcomment = None
+
+    def _get_all_slots(self):
+        return itertools.chain.from_iterable(
+            getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
+        )
+
+    def __getstate__(self):
+        # We used to try to do a tuple of all slots here, but it
+        # doesn't work as self._all_slots isn't available at
+        # __setstate__() time.  Before that we tried to store a tuple
+        # of __slots__, but that didn't work as it didn't store the
+        # slots defined by ancestors.  This older way didn't fail
+        # outright, but ended up with partially broken objects, e.g.
+        # if you unpickled an A RR it wouldn't have rdclass and rdtype
+        # attributes, and would compare badly.
+        state = {}
+        for slot in self._get_all_slots():
+            state[slot] = getattr(self, slot)
+        return state
+
+    def __setstate__(self, state):
+        for slot, val in state.items():
+            object.__setattr__(self, slot, val)
+        if not hasattr(self, "rdcomment"):
+            # Pickled rdata from 2.0.x might not have a rdcomment, so add
+            # it if needed.
+            object.__setattr__(self, "rdcomment", None)
+
+    def covers(self) -> dns.rdatatype.RdataType:
+        """Return the type a Rdata covers.
+
+        DNS SIG/RRSIG rdatas apply to a specific type; this type is
+        returned by the covers() function.  If the rdata type is not
+        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
+        creating rdatasets, allowing the rdataset to contain only RRSIGs
+        of a particular type, e.g. RRSIG(NS).
+
+        Returns a ``dns.rdatatype.RdataType``.
+        """
+
+        return dns.rdatatype.NONE
+
+    def extended_rdatatype(self) -> int:
+        """Return a 32-bit type value, the least significant 16 bits of
+        which are the ordinary DNS type, and the upper 16 bits of which are
+        the "covered" type, if any.
+
+        Returns an ``int``.
+        """
+
+        return self.covers() << 16 | self.rdtype
+
+    def to_text(
+        self,
+        origin: Optional[dns.name.Name] = None,
+        relativize: bool = True,
+        **kw: Dict[str, Any],
+    ) -> str:
+        """Convert an rdata to text format.
+
+        Returns a ``str``.
+        """
+
+        raise NotImplementedError  # pragma: no cover
+
+    def _to_wire(
+        self,
+        file: Optional[Any],
+        compress: Optional[dns.name.CompressType] = None,
+        origin: Optional[dns.name.Name] = None,
+        canonicalize: bool = False,
+    ) -> None:
+        raise NotImplementedError  # pragma: no cover
+
+    def to_wire(
+        self,
+        file: Optional[Any] = None,
+        compress: Optional[dns.name.CompressType] = None,
+        origin: Optional[dns.name.Name] = None,
+        canonicalize: bool = False,
+    ) -> Optional[bytes]:
+        """Convert an rdata to wire format.
+
+        Returns a ``bytes`` if no output file was specified, or ``None`` otherwise.
+        """
+
+        if file:
+            # We call _to_wire() and then return None explicitly instead of
+            # of just returning the None from _to_wire() as mypy's func-returns-value
+            # unhelpfully errors out with "error: "_to_wire" of "Rdata" does not return
+            # a value (it only ever returns None)"
+            self._to_wire(file, compress, origin, canonicalize)
+            return None
+        else:
+            f = io.BytesIO()
+            self._to_wire(f, compress, origin, canonicalize)
+            return f.getvalue()
+
+    def to_generic(
+        self, origin: Optional[dns.name.Name] = None
+    ) -> "dns.rdata.GenericRdata":
+        """Creates a dns.rdata.GenericRdata equivalent of this rdata.
+
+        Returns a ``dns.rdata.GenericRdata``.
+        """
+        return dns.rdata.GenericRdata(
+            self.rdclass, self.rdtype, self.to_wire(origin=origin)
+        )
+
+    def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
+        """Convert rdata to a format suitable for digesting in hashes.  This
+        is also the DNSSEC canonical form.
+
+        Returns a ``bytes``.
+        """
+        wire = self.to_wire(origin=origin, canonicalize=True)
+        assert wire is not None  # for mypy
+        return wire
+
+    def __repr__(self):
+        covers = self.covers()
+        if covers == dns.rdatatype.NONE:
+            ctext = ""
+        else:
+            ctext = "(" + dns.rdatatype.to_text(covers) + ")"
+        return (
+            "<DNS "
+            + dns.rdataclass.to_text(self.rdclass)
+            + " "
+            + dns.rdatatype.to_text(self.rdtype)
+            + ctext
+            + " rdata: "
+            + str(self)
+            + ">"
+        )
+
+    def __str__(self):
+        return self.to_text()
+
+    def _cmp(self, other):
+        """Compare an rdata with another rdata of the same rdtype and
+        rdclass.
+
+        For rdata with only absolute names:
+            Return < 0 if self < other in the DNSSEC ordering, 0 if self
+            == other, and > 0 if self > other.
+        For rdata with at least one relative names:
+            The rdata sorts before any rdata with only absolute names.
+            When compared with another relative rdata, all names are
+            made absolute as if they were relative to the root, as the
+            proper origin is not available.  While this creates a stable
+            ordering, it is NOT guaranteed to be the DNSSEC ordering.
+            In the future, all ordering comparisons for rdata with
+            relative names will be disallowed.
+        """
+        try:
+            our = self.to_digestable()
+            our_relative = False
+        except dns.name.NeedAbsoluteNameOrOrigin:
+            if _allow_relative_comparisons:
+                our = self.to_digestable(dns.name.root)
+            our_relative = True
+        try:
+            their = other.to_digestable()
+            their_relative = False
+        except dns.name.NeedAbsoluteNameOrOrigin:
+            if _allow_relative_comparisons:
+                their = other.to_digestable(dns.name.root)
+            their_relative = True
+        if _allow_relative_comparisons:
+            if our_relative != their_relative:
+                # For the purpose of comparison, all rdata with at least one
+                # relative name is less than an rdata with only absolute names.
+                if our_relative:
+                    return -1
+                else:
+                    return 1
+        elif our_relative or their_relative:
+            raise NoRelativeRdataOrdering
+        if our == their:
+            return 0
+        elif our > their:
+            return 1
+        else:
+            return -1
+
+    def __eq__(self, other):
+        if not isinstance(other, Rdata):
+            return False
+        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
+            return False
+        our_relative = False
+        their_relative = False
+        try:
+            our = self.to_digestable()
+        except dns.name.NeedAbsoluteNameOrOrigin:
+            our = self.to_digestable(dns.name.root)
+            our_relative = True
+        try:
+            their = other.to_digestable()
+        except dns.name.NeedAbsoluteNameOrOrigin:
+            their = other.to_digestable(dns.name.root)
+            their_relative = True
+        if our_relative != their_relative:
+            return False
+        return our == their
+
+    def __ne__(self, other):
+        if not isinstance(other, Rdata):
+            return True
+        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
+            return True
+        return not self.__eq__(other)
+
+    def __lt__(self, other):
+        if (
+            not isinstance(other, Rdata)
+            or self.rdclass != other.rdclass
+            or self.rdtype != other.rdtype
+        ):
+            return NotImplemented
+        return self._cmp(other) < 0
+
+    def __le__(self, other):
+        if (
+            not isinstance(other, Rdata)
+            or self.rdclass != other.rdclass
+            or self.rdtype != other.rdtype
+        ):
+            return NotImplemented
+        return self._cmp(other) <= 0
+
+    def __ge__(self, other):
+        if (
+            not isinstance(other, Rdata)
+            or self.rdclass != other.rdclass
+            or self.rdtype != other.rdtype
+        ):
+            return NotImplemented
+        return self._cmp(other) >= 0
+
+    def __gt__(self, other):
+        if (
+            not isinstance(other, Rdata)
+            or self.rdclass != other.rdclass
+            or self.rdtype != other.rdtype
+        ):
+            return NotImplemented
+        return self._cmp(other) > 0
+
+    def __hash__(self):
+        return hash(self.to_digestable(dns.name.root))
+
+    @classmethod
+    def from_text(
+        cls,
+        rdclass: dns.rdataclass.RdataClass,
+        rdtype: dns.rdatatype.RdataType,
+        tok: dns.tokenizer.Tokenizer,
+        origin: Optional[dns.name.Name] = None,
+        relativize: bool = True,
+        relativize_to: Optional[dns.name.Name] = None,
+    ) -> "Rdata":
+        raise NotImplementedError  # pragma: no cover
+
+    @classmethod
+    def from_wire_parser(
+        cls,
+        rdclass: dns.rdataclass.RdataClass,
+        rdtype: dns.rdatatype.RdataType,
+        parser: dns.wire.Parser,
+        origin: Optional[dns.name.Name] = None,
+    ) -> "Rdata":
+        raise NotImplementedError  # pragma: no cover
+
+    def replace(self, **kwargs: Any) -> "Rdata":
+        """
+        Create a new Rdata instance based on the instance replace was
+        invoked on. It is possible to pass different parameters to
+        override the corresponding properties of the base Rdata.
+
+        Any field specific to the Rdata type can be replaced, but the
+        *rdtype* and *rdclass* fields cannot.
+
+        Returns an instance of the same Rdata subclass as *self*.
+        """
+
+        # Get the constructor parameters.
+        parameters = inspect.signature(self.__init__).parameters  # type: ignore
+
+        # Ensure that all of the arguments correspond to valid fields.
+        # Don't allow rdclass or rdtype to be changed, though.
+        for key in kwargs:
+            if key == "rdcomment":
+                continue
+            if key not in parameters:
+                raise AttributeError(
+                    f"'{self.__class__.__name__}' object has no attribute '{key}'"
+                )
+            if key in ("rdclass", "rdtype"):
+                raise AttributeError(
+                    f"Cannot overwrite '{self.__class__.__name__}' attribute '{key}'"
+                )
+
+        # Construct the parameter list.  For each field, use the value in
+        # kwargs if present, and the current value otherwise.
+        args = (kwargs.get(key, getattr(self, key)) for key in parameters)
+
+        # Create, validate, and return the new object.
+        rd = self.__class__(*args)
+        # The comment is not set in the constructor, so give it special
+        # handling.
+        rdcomment = kwargs.get("rdcomment", self.rdcomment)
+        if rdcomment is not None:
+            object.__setattr__(rd, "rdcomment", rdcomment)
+        return rd
+
+    # Type checking and conversion helpers.  These are class methods as
+    # they don't touch object state and may be useful to others.
+
+    @classmethod
+    def _as_rdataclass(cls, value):
+        return dns.rdataclass.RdataClass.make(value)
+
+    @classmethod
+    def _as_rdatatype(cls, value):
+        return dns.rdatatype.RdataType.make(value)
+
+    @classmethod
+    def _as_bytes(
+        cls,
+        value: Any,
+        encode: bool = False,
+        max_length: Optional[int] = None,
+        empty_ok: bool = True,
+    ) -> bytes:
+        if encode and isinstance(value, str):
+            bvalue = value.encode()
+        elif isinstance(value, bytearray):
+            bvalue = bytes(value)
+        elif isinstance(value, bytes):
+            bvalue = value
+        else:
+            raise ValueError("not bytes")
+        if max_length is not None and len(bvalue) > max_length:
+            raise ValueError("too long")
+        if not empty_ok and len(bvalue) == 0:
+            raise ValueError("empty bytes not allowed")
+        return bvalue
+
+    @classmethod
+    def _as_name(cls, value):
+        # Note that proper name conversion (e.g. with origin and IDNA
+        # awareness) is expected to be done via from_text.  This is just
+        # a simple thing for people invoking the constructor directly.
+        if isinstance(value, str):
+            return dns.name.from_text(value)
+        elif not isinstance(value, dns.name.Name):
+            raise ValueError("not a name")
+        return value
+
+    @classmethod
+    def _as_uint8(cls, value):
+        if not isinstance(value, int):
+            raise ValueError("not an integer")
+        if value < 0 or value > 255:
+            raise ValueError("not a uint8")
+        return value
+
+    @classmethod
+    def _as_uint16(cls, value):
+        if not isinstance(value, int):
+            raise ValueError("not an integer")
+        if value < 0 or value > 65535:
+            raise ValueError("not a uint16")
+        return value
+
+    @classmethod
+    def _as_uint32(cls, value):
+        if not isinstance(value, int):
+            raise ValueError("not an integer")
+        if value < 0 or value > 4294967295:
+            raise ValueError("not a uint32")
+        return value
+
+    @classmethod
+    def _as_uint48(cls, value):
+        if not isinstance(value, int):
+            raise ValueError("not an integer")
+        if value < 0 or value > 281474976710655:
+            raise ValueError("not a uint48")
+        return value
+
+    @classmethod
+    def _as_int(cls, value, low=None, high=None):
+        if not isinstance(value, int):
+            raise ValueError("not an integer")
+        if low is not None and value < low:
+            raise ValueError("value too small")
+        if high is not None and value > high:
+            raise ValueError("value too large")
+        return value
+
+    @classmethod
+    def _as_ipv4_address(cls, value):
+        if isinstance(value, str):
+            return dns.ipv4.canonicalize(value)
+        elif isinstance(value, bytes):
+            return dns.ipv4.inet_ntoa(value)
+        else:
+            raise ValueError("not an IPv4 address")
+
+    @classmethod
+    def _as_ipv6_address(cls, value):
+        if isinstance(value, str):
+            return dns.ipv6.canonicalize(value)
+        elif isinstance(value, bytes):
+            return dns.ipv6.inet_ntoa(value)
+        else:
+            raise ValueError("not an IPv6 address")
+
+    @classmethod
+    def _as_bool(cls, value):
+        if isinstance(value, bool):
+            return value
+        else:
+            raise ValueError("not a boolean")
+
+    @classmethod
+    def _as_ttl(cls, value):
+        if isinstance(value, int):
+            return cls._as_int(value, 0, dns.ttl.MAX_TTL)
+        elif isinstance(value, str):
+            return dns.ttl.from_text(value)
+        else:
+            raise ValueError("not a TTL")
+
+    @classmethod
+    def _as_tuple(cls, value, as_value):
+        try:
+            # For user convenience, if value is a singleton of the list
+            # element type, wrap it in a tuple.
+            return (as_value(value),)
+        except Exception:
+            # Otherwise, check each element of the iterable *value*
+            # against *as_value*.
+            return tuple(as_value(v) for v in value)
+
+    # Processing order
+
+    @classmethod
+    def _processing_order(cls, iterable):
+        items = list(iterable)
+        random.shuffle(items)
+        return items
+
+
+@dns.immutable.immutable
+class GenericRdata(Rdata):
+    """Generic Rdata Class
+
+    This class is used for rdata types for which we have no better
+    implementation.  It implements the DNS "unknown RRs" scheme.
+    """
+
+    __slots__ = ["data"]
+
+    def __init__(self, rdclass, rdtype, data):
+        super().__init__(rdclass, rdtype)
+        self.data = data
+
+    def to_text(
+        self,
+        origin: Optional[dns.name.Name] = None,
+        relativize: bool = True,
+        **kw: Dict[str, Any],
+    ) -> str:
+        return r"\# %d " % len(self.data) + _hexify(self.data, **kw)
+
+    @classmethod
+    def from_text(
+        cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
+    ):
+        token = tok.get()
+        if not token.is_identifier() or token.value != r"\#":
+            raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
+        length = tok.get_int()
+        hex = tok.concatenate_remaining_identifiers(True).encode()
+        data = binascii.unhexlify(hex)
+        if len(data) != length:
+            raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
+        return cls(rdclass, rdtype, data)
+
+    def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
+        file.write(self.data)
+
+    @classmethod
+    def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
+        return cls(rdclass, rdtype, parser.get_remaining())
+
+
+_rdata_classes: Dict[Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any] = (
+    {}
+)
+_module_prefix = "dns.rdtypes"
+_dynamic_load_allowed = True
+
+
+def get_rdata_class(rdclass, rdtype, use_generic=True):
+    cls = _rdata_classes.get((rdclass, rdtype))
+    if not cls:
+        cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype))
+        if not cls and _dynamic_load_allowed:
+            rdclass_text = dns.rdataclass.to_text(rdclass)
+            rdtype_text = dns.rdatatype.to_text(rdtype)
+            rdtype_text = rdtype_text.replace("-", "_")
+            try:
+                mod = import_module(
+                    ".".join([_module_prefix, rdclass_text, rdtype_text])
+                )
+                cls = getattr(mod, rdtype_text)
+                _rdata_classes[(rdclass, rdtype)] = cls
+            except ImportError:
+                try:
+                    mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
+                    cls = getattr(mod, rdtype_text)
+                    _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
+                    _rdata_classes[(rdclass, rdtype)] = cls
+                except ImportError:
+                    pass
+    if not cls and use_generic:
+        cls = GenericRdata
+        _rdata_classes[(rdclass, rdtype)] = cls
+    return cls
+
+
+def load_all_types(disable_dynamic_load=True):
+    """Load all rdata types for which dnspython has a non-generic implementation.
+
+    Normally dnspython loads DNS rdatatype implementations on demand, but in some
+    specialized cases loading all types at an application-controlled time is preferred.
+
+    If *disable_dynamic_load*, a ``bool``, is ``True`` then dnspython will not attempt
+    to use its dynamic loading mechanism if an unknown type is subsequently encountered,
+    and will simply use the ``GenericRdata`` class.
+    """
+    # Load class IN and ANY types.
+    for rdtype in dns.rdatatype.RdataType:
+        get_rdata_class(dns.rdataclass.IN, rdtype, False)
+    # Load the one non-ANY implementation we have in CH.  Everything
+    # else in CH is an ANY type, and we'll discover those on demand but won't
+    # have to import anything.
+    get_rdata_class(dns.rdataclass.CH, dns.rdatatype.A, False)
+    if disable_dynamic_load:
+        # Now disable dynamic loading so any subsequent unknown type immediately becomes
+        # GenericRdata without a load attempt.
+        global _dynamic_load_allowed
+        _dynamic_load_allowed = False
+
+
+def from_text(
+    rdclass: Union[dns.rdataclass.RdataClass, str],
+    rdtype: Union[dns.rdatatype.RdataType, str],
+    tok: Union[dns.tokenizer.Tokenizer, str],
+    origin: Optional[dns.name.Name] = None,
+    relativize: bool = True,
+    relativize_to: Optional[dns.name.Name] = None,
+    idna_codec: Optional[dns.name.IDNACodec] = None,
+) -> Rdata:
+    """Build an rdata object from text format.
+
+    This function attempts to dynamically load a class which
+    implements the specified rdata class and type.  If there is no
+    class-and-type-specific implementation, the GenericRdata class
+    is used.
+
+    Once a class is chosen, its from_text() class method is called
+    with the parameters to this function.
+
+    If *tok* is a ``str``, then a tokenizer is created and the string
+    is used as its input.
+
+    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
+
+    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
+
+    *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.
+
+    *origin*, a ``dns.name.Name`` (or ``None``), the
+    origin to use for relative names.
+
+    *relativize*, a ``bool``.  If true, name will be relativized.
+
+    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
+    when relativizing names.  If not set, the *origin* value will be used.
+
+    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+    encoder/decoder to use if a tokenizer needs to be created.  If
+    ``None``, the default IDNA 2003 encoder/decoder is used.  If a
+    tokenizer is not created, then the codec associated with the tokenizer
+    is the one that is used.
+
+    Returns an instance of the chosen Rdata subclass.
+
+    """
+    if isinstance(tok, str):
+        tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
+    rdclass = dns.rdataclass.RdataClass.make(rdclass)
+    rdtype = dns.rdatatype.RdataType.make(rdtype)
+    cls = get_rdata_class(rdclass, rdtype)
+    with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
+        rdata = None
+        if cls != GenericRdata:
+            # peek at first token
+            token = tok.get()
+            tok.unget(token)
+            if token.is_identifier() and token.value == r"\#":
+                #
+                # Known type using the generic syntax.  Extract the
+                # wire form from the generic syntax, and then run
+                # from_wire on it.
+                #
+                grdata = GenericRdata.from_text(
+                    rdclass, rdtype, tok, origin, relativize, relativize_to
+                )
+                rdata = from_wire(
+                    rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
+                )
+                #
+                # If this comparison isn't equal, then there must have been
+                # compressed names in the wire format, which is an error,
+                # there being no reasonable context to decompress with.
+                #
+                rwire = rdata.to_wire()
+                if rwire != grdata.data:
+                    raise dns.exception.SyntaxError(
+                        "compressed data in "
+                        "generic syntax form "
+                        "of known rdatatype"
+                    )
+        if rdata is None:
+            rdata = cls.from_text(
+                rdclass, rdtype, tok, origin, relativize, relativize_to
+            )
+        token = tok.get_eol_as_token()
+        if token.comment is not None:
+            object.__setattr__(rdata, "rdcomment", token.comment)
+        return rdata
+
+
+def from_wire_parser(
+    rdclass: Union[dns.rdataclass.RdataClass, str],
+    rdtype: Union[dns.rdatatype.RdataType, str],
+    parser: dns.wire.Parser,
+    origin: Optional[dns.name.Name] = None,
+) -> Rdata:
+    """Build an rdata object from wire format
+
+    This function attempts to dynamically load a class which
+    implements the specified rdata class and type.  If there is no
+    class-and-type-specific implementation, the GenericRdata class
+    is used.
+
+    Once a class is chosen, its from_wire() class method is called
+    with the parameters to this function.
+
+    *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
+
+    *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
+
+    *parser*, a ``dns.wire.Parser``, the parser, which should be
+    restricted to the rdata length.
+
+    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
+    then names will be relativized to this origin.
+
+    Returns an instance of the chosen Rdata subclass.
+    """
+
+    rdclass = dns.rdataclass.RdataClass.make(rdclass)
+    rdtype = dns.rdatatype.RdataType.make(rdtype)
+    cls = get_rdata_class(rdclass, rdtype)
+    with dns.exception.ExceptionWrapper(dns.exception.FormError):
+        return cls.from_wire_parser(rdclass, rdtype, parser, origin)
+
+
+def from_wire(
+    rdclass: Union[dns.rdataclass.RdataClass, str],
+    rdtype: Union[dns.rdatatype.RdataType, str],
+    wire: bytes,
+    current: int,
+    rdlen: int,
+    origin: Optional[dns.name.Name] = None,
+) -> Rdata:
+    """Build an rdata object from wire format
+
+    This function attempts to dynamically load a class which
+    implements the specified rdata class and type.  If there is no
+    class-and-type-specific implementation, the GenericRdata class
+    is used.
+
+    Once a class is chosen, its from_wire() class method is called
+    with the parameters to this function.
+
+    *rdclass*, an ``int``, the rdataclass.
+
+    *rdtype*, an ``int``, the rdatatype.
+
+    *wire*, a ``bytes``, the wire-format message.
+
+    *current*, an ``int``, the offset in wire of the beginning of
+    the rdata.
+
+    *rdlen*, an ``int``, the length of the wire-format rdata
+
+    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
+    then names will be relativized to this origin.
+
+    Returns an instance of the chosen Rdata subclass.
+    """
+    parser = dns.wire.Parser(wire, current)
+    with parser.restrict_to(rdlen):
+        return from_wire_parser(rdclass, rdtype, parser, origin)
+
+
+class RdatatypeExists(dns.exception.DNSException):
+    """DNS rdatatype already exists."""
+
+    supp_kwargs = {"rdclass", "rdtype"}
+    fmt = (
+        "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
+        + "already exists."
+    )
+
+
+def register_type(
+    implementation: Any,
+    rdtype: int,
+    rdtype_text: str,
+    is_singleton: bool = False,
+    rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+) -> None:
+    """Dynamically register a module to handle an rdatatype.
+
+    *implementation*, a module implementing the type in the usual dnspython
+    way.
+
+    *rdtype*, an ``int``, the rdatatype to register.
+
+    *rdtype_text*, a ``str``, the textual form of the rdatatype.
+
+    *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
+    RRsets of the type can have only one member.)
+
+    *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
+    it applies to all classes.
+    """
+
+    rdtype = dns.rdatatype.RdataType.make(rdtype)
+    existing_cls = get_rdata_class(rdclass, rdtype)
+    if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
+        raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
+    _rdata_classes[(rdclass, rdtype)] = getattr(
+        implementation, rdtype_text.replace("-", "_")
+    )
+    dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)