about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/message.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/message.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/message.py1933
1 files changed, 1933 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/message.py b/.venv/lib/python3.12/site-packages/dns/message.py
new file mode 100644
index 00000000..e978a0a2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/message.py
@@ -0,0 +1,1933 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2001-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS Messages"""
+
+import contextlib
+import enum
+import io
+import time
+from typing import Any, Dict, List, Optional, Tuple, Union, cast
+
+import dns.edns
+import dns.entropy
+import dns.enum
+import dns.exception
+import dns.flags
+import dns.name
+import dns.opcode
+import dns.rcode
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rdtypes.ANY.OPT
+import dns.rdtypes.ANY.TSIG
+import dns.renderer
+import dns.rrset
+import dns.tsig
+import dns.ttl
+import dns.wire
+
+
+class ShortHeader(dns.exception.FormError):
+    """The DNS packet passed to from_wire() is too short."""
+
+
+class TrailingJunk(dns.exception.FormError):
+    """The DNS packet passed to from_wire() has extra junk at the end of it."""
+
+
+class UnknownHeaderField(dns.exception.DNSException):
+    """The header field name was not recognized when converting from text
+    into a message."""
+
+
+class BadEDNS(dns.exception.FormError):
+    """An OPT record occurred somewhere other than
+    the additional data section."""
+
+
+class BadTSIG(dns.exception.FormError):
+    """A TSIG record occurred somewhere other than the end of
+    the additional data section."""
+
+
+class UnknownTSIGKey(dns.exception.DNSException):
+    """A TSIG with an unknown key was received."""
+
+
+class Truncated(dns.exception.DNSException):
+    """The truncated flag is set."""
+
+    supp_kwargs = {"message"}
+
+    # We do this as otherwise mypy complains about unexpected keyword argument
+    # idna_exception
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def message(self):
+        """As much of the message as could be processed.
+
+        Returns a ``dns.message.Message``.
+        """
+        return self.kwargs["message"]
+
+
+class NotQueryResponse(dns.exception.DNSException):
+    """Message is not a response to a query."""
+
+
+class ChainTooLong(dns.exception.DNSException):
+    """The CNAME chain is too long."""
+
+
+class AnswerForNXDOMAIN(dns.exception.DNSException):
+    """The rcode is NXDOMAIN but an answer was found."""
+
+
+class NoPreviousName(dns.exception.SyntaxError):
+    """No previous name was known."""
+
+
+class MessageSection(dns.enum.IntEnum):
+    """Message sections"""
+
+    QUESTION = 0
+    ANSWER = 1
+    AUTHORITY = 2
+    ADDITIONAL = 3
+
+    @classmethod
+    def _maximum(cls):
+        return 3
+
+
+class MessageError:
+    def __init__(self, exception: Exception, offset: int):
+        self.exception = exception
+        self.offset = offset
+
+
+DEFAULT_EDNS_PAYLOAD = 1232
+MAX_CHAIN = 16
+
+IndexKeyType = Tuple[
+    int,
+    dns.name.Name,
+    dns.rdataclass.RdataClass,
+    dns.rdatatype.RdataType,
+    Optional[dns.rdatatype.RdataType],
+    Optional[dns.rdataclass.RdataClass],
+]
+IndexType = Dict[IndexKeyType, dns.rrset.RRset]
+SectionType = Union[int, str, List[dns.rrset.RRset]]
+
+
+class Message:
+    """A DNS message."""
+
+    _section_enum = MessageSection
+
+    def __init__(self, id: Optional[int] = None):
+        if id is None:
+            self.id = dns.entropy.random_16()
+        else:
+            self.id = id
+        self.flags = 0
+        self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
+        self.opt: Optional[dns.rrset.RRset] = None
+        self.request_payload = 0
+        self.pad = 0
+        self.keyring: Any = None
+        self.tsig: Optional[dns.rrset.RRset] = None
+        self.request_mac = b""
+        self.xfr = False
+        self.origin: Optional[dns.name.Name] = None
+        self.tsig_ctx: Optional[Any] = None
+        self.index: IndexType = {}
+        self.errors: List[MessageError] = []
+        self.time = 0.0
+        self.wire: Optional[bytes] = None
+
+    @property
+    def question(self) -> List[dns.rrset.RRset]:
+        """The question section."""
+        return self.sections[0]
+
+    @question.setter
+    def question(self, v):
+        self.sections[0] = v
+
+    @property
+    def answer(self) -> List[dns.rrset.RRset]:
+        """The answer section."""
+        return self.sections[1]
+
+    @answer.setter
+    def answer(self, v):
+        self.sections[1] = v
+
+    @property
+    def authority(self) -> List[dns.rrset.RRset]:
+        """The authority section."""
+        return self.sections[2]
+
+    @authority.setter
+    def authority(self, v):
+        self.sections[2] = v
+
+    @property
+    def additional(self) -> List[dns.rrset.RRset]:
+        """The additional data section."""
+        return self.sections[3]
+
+    @additional.setter
+    def additional(self, v):
+        self.sections[3] = v
+
+    def __repr__(self):
+        return "<DNS message, ID " + repr(self.id) + ">"
+
+    def __str__(self):
+        return self.to_text()
+
+    def to_text(
+        self,
+        origin: Optional[dns.name.Name] = None,
+        relativize: bool = True,
+        **kw: Dict[str, Any],
+    ) -> str:
+        """Convert the message to text.
+
+        The *origin*, *relativize*, and any other keyword
+        arguments are passed to the RRset ``to_wire()`` method.
+
+        Returns a ``str``.
+        """
+
+        s = io.StringIO()
+        s.write("id %d\n" % self.id)
+        s.write(f"opcode {dns.opcode.to_text(self.opcode())}\n")
+        s.write(f"rcode {dns.rcode.to_text(self.rcode())}\n")
+        s.write(f"flags {dns.flags.to_text(self.flags)}\n")
+        if self.edns >= 0:
+            s.write(f"edns {self.edns}\n")
+            if self.ednsflags != 0:
+                s.write(f"eflags {dns.flags.edns_to_text(self.ednsflags)}\n")
+            s.write("payload %d\n" % self.payload)
+        for opt in self.options:
+            s.write(f"option {opt.to_text()}\n")
+        for name, which in self._section_enum.__members__.items():
+            s.write(f";{name}\n")
+            for rrset in self.section_from_number(which):
+                s.write(rrset.to_text(origin, relativize, **kw))
+                s.write("\n")
+        #
+        # We strip off the final \n so the caller can print the result without
+        # doing weird things to get around eccentricities in Python print
+        # formatting
+        #
+        return s.getvalue()[:-1]
+
+    def __eq__(self, other):
+        """Two messages are equal if they have the same content in the
+        header, question, answer, and authority sections.
+
+        Returns a ``bool``.
+        """
+
+        if not isinstance(other, Message):
+            return False
+        if self.id != other.id:
+            return False
+        if self.flags != other.flags:
+            return False
+        for i, section in enumerate(self.sections):
+            other_section = other.sections[i]
+            for n in section:
+                if n not in other_section:
+                    return False
+            for n in other_section:
+                if n not in section:
+                    return False
+        return True
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+    def is_response(self, other: "Message") -> bool:
+        """Is *other*, also a ``dns.message.Message``, a response to this
+        message?
+
+        Returns a ``bool``.
+        """
+
+        if (
+            other.flags & dns.flags.QR == 0
+            or self.id != other.id
+            or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags)
+        ):
+            return False
+        if other.rcode() in {
+            dns.rcode.FORMERR,
+            dns.rcode.SERVFAIL,
+            dns.rcode.NOTIMP,
+            dns.rcode.REFUSED,
+        }:
+            # We don't check the question section in these cases if
+            # the other question section is empty, even though they
+            # still really ought to have a question section.
+            if len(other.question) == 0:
+                return True
+        if dns.opcode.is_update(self.flags):
+            # This is assuming the "sender doesn't include anything
+            # from the update", but we don't care to check the other
+            # case, which is that all the sections are returned and
+            # identical.
+            return True
+        for n in self.question:
+            if n not in other.question:
+                return False
+        for n in other.question:
+            if n not in self.question:
+                return False
+        return True
+
+    def section_number(self, section: List[dns.rrset.RRset]) -> int:
+        """Return the "section number" of the specified section for use
+        in indexing.
+
+        *section* is one of the section attributes of this message.
+
+        Raises ``ValueError`` if the section isn't known.
+
+        Returns an ``int``.
+        """
+
+        for i, our_section in enumerate(self.sections):
+            if section is our_section:
+                return self._section_enum(i)
+        raise ValueError("unknown section")
+
+    def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
+        """Return the section list associated with the specified section
+        number.
+
+        *number* is a section number `int` or the text form of a section
+        name.
+
+        Raises ``ValueError`` if the section isn't known.
+
+        Returns a ``list``.
+        """
+
+        section = self._section_enum.make(number)
+        return self.sections[section]
+
+    def find_rrset(
+        self,
+        section: SectionType,
+        name: dns.name.Name,
+        rdclass: dns.rdataclass.RdataClass,
+        rdtype: dns.rdatatype.RdataType,
+        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
+        deleting: Optional[dns.rdataclass.RdataClass] = None,
+        create: bool = False,
+        force_unique: bool = False,
+        idna_codec: Optional[dns.name.IDNACodec] = None,
+    ) -> dns.rrset.RRset:
+        """Find the RRset with the given attributes in the specified section.
+
+        *section*, an ``int`` section number, a ``str`` section name, or one of
+        the section attributes of this message.  This specifies the
+        the section of the message to search.  For example::
+
+            my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
+            my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
+            my_message.find_rrset("ANSWER", name, rdclass, rdtype)
+
+        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.
+
+        *rdclass*, an ``int`` or ``str``, the class of the RRset.
+
+        *rdtype*, an ``int`` or ``str``, the type of the RRset.
+
+        *covers*, an ``int`` or ``str``, the covers value of the RRset.
+        The default is ``dns.rdatatype.NONE``.
+
+        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
+        RRset.  The default is ``None``.
+
+        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
+        The created RRset is appended to *section*.
+
+        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
+        create a new RRset regardless of whether a matching RRset exists
+        already.  The default is ``False``.  This is useful when creating
+        DDNS Update messages, as order matters for them.
+
+        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+        is used.
+
+        Raises ``KeyError`` if the RRset was not found and create was
+        ``False``.
+
+        Returns a ``dns.rrset.RRset object``.
+        """
+
+        if isinstance(section, int):
+            section_number = section
+            section = self.section_from_number(section_number)
+        elif isinstance(section, str):
+            section_number = self._section_enum.from_text(section)
+            section = self.section_from_number(section_number)
+        else:
+            section_number = self.section_number(section)
+        if isinstance(name, str):
+            name = dns.name.from_text(name, idna_codec=idna_codec)
+        rdtype = dns.rdatatype.RdataType.make(rdtype)
+        rdclass = dns.rdataclass.RdataClass.make(rdclass)
+        covers = dns.rdatatype.RdataType.make(covers)
+        if deleting is not None:
+            deleting = dns.rdataclass.RdataClass.make(deleting)
+        key = (section_number, name, rdclass, rdtype, covers, deleting)
+        if not force_unique:
+            if self.index is not None:
+                rrset = self.index.get(key)
+                if rrset is not None:
+                    return rrset
+            else:
+                for rrset in section:
+                    if rrset.full_match(name, rdclass, rdtype, covers, deleting):
+                        return rrset
+        if not create:
+            raise KeyError
+        rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
+        section.append(rrset)
+        if self.index is not None:
+            self.index[key] = rrset
+        return rrset
+
+    def get_rrset(
+        self,
+        section: SectionType,
+        name: dns.name.Name,
+        rdclass: dns.rdataclass.RdataClass,
+        rdtype: dns.rdatatype.RdataType,
+        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
+        deleting: Optional[dns.rdataclass.RdataClass] = None,
+        create: bool = False,
+        force_unique: bool = False,
+        idna_codec: Optional[dns.name.IDNACodec] = None,
+    ) -> Optional[dns.rrset.RRset]:
+        """Get the RRset with the given attributes in the specified section.
+
+        If the RRset is not found, None is returned.
+
+        *section*, an ``int`` section number, a ``str`` section name, or one of
+        the section attributes of this message.  This specifies the
+        the section of the message to search.  For example::
+
+            my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
+            my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
+            my_message.get_rrset("ANSWER", name, rdclass, rdtype)
+
+        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.
+
+        *rdclass*, an ``int`` or ``str``, the class of the RRset.
+
+        *rdtype*, an ``int`` or ``str``, the type of the RRset.
+
+        *covers*, an ``int`` or ``str``, the covers value of the RRset.
+        The default is ``dns.rdatatype.NONE``.
+
+        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
+        RRset.  The default is ``None``.
+
+        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
+        The created RRset is appended to *section*.
+
+        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
+        create a new RRset regardless of whether a matching RRset exists
+        already.  The default is ``False``.  This is useful when creating
+        DDNS Update messages, as order matters for them.
+
+        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+        is used.
+
+        Returns a ``dns.rrset.RRset object`` or ``None``.
+        """
+
+        try:
+            rrset = self.find_rrset(
+                section,
+                name,
+                rdclass,
+                rdtype,
+                covers,
+                deleting,
+                create,
+                force_unique,
+                idna_codec,
+            )
+        except KeyError:
+            rrset = None
+        return rrset
+
+    def section_count(self, section: SectionType) -> int:
+        """Returns the number of records in the specified section.
+
+        *section*, an ``int`` section number, a ``str`` section name, or one of
+        the section attributes of this message.  This specifies the
+        the section of the message to count.  For example::
+
+            my_message.section_count(my_message.answer)
+            my_message.section_count(dns.message.ANSWER)
+            my_message.section_count("ANSWER")
+        """
+
+        if isinstance(section, int):
+            section_number = section
+            section = self.section_from_number(section_number)
+        elif isinstance(section, str):
+            section_number = self._section_enum.from_text(section)
+            section = self.section_from_number(section_number)
+        else:
+            section_number = self.section_number(section)
+        count = sum(max(1, len(rrs)) for rrs in section)
+        if section_number == MessageSection.ADDITIONAL:
+            if self.opt is not None:
+                count += 1
+            if self.tsig is not None:
+                count += 1
+        return count
+
+    def _compute_opt_reserve(self) -> int:
+        """Compute the size required for the OPT RR, padding excluded"""
+        if not self.opt:
+            return 0
+        # 1 byte for the root name, 10 for the standard RR fields
+        size = 11
+        # This would be more efficient if options had a size() method, but we won't
+        # worry about that for now.  We also don't worry if there is an existing padding
+        # option, as it is unlikely and probably harmless, as the worst case is that we
+        # may add another, and this seems to be legal.
+        for option in self.opt[0].options:
+            wire = option.to_wire()
+            # We add 4 here to account for the option type and length
+            size += len(wire) + 4
+        if self.pad:
+            # Padding will be added, so again add the option type and length.
+            size += 4
+        return size
+
+    def _compute_tsig_reserve(self) -> int:
+        """Compute the size required for the TSIG RR"""
+        # This would be more efficient if TSIGs had a size method, but we won't
+        # worry about for now.  Also, we can't really cope with the potential
+        # compressibility of the TSIG owner name, so we estimate with the uncompressed
+        # size.  We will disable compression when TSIG and padding are both is active
+        # so that the padding comes out right.
+        if not self.tsig:
+            return 0
+        f = io.BytesIO()
+        self.tsig.to_wire(f)
+        return len(f.getvalue())
+
+    def to_wire(
+        self,
+        origin: Optional[dns.name.Name] = None,
+        max_size: int = 0,
+        multi: bool = False,
+        tsig_ctx: Optional[Any] = None,
+        prepend_length: bool = False,
+        prefer_truncation: bool = False,
+        **kw: Dict[str, Any],
+    ) -> bytes:
+        """Return a string containing the message in DNS compressed wire
+        format.
+
+        Additional keyword arguments are passed to the RRset ``to_wire()``
+        method.
+
+        *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
+        to any relative names.  If ``None``, and the message has an origin
+        attribute that is not ``None``, then it will be used.
+
+        *max_size*, an ``int``, the maximum size of the wire format
+        output; default is 0, which means "the message's request
+        payload, if nonzero, or 65535".
+
+        *multi*, a ``bool``, should be set to ``True`` if this message is
+        part of a multiple message sequence.
+
+        *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
+        ongoing TSIG context, used when signing zone transfers.
+
+        *prepend_length*, a ``bool``, should be set to ``True`` if the caller
+        wants the message length prepended to the message itself.  This is
+        useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ).
+
+        *prefer_truncation*, a ``bool``, should be set to ``True`` if the caller
+        wants the message to be truncated if it would otherwise exceed the
+        maximum length.  If the truncation occurs before the additional section,
+        the TC bit will be set.
+
+        Raises ``dns.exception.TooBig`` if *max_size* was exceeded.
+
+        Returns a ``bytes``.
+        """
+
+        if origin is None and self.origin is not None:
+            origin = self.origin
+        if max_size == 0:
+            if self.request_payload != 0:
+                max_size = self.request_payload
+            else:
+                max_size = 65535
+        if max_size < 512:
+            max_size = 512
+        elif max_size > 65535:
+            max_size = 65535
+        r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
+        opt_reserve = self._compute_opt_reserve()
+        r.reserve(opt_reserve)
+        tsig_reserve = self._compute_tsig_reserve()
+        r.reserve(tsig_reserve)
+        try:
+            for rrset in self.question:
+                r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
+            for rrset in self.answer:
+                r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
+            for rrset in self.authority:
+                r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
+            for rrset in self.additional:
+                r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
+        except dns.exception.TooBig:
+            if prefer_truncation:
+                if r.section < dns.renderer.ADDITIONAL:
+                    r.flags |= dns.flags.TC
+            else:
+                raise
+        r.release_reserved()
+        if self.opt is not None:
+            r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
+        r.write_header()
+        if self.tsig is not None:
+            (new_tsig, ctx) = dns.tsig.sign(
+                r.get_wire(),
+                self.keyring,
+                self.tsig[0],
+                int(time.time()),
+                self.request_mac,
+                tsig_ctx,
+                multi,
+            )
+            self.tsig.clear()
+            self.tsig.add(new_tsig)
+            r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
+            r.write_header()
+            if multi:
+                self.tsig_ctx = ctx
+        wire = r.get_wire()
+        self.wire = wire
+        if prepend_length:
+            wire = len(wire).to_bytes(2, "big") + wire
+        return wire
+
+    @staticmethod
+    def _make_tsig(
+        keyname, algorithm, time_signed, fudge, mac, original_id, error, other
+    ):
+        tsig = dns.rdtypes.ANY.TSIG.TSIG(
+            dns.rdataclass.ANY,
+            dns.rdatatype.TSIG,
+            algorithm,
+            time_signed,
+            fudge,
+            mac,
+            original_id,
+            error,
+            other,
+        )
+        return dns.rrset.from_rdata(keyname, 0, tsig)
+
+    def use_tsig(
+        self,
+        keyring: Any,
+        keyname: Optional[Union[dns.name.Name, str]] = None,
+        fudge: int = 300,
+        original_id: Optional[int] = None,
+        tsig_error: int = 0,
+        other_data: bytes = b"",
+        algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
+    ) -> None:
+        """When sending, a TSIG signature using the specified key
+        should be added.
+
+        *key*, a ``dns.tsig.Key`` is the key to use.  If a key is specified,
+        the *keyring* and *algorithm* fields are not used.
+
+        *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either
+        the TSIG keyring or key to use.
+
+        The format of a keyring dict is a mapping from TSIG key name, as
+        ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``.
+        If a ``dict`` *keyring* is specified but a *keyname* is not, the key
+        used will be the first key in the *keyring*.  Note that the order of
+        keys in a dictionary is not defined, so applications should supply a
+        keyname when a ``dict`` keyring is used, unless they know the keyring
+        contains only one key.  If a ``callable`` keyring is specified, the
+        callable will be called with the message and the keyname, and is
+        expected to return a key.
+
+        *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of
+        this TSIG key to use; defaults to ``None``.  If *keyring* is a
+        ``dict``, the key must be defined in it.  If *keyring* is a
+        ``dns.tsig.Key``, this is ignored.
+
+        *fudge*, an ``int``, the TSIG time fudge.
+
+        *original_id*, an ``int``, the TSIG original id.  If ``None``,
+        the message's id is used.
+
+        *tsig_error*, an ``int``, the TSIG error code.
+
+        *other_data*, a ``bytes``, the TSIG other data.
+
+        *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use.  This is
+        only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
+        """
+
+        if isinstance(keyring, dns.tsig.Key):
+            key = keyring
+            keyname = key.name
+        elif callable(keyring):
+            key = keyring(self, keyname)
+        else:
+            if isinstance(keyname, str):
+                keyname = dns.name.from_text(keyname)
+            if keyname is None:
+                keyname = next(iter(keyring))
+            key = keyring[keyname]
+            if isinstance(key, bytes):
+                key = dns.tsig.Key(keyname, key, algorithm)
+        self.keyring = key
+        if original_id is None:
+            original_id = self.id
+        self.tsig = self._make_tsig(
+            keyname,
+            self.keyring.algorithm,
+            0,
+            fudge,
+            b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
+            original_id,
+            tsig_error,
+            other_data,
+        )
+
+    @property
+    def keyname(self) -> Optional[dns.name.Name]:
+        if self.tsig:
+            return self.tsig.name
+        else:
+            return None
+
+    @property
+    def keyalgorithm(self) -> Optional[dns.name.Name]:
+        if self.tsig:
+            return self.tsig[0].algorithm
+        else:
+            return None
+
+    @property
+    def mac(self) -> Optional[bytes]:
+        if self.tsig:
+            return self.tsig[0].mac
+        else:
+            return None
+
+    @property
+    def tsig_error(self) -> Optional[int]:
+        if self.tsig:
+            return self.tsig[0].error
+        else:
+            return None
+
+    @property
+    def had_tsig(self) -> bool:
+        return bool(self.tsig)
+
+    @staticmethod
+    def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
+        opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
+        return dns.rrset.from_rdata(dns.name.root, int(flags), opt)
+
+    def use_edns(
+        self,
+        edns: Optional[Union[int, bool]] = 0,
+        ednsflags: int = 0,
+        payload: int = DEFAULT_EDNS_PAYLOAD,
+        request_payload: Optional[int] = None,
+        options: Optional[List[dns.edns.Option]] = None,
+        pad: int = 0,
+    ) -> None:
+        """Configure EDNS behavior.
+
+        *edns*, an ``int``, is the EDNS level to use.  Specifying ``None``, ``False``,
+        or ``-1`` means "do not use EDNS", and in this case the other parameters are
+        ignored.  Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0".
+
+        *ednsflags*, an ``int``, the EDNS flag values.
+
+        *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
+        size of UDP datagram the sender can handle.  I.e. how big a response to this
+        message can be.
+
+        *request_payload*, an ``int``, is the EDNS payload size to use when sending this
+        message.  If not specified, defaults to the value of *payload*.
+
+        *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.
+
+        *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
+        padding bytes to make the message size a multiple of *pad*.  Note that if
+        padding is non-zero, an EDNS PADDING option will always be added to the
+        message.
+        """
+
+        if edns is None or edns is False:
+            edns = -1
+        elif edns is True:
+            edns = 0
+        if edns < 0:
+            self.opt = None
+            self.request_payload = 0
+        else:
+            # make sure the EDNS version in ednsflags agrees with edns
+            ednsflags &= 0xFF00FFFF
+            ednsflags |= edns << 16
+            if options is None:
+                options = []
+            self.opt = self._make_opt(ednsflags, payload, options)
+            if request_payload is None:
+                request_payload = payload
+            self.request_payload = request_payload
+            if pad < 0:
+                raise ValueError("pad must be non-negative")
+            self.pad = pad
+
+    @property
+    def edns(self) -> int:
+        if self.opt:
+            return (self.ednsflags & 0xFF0000) >> 16
+        else:
+            return -1
+
+    @property
+    def ednsflags(self) -> int:
+        if self.opt:
+            return self.opt.ttl
+        else:
+            return 0
+
+    @ednsflags.setter
+    def ednsflags(self, v):
+        if self.opt:
+            self.opt.ttl = v
+        elif v:
+            self.opt = self._make_opt(v)
+
+    @property
+    def payload(self) -> int:
+        if self.opt:
+            return self.opt[0].payload
+        else:
+            return 0
+
+    @property
+    def options(self) -> Tuple:
+        if self.opt:
+            return self.opt[0].options
+        else:
+            return ()
+
+    def want_dnssec(self, wanted: bool = True) -> None:
+        """Enable or disable 'DNSSEC desired' flag in requests.
+
+        *wanted*, a ``bool``.  If ``True``, then DNSSEC data is
+        desired in the response, EDNS is enabled if required, and then
+        the DO bit is set.  If ``False``, the DO bit is cleared if
+        EDNS is enabled.
+        """
+
+        if wanted:
+            self.ednsflags |= dns.flags.DO
+        elif self.opt:
+            self.ednsflags &= ~int(dns.flags.DO)
+
+    def rcode(self) -> dns.rcode.Rcode:
+        """Return the rcode.
+
+        Returns a ``dns.rcode.Rcode``.
+        """
+        return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))
+
+    def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
+        """Set the rcode.
+
+        *rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
+        """
+        (value, evalue) = dns.rcode.to_flags(rcode)
+        self.flags &= 0xFFF0
+        self.flags |= value
+        self.ednsflags &= 0x00FFFFFF
+        self.ednsflags |= evalue
+
+    def opcode(self) -> dns.opcode.Opcode:
+        """Return the opcode.
+
+        Returns a ``dns.opcode.Opcode``.
+        """
+        return dns.opcode.from_flags(int(self.flags))
+
+    def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
+        """Set the opcode.
+
+        *opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
+        """
+        self.flags &= 0x87FF
+        self.flags |= dns.opcode.to_flags(opcode)
+
+    def get_options(self, otype: dns.edns.OptionType) -> List[dns.edns.Option]:
+        """Return the list of options of the specified type."""
+        return [option for option in self.options if option.otype == otype]
+
+    def extended_errors(self) -> List[dns.edns.EDEOption]:
+        """Return the list of Extended DNS Error (EDE) options in the message"""
+        return cast(List[dns.edns.EDEOption], self.get_options(dns.edns.OptionType.EDE))
+
+    def _get_one_rr_per_rrset(self, value):
+        # What the caller picked is fine.
+        return value
+
+    # pylint: disable=unused-argument
+
+    def _parse_rr_header(self, section, name, rdclass, rdtype):
+        return (rdclass, rdtype, None, False)
+
+    # pylint: enable=unused-argument
+
+    def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
+        if rdtype == dns.rdatatype.OPT:
+            if (
+                section != MessageSection.ADDITIONAL
+                or self.opt
+                or name != dns.name.root
+            ):
+                raise BadEDNS
+        elif rdtype == dns.rdatatype.TSIG:
+            if (
+                section != MessageSection.ADDITIONAL
+                or rdclass != dns.rdatatype.ANY
+                or position != count - 1
+            ):
+                raise BadTSIG
+        return (rdclass, rdtype, None, False)
+
+
+class ChainingResult:
+    """The result of a call to dns.message.QueryMessage.resolve_chaining().
+
+    The ``answer`` attribute is the answer RRSet, or ``None`` if it doesn't
+    exist.
+
+    The ``canonical_name`` attribute is the canonical name after all
+    chaining has been applied (this is the same name as ``rrset.name`` in cases
+    where rrset is not ``None``).
+
+    The ``minimum_ttl`` attribute is the minimum TTL, i.e. the TTL to
+    use if caching the data.  It is the smallest of all the CNAME TTLs
+    and either the answer TTL if it exists or the SOA TTL and SOA
+    minimum values for negative answers.
+
+    The ``cnames`` attribute is a list of all the CNAME RRSets followed to
+    get to the canonical name.
+    """
+
+    def __init__(
+        self,
+        canonical_name: dns.name.Name,
+        answer: Optional[dns.rrset.RRset],
+        minimum_ttl: int,
+        cnames: List[dns.rrset.RRset],
+    ):
+        self.canonical_name = canonical_name
+        self.answer = answer
+        self.minimum_ttl = minimum_ttl
+        self.cnames = cnames
+
+
+class QueryMessage(Message):
+    def resolve_chaining(self) -> ChainingResult:
+        """Follow the CNAME chain in the response to determine the answer
+        RRset.
+
+        Raises ``dns.message.NotQueryResponse`` if the message is not
+        a response.
+
+        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.
+
+        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
+        but an answer was found.
+
+        Raises ``dns.exception.FormError`` if the question count is not 1.
+
+        Returns a ChainingResult object.
+        """
+        if self.flags & dns.flags.QR == 0:
+            raise NotQueryResponse
+        if len(self.question) != 1:
+            raise dns.exception.FormError
+        question = self.question[0]
+        qname = question.name
+        min_ttl = dns.ttl.MAX_TTL
+        answer = None
+        count = 0
+        cnames = []
+        while count < MAX_CHAIN:
+            try:
+                answer = self.find_rrset(
+                    self.answer, qname, question.rdclass, question.rdtype
+                )
+                min_ttl = min(min_ttl, answer.ttl)
+                break
+            except KeyError:
+                if question.rdtype != dns.rdatatype.CNAME:
+                    try:
+                        crrset = self.find_rrset(
+                            self.answer, qname, question.rdclass, dns.rdatatype.CNAME
+                        )
+                        cnames.append(crrset)
+                        min_ttl = min(min_ttl, crrset.ttl)
+                        for rd in crrset:
+                            qname = rd.target
+                            break
+                        count += 1
+                        continue
+                    except KeyError:
+                        # Exit the chaining loop
+                        break
+                else:
+                    # Exit the chaining loop
+                    break
+        if count >= MAX_CHAIN:
+            raise ChainTooLong
+        if self.rcode() == dns.rcode.NXDOMAIN and answer is not None:
+            raise AnswerForNXDOMAIN
+        if answer is None:
+            # Further minimize the TTL with NCACHE.
+            auname = qname
+            while True:
+                # Look for an SOA RR whose owner name is a superdomain
+                # of qname.
+                try:
+                    srrset = self.find_rrset(
+                        self.authority, auname, question.rdclass, dns.rdatatype.SOA
+                    )
+                    min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum)
+                    break
+                except KeyError:
+                    try:
+                        auname = auname.parent()
+                    except dns.name.NoParent:
+                        break
+        return ChainingResult(qname, answer, min_ttl, cnames)
+
+    def canonical_name(self) -> dns.name.Name:
+        """Return the canonical name of the first name in the question
+        section.
+
+        Raises ``dns.message.NotQueryResponse`` if the message is not
+        a response.
+
+        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.
+
+        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
+        but an answer was found.
+
+        Raises ``dns.exception.FormError`` if the question count is not 1.
+        """
+        return self.resolve_chaining().canonical_name
+
+
+def _maybe_import_update():
+    # We avoid circular imports by doing this here.  We do it in another
+    # function as doing it in _message_factory_from_opcode() makes "dns"
+    # a local symbol, and the first line fails :)
+
+    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
+    import dns.update  # noqa: F401
+
+
+def _message_factory_from_opcode(opcode):
+    if opcode == dns.opcode.QUERY:
+        return QueryMessage
+    elif opcode == dns.opcode.UPDATE:
+        _maybe_import_update()
+        return dns.update.UpdateMessage
+    else:
+        return Message
+
+
+class _WireReader:
+    """Wire format reader.
+
+    parser: the binary parser
+    message: The message object being built
+    initialize_message: Callback to set message parsing options
+    question_only: Are we only reading the question?
+    one_rr_per_rrset: Put each RR into its own RRset?
+    keyring: TSIG keyring
+    ignore_trailing: Ignore trailing junk at end of request?
+    multi: Is this message part of a multi-message sequence?
+    DNS dynamic updates.
+    continue_on_error: try to extract as much information as possible from
+    the message, accumulating MessageErrors in the *errors* attribute instead of
+    raising them.
+    """
+
+    def __init__(
+        self,
+        wire,
+        initialize_message,
+        question_only=False,
+        one_rr_per_rrset=False,
+        ignore_trailing=False,
+        keyring=None,
+        multi=False,
+        continue_on_error=False,
+    ):
+        self.parser = dns.wire.Parser(wire)
+        self.message = None
+        self.initialize_message = initialize_message
+        self.question_only = question_only
+        self.one_rr_per_rrset = one_rr_per_rrset
+        self.ignore_trailing = ignore_trailing
+        self.keyring = keyring
+        self.multi = multi
+        self.continue_on_error = continue_on_error
+        self.errors = []
+
+    def _get_question(self, section_number, qcount):
+        """Read the next *qcount* records from the wire data and add them to
+        the question section.
+        """
+        assert self.message is not None
+        section = self.message.sections[section_number]
+        for _ in range(qcount):
+            qname = self.parser.get_name(self.message.origin)
+            (rdtype, rdclass) = self.parser.get_struct("!HH")
+            (rdclass, rdtype, _, _) = self.message._parse_rr_header(
+                section_number, qname, rdclass, rdtype
+            )
+            self.message.find_rrset(
+                section, qname, rdclass, rdtype, create=True, force_unique=True
+            )
+
+    def _add_error(self, e):
+        self.errors.append(MessageError(e, self.parser.current))
+
+    def _get_section(self, section_number, count):
+        """Read the next I{count} records from the wire data and add them to
+        the specified section.
+
+        section_number: the section of the message to which to add records
+        count: the number of records to read
+        """
+        assert self.message is not None
+        section = self.message.sections[section_number]
+        force_unique = self.one_rr_per_rrset
+        for i in range(count):
+            rr_start = self.parser.current
+            absolute_name = self.parser.get_name()
+            if self.message.origin is not None:
+                name = absolute_name.relativize(self.message.origin)
+            else:
+                name = absolute_name
+            (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
+            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
+                (
+                    rdclass,
+                    rdtype,
+                    deleting,
+                    empty,
+                ) = self.message._parse_special_rr_header(
+                    section_number, count, i, name, rdclass, rdtype
+                )
+            else:
+                (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
+                    section_number, name, rdclass, rdtype
+                )
+            rdata_start = self.parser.current
+            try:
+                if empty:
+                    if rdlen > 0:
+                        raise dns.exception.FormError
+                    rd = None
+                    covers = dns.rdatatype.NONE
+                else:
+                    with self.parser.restrict_to(rdlen):
+                        rd = dns.rdata.from_wire_parser(
+                            rdclass, rdtype, self.parser, self.message.origin
+                        )
+                    covers = rd.covers()
+                if self.message.xfr and rdtype == dns.rdatatype.SOA:
+                    force_unique = True
+                if rdtype == dns.rdatatype.OPT:
+                    self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
+                elif rdtype == dns.rdatatype.TSIG:
+                    if self.keyring is None or self.keyring is True:
+                        raise UnknownTSIGKey("got signed message without keyring")
+                    elif isinstance(self.keyring, dict):
+                        key = self.keyring.get(absolute_name)
+                        if isinstance(key, bytes):
+                            key = dns.tsig.Key(absolute_name, key, rd.algorithm)
+                    elif callable(self.keyring):
+                        key = self.keyring(self.message, absolute_name)
+                    else:
+                        key = self.keyring
+                    if key is None:
+                        raise UnknownTSIGKey(f"key '{name}' unknown")
+                    if key:
+                        self.message.keyring = key
+                        self.message.tsig_ctx = dns.tsig.validate(
+                            self.parser.wire,
+                            key,
+                            absolute_name,
+                            rd,
+                            int(time.time()),
+                            self.message.request_mac,
+                            rr_start,
+                            self.message.tsig_ctx,
+                            self.multi,
+                        )
+                    self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
+                else:
+                    rrset = self.message.find_rrset(
+                        section,
+                        name,
+                        rdclass,
+                        rdtype,
+                        covers,
+                        deleting,
+                        True,
+                        force_unique,
+                    )
+                    if rd is not None:
+                        if ttl > 0x7FFFFFFF:
+                            ttl = 0
+                        rrset.add(rd, ttl)
+            except Exception as e:
+                if self.continue_on_error:
+                    self._add_error(e)
+                    self.parser.seek(rdata_start + rdlen)
+                else:
+                    raise
+
+    def read(self):
+        """Read a wire format DNS message and build a dns.message.Message
+        object."""
+
+        if self.parser.remaining() < 12:
+            raise ShortHeader
+        (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
+            "!HHHHHH"
+        )
+        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
+        self.message = factory(id=id)
+        self.message.flags = dns.flags.Flag(flags)
+        self.message.wire = self.parser.wire
+        self.initialize_message(self.message)
+        self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
+            self.one_rr_per_rrset
+        )
+        try:
+            self._get_question(MessageSection.QUESTION, qcount)
+            if self.question_only:
+                return self.message
+            self._get_section(MessageSection.ANSWER, ancount)
+            self._get_section(MessageSection.AUTHORITY, aucount)
+            self._get_section(MessageSection.ADDITIONAL, adcount)
+            if not self.ignore_trailing and self.parser.remaining() != 0:
+                raise TrailingJunk
+            if self.multi and self.message.tsig_ctx and not self.message.had_tsig:
+                self.message.tsig_ctx.update(self.parser.wire)
+        except Exception as e:
+            if self.continue_on_error:
+                self._add_error(e)
+            else:
+                raise
+        return self.message
+
+
+def from_wire(
+    wire: bytes,
+    keyring: Optional[Any] = None,
+    request_mac: Optional[bytes] = b"",
+    xfr: bool = False,
+    origin: Optional[dns.name.Name] = None,
+    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
+    multi: bool = False,
+    question_only: bool = False,
+    one_rr_per_rrset: bool = False,
+    ignore_trailing: bool = False,
+    raise_on_truncation: bool = False,
+    continue_on_error: bool = False,
+) -> Message:
+    """Convert a DNS wire format message into a message object.
+
+    *keyring*, a ``dns.tsig.Key``, ``dict``, ``bool``, or ``None``, the key or keyring
+    to use if the message is signed.  If ``None`` or ``True``, then trying to decode
+    a message with a TSIG will fail as it cannot be validated.  If ``False``, then
+    TSIG validation is disabled.
+
+    *request_mac*, a ``bytes`` or ``None``.  If the message is a response to a
+    TSIG-signed request, *request_mac* should be set to the MAC of that request.
+
+    *xfr*, a ``bool``, should be set to ``True`` if this message is part of a zone
+    transfer.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If the message is part of a zone
+    transfer, *origin* should be the origin name of the zone.  If not ``None``, names
+    will be relativized to the origin.
+
+    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the ongoing TSIG
+    context, used when validating zone transfers.
+
+    *multi*, a ``bool``, should be set to ``True`` if this message is part of a multiple
+    message sequence.
+
+    *question_only*, a ``bool``.  If ``True``, read only up to the end of the question
+    section.
+
+    *one_rr_per_rrset*, a ``bool``.  If ``True``, put each RR into its own RRset.
+
+    *ignore_trailing*, a ``bool``.  If ``True``, ignore trailing junk at end of the
+    message.
+
+    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if the TC bit is
+    set.
+
+    *continue_on_error*, a ``bool``.  If ``True``, try to continue parsing even if
+    errors occur.  Erroneous rdata will be ignored.  Errors will be accumulated as a
+    list of MessageError objects in the message's ``errors`` attribute.  This option is
+    recommended only for DNS analysis tools, or for use in a server as part of an error
+    handling path.  The default is ``False``.
+
+    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets long.
+
+    Raises ``dns.message.TrailingJunk`` if there were octets in the message past the end
+    of the proper DNS message, and *ignore_trailing* is ``False``.
+
+    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section, or
+    occurred more than once.
+
+    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of the
+    additional data section.
+
+    Raises ``dns.message.Truncated`` if the TC flag is set and *raise_on_truncation* is
+    ``True``.
+
+    Returns a ``dns.message.Message``.
+    """
+
+    # We permit None for request_mac solely for backwards compatibility
+    if request_mac is None:
+        request_mac = b""
+
+    def initialize_message(message):
+        message.request_mac = request_mac
+        message.xfr = xfr
+        message.origin = origin
+        message.tsig_ctx = tsig_ctx
+
+    reader = _WireReader(
+        wire,
+        initialize_message,
+        question_only,
+        one_rr_per_rrset,
+        ignore_trailing,
+        keyring,
+        multi,
+        continue_on_error,
+    )
+    try:
+        m = reader.read()
+    except dns.exception.FormError:
+        if (
+            reader.message
+            and (reader.message.flags & dns.flags.TC)
+            and raise_on_truncation
+        ):
+            raise Truncated(message=reader.message)
+        else:
+            raise
+    # Reading a truncated message might not have any errors, so we
+    # have to do this check here too.
+    if m.flags & dns.flags.TC and raise_on_truncation:
+        raise Truncated(message=m)
+    if continue_on_error:
+        m.errors = reader.errors
+
+    return m
+
+
+class _TextReader:
+    """Text format reader.
+
+    tok: the tokenizer.
+    message: The message object being built.
+    DNS dynamic updates.
+    last_name: The most recently read name when building a message object.
+    one_rr_per_rrset: Put each RR into its own RRset?
+    origin: The origin for relative names
+    relativize: relativize names?
+    relativize_to: the origin to relativize to.
+    """
+
+    def __init__(
+        self,
+        text,
+        idna_codec,
+        one_rr_per_rrset=False,
+        origin=None,
+        relativize=True,
+        relativize_to=None,
+    ):
+        self.message = None
+        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
+        self.last_name = None
+        self.one_rr_per_rrset = one_rr_per_rrset
+        self.origin = origin
+        self.relativize = relativize
+        self.relativize_to = relativize_to
+        self.id = None
+        self.edns = -1
+        self.ednsflags = 0
+        self.payload = DEFAULT_EDNS_PAYLOAD
+        self.rcode = None
+        self.opcode = dns.opcode.QUERY
+        self.flags = 0
+
+    def _header_line(self, _):
+        """Process one line from the text format header section."""
+
+        token = self.tok.get()
+        what = token.value
+        if what == "id":
+            self.id = self.tok.get_int()
+        elif what == "flags":
+            while True:
+                token = self.tok.get()
+                if not token.is_identifier():
+                    self.tok.unget(token)
+                    break
+                self.flags = self.flags | dns.flags.from_text(token.value)
+        elif what == "edns":
+            self.edns = self.tok.get_int()
+            self.ednsflags = self.ednsflags | (self.edns << 16)
+        elif what == "eflags":
+            if self.edns < 0:
+                self.edns = 0
+            while True:
+                token = self.tok.get()
+                if not token.is_identifier():
+                    self.tok.unget(token)
+                    break
+                self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value)
+        elif what == "payload":
+            self.payload = self.tok.get_int()
+            if self.edns < 0:
+                self.edns = 0
+        elif what == "opcode":
+            text = self.tok.get_string()
+            self.opcode = dns.opcode.from_text(text)
+            self.flags = self.flags | dns.opcode.to_flags(self.opcode)
+        elif what == "rcode":
+            text = self.tok.get_string()
+            self.rcode = dns.rcode.from_text(text)
+        else:
+            raise UnknownHeaderField
+        self.tok.get_eol()
+
+    def _question_line(self, section_number):
+        """Process one line from the text format question section."""
+
+        section = self.message.sections[section_number]
+        token = self.tok.get(want_leading=True)
+        if not token.is_whitespace():
+            self.last_name = self.tok.as_name(
+                token, self.message.origin, self.relativize, self.relativize_to
+            )
+        name = self.last_name
+        if name is None:
+            raise NoPreviousName
+        token = self.tok.get()
+        if not token.is_identifier():
+            raise dns.exception.SyntaxError
+        # Class
+        try:
+            rdclass = dns.rdataclass.from_text(token.value)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except dns.exception.SyntaxError:
+            raise dns.exception.SyntaxError
+        except Exception:
+            rdclass = dns.rdataclass.IN
+        # Type
+        rdtype = dns.rdatatype.from_text(token.value)
+        (rdclass, rdtype, _, _) = self.message._parse_rr_header(
+            section_number, name, rdclass, rdtype
+        )
+        self.message.find_rrset(
+            section, name, rdclass, rdtype, create=True, force_unique=True
+        )
+        self.tok.get_eol()
+
+    def _rr_line(self, section_number):
+        """Process one line from the text format answer, authority, or
+        additional data sections.
+        """
+
+        section = self.message.sections[section_number]
+        # Name
+        token = self.tok.get(want_leading=True)
+        if not token.is_whitespace():
+            self.last_name = self.tok.as_name(
+                token, self.message.origin, self.relativize, self.relativize_to
+            )
+        name = self.last_name
+        if name is None:
+            raise NoPreviousName
+        token = self.tok.get()
+        if not token.is_identifier():
+            raise dns.exception.SyntaxError
+        # TTL
+        try:
+            ttl = int(token.value, 0)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except dns.exception.SyntaxError:
+            raise dns.exception.SyntaxError
+        except Exception:
+            ttl = 0
+        # Class
+        try:
+            rdclass = dns.rdataclass.from_text(token.value)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except dns.exception.SyntaxError:
+            raise dns.exception.SyntaxError
+        except Exception:
+            rdclass = dns.rdataclass.IN
+        # Type
+        rdtype = dns.rdatatype.from_text(token.value)
+        (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
+            section_number, name, rdclass, rdtype
+        )
+        token = self.tok.get()
+        if empty and not token.is_eol_or_eof():
+            raise dns.exception.SyntaxError
+        if not empty and token.is_eol_or_eof():
+            raise dns.exception.UnexpectedEnd
+        if not token.is_eol_or_eof():
+            self.tok.unget(token)
+            rd = dns.rdata.from_text(
+                rdclass,
+                rdtype,
+                self.tok,
+                self.message.origin,
+                self.relativize,
+                self.relativize_to,
+            )
+            covers = rd.covers()
+        else:
+            rd = None
+            covers = dns.rdatatype.NONE
+        rrset = self.message.find_rrset(
+            section,
+            name,
+            rdclass,
+            rdtype,
+            covers,
+            deleting,
+            True,
+            self.one_rr_per_rrset,
+        )
+        if rd is not None:
+            rrset.add(rd, ttl)
+
+    def _make_message(self):
+        factory = _message_factory_from_opcode(self.opcode)
+        message = factory(id=self.id)
+        message.flags = self.flags
+        if self.edns >= 0:
+            message.use_edns(self.edns, self.ednsflags, self.payload)
+        if self.rcode:
+            message.set_rcode(self.rcode)
+        if self.origin:
+            message.origin = self.origin
+        return message
+
+    def read(self):
+        """Read a text format DNS message and build a dns.message.Message
+        object."""
+
+        line_method = self._header_line
+        section_number = None
+        while 1:
+            token = self.tok.get(True, True)
+            if token.is_eol_or_eof():
+                break
+            if token.is_comment():
+                u = token.value.upper()
+                if u == "HEADER":
+                    line_method = self._header_line
+
+                if self.message:
+                    message = self.message
+                else:
+                    # If we don't have a message, create one with the current
+                    # opcode, so that we know which section names to parse.
+                    message = self._make_message()
+                try:
+                    section_number = message._section_enum.from_text(u)
+                    # We found a section name.  If we don't have a message,
+                    # use the one we just created.
+                    if not self.message:
+                        self.message = message
+                        self.one_rr_per_rrset = message._get_one_rr_per_rrset(
+                            self.one_rr_per_rrset
+                        )
+                    if section_number == MessageSection.QUESTION:
+                        line_method = self._question_line
+                    else:
+                        line_method = self._rr_line
+                except Exception:
+                    # It's just a comment.
+                    pass
+                self.tok.get_eol()
+                continue
+            self.tok.unget(token)
+            line_method(section_number)
+        if not self.message:
+            self.message = self._make_message()
+        return self.message
+
+
+def from_text(
+    text: str,
+    idna_codec: Optional[dns.name.IDNACodec] = None,
+    one_rr_per_rrset: bool = False,
+    origin: Optional[dns.name.Name] = None,
+    relativize: bool = True,
+    relativize_to: Optional[dns.name.Name] = None,
+) -> Message:
+    """Convert the text format message into a message object.
+
+    The reader stops after reading the first blank line in the input to
+    facilitate reading multiple messages from a single file with
+    ``dns.message.from_file()``.
+
+    *text*, a ``str``, the text format message.
+
+    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+    is used.
+
+    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
+    into its own rrset.  The default is ``False``.
+
+    *origin*, a ``dns.name.Name`` (or ``None``), the
+    origin to use for relative names.
+
+    *relativize*, a ``bool``.  If true, name will be relativized.
+
+    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
+    when relativizing names.  If not set, the *origin* value will be used.
+
+    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.
+
+    Raises ``dns.exception.SyntaxError`` if the text is badly formed.
+
+    Returns a ``dns.message.Message object``
+    """
+
+    # 'text' can also be a file, but we don't publish that fact
+    # since it's an implementation detail.  The official file
+    # interface is from_file().
+
+    reader = _TextReader(
+        text, idna_codec, one_rr_per_rrset, origin, relativize, relativize_to
+    )
+    return reader.read()
+
+
+def from_file(
+    f: Any,
+    idna_codec: Optional[dns.name.IDNACodec] = None,
+    one_rr_per_rrset: bool = False,
+) -> Message:
+    """Read the next text format message from the specified file.
+
+    Message blocks are separated by a single blank line.
+
+    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
+    pathname of a file to open.
+
+    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+    is used.
+
+    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
+    into its own rrset.  The default is ``False``.
+
+    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.
+
+    Raises ``dns.exception.SyntaxError`` if the text is badly formed.
+
+    Returns a ``dns.message.Message object``
+    """
+
+    if isinstance(f, str):
+        cm: contextlib.AbstractContextManager = open(f)
+    else:
+        cm = contextlib.nullcontext(f)
+    with cm as f:
+        return from_text(f, idna_codec, one_rr_per_rrset)
+    assert False  # for mypy  lgtm[py/unreachable-statement]
+
+
+def make_query(
+    qname: Union[dns.name.Name, str],
+    rdtype: Union[dns.rdatatype.RdataType, str],
+    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+    use_edns: Optional[Union[int, bool]] = None,
+    want_dnssec: bool = False,
+    ednsflags: Optional[int] = None,
+    payload: Optional[int] = None,
+    request_payload: Optional[int] = None,
+    options: Optional[List[dns.edns.Option]] = None,
+    idna_codec: Optional[dns.name.IDNACodec] = None,
+    id: Optional[int] = None,
+    flags: int = dns.flags.RD,
+    pad: int = 0,
+) -> QueryMessage:
+    """Make a query message.
+
+    The query name, type, and class may all be specified either
+    as objects of the appropriate type, or as strings.
+
+    The query will have a randomly chosen query id, and its DNS flags
+    will be set to dns.flags.RD.
+
+    qname, a ``dns.name.Name`` or ``str``, the query name.
+
+    *rdtype*, an ``int`` or ``str``, the desired rdata type.
+
+    *rdclass*, an ``int`` or ``str``,  the desired rdata class; the default
+    is class IN.
+
+    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
+    default is ``None``.  If ``None``, EDNS will be enabled only if other
+    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
+    set.
+    See the description of dns.message.Message.use_edns() for the possible
+    values for use_edns and their meanings.
+
+    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.
+
+    *ednsflags*, an ``int``, the EDNS flag values.
+
+    *payload*, an ``int``, is the EDNS sender's payload field, which is the
+    maximum size of UDP datagram the sender can handle.  I.e. how big
+    a response to this message can be.
+
+    *request_payload*, an ``int``, is the EDNS payload size to use when
+    sending this message.  If not specified, defaults to the value of
+    *payload*.
+
+    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
+    options.
+
+    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+    is used.
+
+    *id*, an ``int`` or ``None``, the desired query id.  The default is
+    ``None``, which generates a random query id.
+
+    *flags*, an ``int``, the desired query flags.  The default is
+    ``dns.flags.RD``.
+
+    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
+    padding bytes to make the message size a multiple of *pad*.  Note that if
+    padding is non-zero, an EDNS PADDING option will always be added to the
+    message.
+
+    Returns a ``dns.message.QueryMessage``
+    """
+
+    if isinstance(qname, str):
+        qname = dns.name.from_text(qname, idna_codec=idna_codec)
+    rdtype = dns.rdatatype.RdataType.make(rdtype)
+    rdclass = dns.rdataclass.RdataClass.make(rdclass)
+    m = QueryMessage(id=id)
+    m.flags = dns.flags.Flag(flags)
+    m.find_rrset(m.question, qname, rdclass, rdtype, create=True, force_unique=True)
+    # only pass keywords on to use_edns if they have been set to a
+    # non-None value.  Setting a field will turn EDNS on if it hasn't
+    # been configured.
+    kwargs: Dict[str, Any] = {}
+    if ednsflags is not None:
+        kwargs["ednsflags"] = ednsflags
+    if payload is not None:
+        kwargs["payload"] = payload
+    if request_payload is not None:
+        kwargs["request_payload"] = request_payload
+    if options is not None:
+        kwargs["options"] = options
+    if kwargs and use_edns is None:
+        use_edns = 0
+    kwargs["edns"] = use_edns
+    kwargs["pad"] = pad
+    m.use_edns(**kwargs)
+    m.want_dnssec(want_dnssec)
+    return m
+
+
+class CopyMode(enum.Enum):
+    """
+    How should sections be copied when making an update response?
+    """
+
+    NOTHING = 0
+    QUESTION = 1
+    EVERYTHING = 2
+
+
+def make_response(
+    query: Message,
+    recursion_available: bool = False,
+    our_payload: int = 8192,
+    fudge: int = 300,
+    tsig_error: int = 0,
+    pad: Optional[int] = None,
+    copy_mode: Optional[CopyMode] = None,
+) -> Message:
+    """Make a message which is a response for the specified query.
+    The message returned is really a response skeleton; it has all of the infrastructure
+    required of a response, but none of the content.
+
+    Response section(s) which are copied are shallow copies of the matching section(s)
+    in the query, so the query's RRsets should not be changed.
+
+    *query*, a ``dns.message.Message``, the query to respond to.
+
+    *recursion_available*, a ``bool``, should RA be set in the response?
+
+    *our_payload*, an ``int``, the payload size to advertise in EDNS responses.
+
+    *fudge*, an ``int``, the TSIG time fudge.
+
+    *tsig_error*, an ``int``, the TSIG error.
+
+    *pad*, a non-negative ``int`` or ``None``.  If 0, the default, do not pad; otherwise
+    if not ``None`` add padding bytes to make the message size a multiple of *pad*. Note
+    that if padding is non-zero, an EDNS PADDING option will always be added to the
+    message.  If ``None``, add padding following RFC 8467, namely if the request is
+    padded, pad the response to 468 otherwise do not pad.
+
+    *copy_mode*, a ``dns.message.CopyMode`` or ``None``, determines how sections are
+    copied.  The default, ``None`` copies sections according to the default for the
+    message's opcode, which is currently ``dns.message.CopyMode.QUESTION`` for all
+    opcodes.   ``dns.message.CopyMode.QUESTION`` copies only the question section.
+    ``dns.message.CopyMode.EVERYTHING`` copies all sections other than OPT or TSIG
+    records, which are created appropriately if needed. ``dns.message.CopyMode.NOTHING``
+    copies no sections; note that this mode is for server testing purposes and is
+    otherwise not recommended for use.  In particular, ``dns.message.is_response()``
+    will be ``False`` if you create a response this way and the rcode is not
+    ``FORMERR``, ``SERVFAIL``, ``NOTIMP``, or ``REFUSED``.
+
+    Returns a ``dns.message.Message`` object whose specific class is appropriate for the
+    query.  For example, if query is a ``dns.update.UpdateMessage``, the response will
+    be one too.
+    """
+
+    if query.flags & dns.flags.QR:
+        raise dns.exception.FormError("specified query message is not a query")
+    opcode = query.opcode()
+    factory = _message_factory_from_opcode(opcode)
+    response = factory(id=query.id)
+    response.flags = dns.flags.QR | (query.flags & dns.flags.RD)
+    if recursion_available:
+        response.flags |= dns.flags.RA
+    response.set_opcode(opcode)
+    if copy_mode is None:
+        copy_mode = CopyMode.QUESTION
+    if copy_mode != CopyMode.NOTHING:
+        response.question = list(query.question)
+    if copy_mode == CopyMode.EVERYTHING:
+        response.answer = list(query.answer)
+        response.authority = list(query.authority)
+        response.additional = list(query.additional)
+    if query.edns >= 0:
+        if pad is None:
+            # Set response padding per RFC 8467
+            pad = 0
+            for option in query.options:
+                if option.otype == dns.edns.OptionType.PADDING:
+                    pad = 468
+        response.use_edns(0, 0, our_payload, query.payload, pad=pad)
+    if query.had_tsig:
+        response.use_tsig(
+            query.keyring,
+            query.keyname,
+            fudge,
+            None,
+            tsig_error,
+            b"",
+            query.keyalgorithm,
+        )
+        response.request_mac = query.mac
+    return response
+
+
+### BEGIN generated MessageSection constants
+
+QUESTION = MessageSection.QUESTION
+ANSWER = MessageSection.ANSWER
+AUTHORITY = MessageSection.AUTHORITY
+ADDITIONAL = MessageSection.ADDITIONAL
+
+### END generated MessageSection constants