about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/zonefile.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/zonefile.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/zonefile.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/zonefile.py744
1 files changed, 744 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/zonefile.py b/.venv/lib/python3.12/site-packages/dns/zonefile.py
new file mode 100644
index 00000000..d74510b2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/zonefile.py
@@ -0,0 +1,744 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS Zones."""
+
+import re
+import sys
+from typing import Any, Iterable, List, Optional, Set, Tuple, Union
+
+import dns.exception
+import dns.grange
+import dns.name
+import dns.node
+import dns.rdata
+import dns.rdataclass
+import dns.rdatatype
+import dns.rdtypes.ANY.SOA
+import dns.rrset
+import dns.tokenizer
+import dns.transaction
+import dns.ttl
+
+
+class UnknownOrigin(dns.exception.DNSException):
+    """Unknown origin"""
+
+
+class CNAMEAndOtherData(dns.exception.DNSException):
+    """A node has a CNAME and other data"""
+
+
+def _check_cname_and_other_data(txn, name, rdataset):
+    rdataset_kind = dns.node.NodeKind.classify_rdataset(rdataset)
+    node = txn.get_node(name)
+    if node is None:
+        # empty nodes are neutral.
+        return
+    node_kind = node.classify()
+    if (
+        node_kind == dns.node.NodeKind.CNAME
+        and rdataset_kind == dns.node.NodeKind.REGULAR
+    ):
+        raise CNAMEAndOtherData("rdataset type is not compatible with a CNAME node")
+    elif (
+        node_kind == dns.node.NodeKind.REGULAR
+        and rdataset_kind == dns.node.NodeKind.CNAME
+    ):
+        raise CNAMEAndOtherData(
+            "CNAME rdataset is not compatible with a regular data node"
+        )
+    # Otherwise at least one of the node and the rdataset is neutral, so
+    # adding the rdataset is ok
+
+
+SavedStateType = Tuple[
+    dns.tokenizer.Tokenizer,
+    Optional[dns.name.Name],  # current_origin
+    Optional[dns.name.Name],  # last_name
+    Optional[Any],  # current_file
+    int,  # last_ttl
+    bool,  # last_ttl_known
+    int,  # default_ttl
+    bool,
+]  # default_ttl_known
+
+
+def _upper_dollarize(s):
+    s = s.upper()
+    if not s.startswith("$"):
+        s = "$" + s
+    return s
+
+
+class Reader:
+    """Read a DNS zone file into a transaction."""
+
+    def __init__(
+        self,
+        tok: dns.tokenizer.Tokenizer,
+        rdclass: dns.rdataclass.RdataClass,
+        txn: dns.transaction.Transaction,
+        allow_include: bool = False,
+        allow_directives: Union[bool, Iterable[str]] = True,
+        force_name: Optional[dns.name.Name] = None,
+        force_ttl: Optional[int] = None,
+        force_rdclass: Optional[dns.rdataclass.RdataClass] = None,
+        force_rdtype: Optional[dns.rdatatype.RdataType] = None,
+        default_ttl: Optional[int] = None,
+    ):
+        self.tok = tok
+        (self.zone_origin, self.relativize, _) = txn.manager.origin_information()
+        self.current_origin = self.zone_origin
+        self.last_ttl = 0
+        self.last_ttl_known = False
+        if force_ttl is not None:
+            default_ttl = force_ttl
+        if default_ttl is None:
+            self.default_ttl = 0
+            self.default_ttl_known = False
+        else:
+            self.default_ttl = default_ttl
+            self.default_ttl_known = True
+        self.last_name = self.current_origin
+        self.zone_rdclass = rdclass
+        self.txn = txn
+        self.saved_state: List[SavedStateType] = []
+        self.current_file: Optional[Any] = None
+        self.allowed_directives: Set[str]
+        if allow_directives is True:
+            self.allowed_directives = {"$GENERATE", "$ORIGIN", "$TTL"}
+            if allow_include:
+                self.allowed_directives.add("$INCLUDE")
+        elif allow_directives is False:
+            # allow_include was ignored in earlier releases if allow_directives was
+            # False, so we continue that.
+            self.allowed_directives = set()
+        else:
+            # Note that if directives are explicitly specified, then allow_include
+            # is ignored.
+            self.allowed_directives = set(_upper_dollarize(d) for d in allow_directives)
+        self.force_name = force_name
+        self.force_ttl = force_ttl
+        self.force_rdclass = force_rdclass
+        self.force_rdtype = force_rdtype
+        self.txn.check_put_rdataset(_check_cname_and_other_data)
+
+    def _eat_line(self):
+        while 1:
+            token = self.tok.get()
+            if token.is_eol_or_eof():
+                break
+
+    def _get_identifier(self):
+        token = self.tok.get()
+        if not token.is_identifier():
+            raise dns.exception.SyntaxError
+        return token
+
+    def _rr_line(self):
+        """Process one line from a DNS zone file."""
+        token = None
+        # Name
+        if self.force_name is not None:
+            name = self.force_name
+        else:
+            if self.current_origin is None:
+                raise UnknownOrigin
+            token = self.tok.get(want_leading=True)
+            if not token.is_whitespace():
+                self.last_name = self.tok.as_name(token, self.current_origin)
+            else:
+                token = self.tok.get()
+                if token.is_eol_or_eof():
+                    # treat leading WS followed by EOL/EOF as if they were EOL/EOF.
+                    return
+                self.tok.unget(token)
+            name = self.last_name
+            if not name.is_subdomain(self.zone_origin):
+                self._eat_line()
+                return
+            if self.relativize:
+                name = name.relativize(self.zone_origin)
+
+        # TTL
+        if self.force_ttl is not None:
+            ttl = self.force_ttl
+            self.last_ttl = ttl
+            self.last_ttl_known = True
+        else:
+            token = self._get_identifier()
+            ttl = None
+            try:
+                ttl = dns.ttl.from_text(token.value)
+                self.last_ttl = ttl
+                self.last_ttl_known = True
+                token = None
+            except dns.ttl.BadTTL:
+                self.tok.unget(token)
+
+        # Class
+        if self.force_rdclass is not None:
+            rdclass = self.force_rdclass
+        else:
+            token = self._get_identifier()
+            try:
+                rdclass = dns.rdataclass.from_text(token.value)
+            except dns.exception.SyntaxError:
+                raise
+            except Exception:
+                rdclass = self.zone_rdclass
+                self.tok.unget(token)
+            if rdclass != self.zone_rdclass:
+                raise dns.exception.SyntaxError("RR class is not zone's class")
+
+        if ttl is None:
+            # support for <class> <ttl> <type> syntax
+            token = self._get_identifier()
+            ttl = None
+            try:
+                ttl = dns.ttl.from_text(token.value)
+                self.last_ttl = ttl
+                self.last_ttl_known = True
+                token = None
+            except dns.ttl.BadTTL:
+                if self.default_ttl_known:
+                    ttl = self.default_ttl
+                elif self.last_ttl_known:
+                    ttl = self.last_ttl
+                self.tok.unget(token)
+
+        # Type
+        if self.force_rdtype is not None:
+            rdtype = self.force_rdtype
+        else:
+            token = self._get_identifier()
+            try:
+                rdtype = dns.rdatatype.from_text(token.value)
+            except Exception:
+                raise dns.exception.SyntaxError(f"unknown rdatatype '{token.value}'")
+
+        try:
+            rd = dns.rdata.from_text(
+                rdclass,
+                rdtype,
+                self.tok,
+                self.current_origin,
+                self.relativize,
+                self.zone_origin,
+            )
+        except dns.exception.SyntaxError:
+            # Catch and reraise.
+            raise
+        except Exception:
+            # All exceptions that occur in the processing of rdata
+            # are treated as syntax errors.  This is not strictly
+            # correct, but it is correct almost all of the time.
+            # We convert them to syntax errors so that we can emit
+            # helpful filename:line info.
+            (ty, va) = sys.exc_info()[:2]
+            raise dns.exception.SyntaxError(f"caught exception {str(ty)}: {str(va)}")
+
+        if not self.default_ttl_known and rdtype == dns.rdatatype.SOA:
+            # The pre-RFC2308 and pre-BIND9 behavior inherits the zone default
+            # TTL from the SOA minttl if no $TTL statement is present before the
+            # SOA is parsed.
+            self.default_ttl = rd.minimum
+            self.default_ttl_known = True
+            if ttl is None:
+                # if we didn't have a TTL on the SOA, set it!
+                ttl = rd.minimum
+
+        # TTL check.  We had to wait until now to do this as the SOA RR's
+        # own TTL can be inferred from its minimum.
+        if ttl is None:
+            raise dns.exception.SyntaxError("Missing default TTL value")
+
+        self.txn.add(name, ttl, rd)
+
+    def _parse_modify(self, side: str) -> Tuple[str, str, int, int, str]:
+        # Here we catch everything in '{' '}' in a group so we can replace it
+        # with ''.
+        is_generate1 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+),(.)}).*$")
+        is_generate2 = re.compile(r"^.*\$({(\+|-?)(\d+)}).*$")
+        is_generate3 = re.compile(r"^.*\$({(\+|-?)(\d+),(\d+)}).*$")
+        # Sometimes there are modifiers in the hostname. These come after
+        # the dollar sign. They are in the form: ${offset[,width[,base]]}.
+        # Make names
+        mod = ""
+        sign = "+"
+        offset = "0"
+        width = "0"
+        base = "d"
+        g1 = is_generate1.match(side)
+        if g1:
+            mod, sign, offset, width, base = g1.groups()
+            if sign == "":
+                sign = "+"
+        else:
+            g2 = is_generate2.match(side)
+            if g2:
+                mod, sign, offset = g2.groups()
+                if sign == "":
+                    sign = "+"
+                width = "0"
+                base = "d"
+            else:
+                g3 = is_generate3.match(side)
+                if g3:
+                    mod, sign, offset, width = g3.groups()
+                    if sign == "":
+                        sign = "+"
+                    base = "d"
+
+        ioffset = int(offset)
+        iwidth = int(width)
+
+        if sign not in ["+", "-"]:
+            raise dns.exception.SyntaxError(f"invalid offset sign {sign}")
+        if base not in ["d", "o", "x", "X", "n", "N"]:
+            raise dns.exception.SyntaxError(f"invalid type {base}")
+
+        return mod, sign, ioffset, iwidth, base
+
+    def _generate_line(self):
+        # range lhs [ttl] [class] type rhs [ comment ]
+        """Process one line containing the GENERATE statement from a DNS
+        zone file."""
+        if self.current_origin is None:
+            raise UnknownOrigin
+
+        token = self.tok.get()
+        # Range (required)
+        try:
+            start, stop, step = dns.grange.from_text(token.value)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except Exception:
+            raise dns.exception.SyntaxError
+
+        # lhs (required)
+        try:
+            lhs = token.value
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except Exception:
+            raise dns.exception.SyntaxError
+
+        # TTL
+        try:
+            ttl = dns.ttl.from_text(token.value)
+            self.last_ttl = ttl
+            self.last_ttl_known = True
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except dns.ttl.BadTTL:
+            if not (self.last_ttl_known or self.default_ttl_known):
+                raise dns.exception.SyntaxError("Missing default TTL value")
+            if self.default_ttl_known:
+                ttl = self.default_ttl
+            elif self.last_ttl_known:
+                ttl = self.last_ttl
+        # Class
+        try:
+            rdclass = dns.rdataclass.from_text(token.value)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except dns.exception.SyntaxError:
+            raise dns.exception.SyntaxError
+        except Exception:
+            rdclass = self.zone_rdclass
+        if rdclass != self.zone_rdclass:
+            raise dns.exception.SyntaxError("RR class is not zone's class")
+        # Type
+        try:
+            rdtype = dns.rdatatype.from_text(token.value)
+            token = self.tok.get()
+            if not token.is_identifier():
+                raise dns.exception.SyntaxError
+        except Exception:
+            raise dns.exception.SyntaxError(f"unknown rdatatype '{token.value}'")
+
+        # rhs (required)
+        rhs = token.value
+
+        def _calculate_index(counter: int, offset_sign: str, offset: int) -> int:
+            """Calculate the index from the counter and offset."""
+            if offset_sign == "-":
+                offset *= -1
+            return counter + offset
+
+        def _format_index(index: int, base: str, width: int) -> str:
+            """Format the index with the given base, and zero-fill it
+            to the given width."""
+            if base in ["d", "o", "x", "X"]:
+                return format(index, base).zfill(width)
+
+            # base can only be n or N here
+            hexa = _format_index(index, "x", width)
+            nibbles = ".".join(hexa[::-1])[:width]
+            if base == "N":
+                nibbles = nibbles.upper()
+            return nibbles
+
+        lmod, lsign, loffset, lwidth, lbase = self._parse_modify(lhs)
+        rmod, rsign, roffset, rwidth, rbase = self._parse_modify(rhs)
+        for i in range(start, stop + 1, step):
+            # +1 because bind is inclusive and python is exclusive
+
+            lindex = _calculate_index(i, lsign, loffset)
+            rindex = _calculate_index(i, rsign, roffset)
+
+            lzfindex = _format_index(lindex, lbase, lwidth)
+            rzfindex = _format_index(rindex, rbase, rwidth)
+
+            name = lhs.replace(f"${lmod}", lzfindex)
+            rdata = rhs.replace(f"${rmod}", rzfindex)
+
+            self.last_name = dns.name.from_text(
+                name, self.current_origin, self.tok.idna_codec
+            )
+            name = self.last_name
+            if not name.is_subdomain(self.zone_origin):
+                self._eat_line()
+                return
+            if self.relativize:
+                name = name.relativize(self.zone_origin)
+
+            try:
+                rd = dns.rdata.from_text(
+                    rdclass,
+                    rdtype,
+                    rdata,
+                    self.current_origin,
+                    self.relativize,
+                    self.zone_origin,
+                )
+            except dns.exception.SyntaxError:
+                # Catch and reraise.
+                raise
+            except Exception:
+                # All exceptions that occur in the processing of rdata
+                # are treated as syntax errors.  This is not strictly
+                # correct, but it is correct almost all of the time.
+                # We convert them to syntax errors so that we can emit
+                # helpful filename:line info.
+                (ty, va) = sys.exc_info()[:2]
+                raise dns.exception.SyntaxError(
+                    f"caught exception {str(ty)}: {str(va)}"
+                )
+
+            self.txn.add(name, ttl, rd)
+
+    def read(self) -> None:
+        """Read a DNS zone file and build a zone object.
+
+        @raises dns.zone.NoSOA: No SOA RR was found at the zone origin
+        @raises dns.zone.NoNS: No NS RRset was found at the zone origin
+        """
+
+        try:
+            while 1:
+                token = self.tok.get(True, True)
+                if token.is_eof():
+                    if self.current_file is not None:
+                        self.current_file.close()
+                    if len(self.saved_state) > 0:
+                        (
+                            self.tok,
+                            self.current_origin,
+                            self.last_name,
+                            self.current_file,
+                            self.last_ttl,
+                            self.last_ttl_known,
+                            self.default_ttl,
+                            self.default_ttl_known,
+                        ) = self.saved_state.pop(-1)
+                        continue
+                    break
+                elif token.is_eol():
+                    continue
+                elif token.is_comment():
+                    self.tok.get_eol()
+                    continue
+                elif token.value[0] == "$" and len(self.allowed_directives) > 0:
+                    # Note that we only run directive processing code if at least
+                    # one directive is allowed in order to be backwards compatible
+                    c = token.value.upper()
+                    if c not in self.allowed_directives:
+                        raise dns.exception.SyntaxError(
+                            f"zone file directive '{c}' is not allowed"
+                        )
+                    if c == "$TTL":
+                        token = self.tok.get()
+                        if not token.is_identifier():
+                            raise dns.exception.SyntaxError("bad $TTL")
+                        self.default_ttl = dns.ttl.from_text(token.value)
+                        self.default_ttl_known = True
+                        self.tok.get_eol()
+                    elif c == "$ORIGIN":
+                        self.current_origin = self.tok.get_name()
+                        self.tok.get_eol()
+                        if self.zone_origin is None:
+                            self.zone_origin = self.current_origin
+                        self.txn._set_origin(self.current_origin)
+                    elif c == "$INCLUDE":
+                        token = self.tok.get()
+                        filename = token.value
+                        token = self.tok.get()
+                        new_origin: Optional[dns.name.Name]
+                        if token.is_identifier():
+                            new_origin = dns.name.from_text(
+                                token.value, self.current_origin, self.tok.idna_codec
+                            )
+                            self.tok.get_eol()
+                        elif not token.is_eol_or_eof():
+                            raise dns.exception.SyntaxError("bad origin in $INCLUDE")
+                        else:
+                            new_origin = self.current_origin
+                        self.saved_state.append(
+                            (
+                                self.tok,
+                                self.current_origin,
+                                self.last_name,
+                                self.current_file,
+                                self.last_ttl,
+                                self.last_ttl_known,
+                                self.default_ttl,
+                                self.default_ttl_known,
+                            )
+                        )
+                        self.current_file = open(filename)
+                        self.tok = dns.tokenizer.Tokenizer(self.current_file, filename)
+                        self.current_origin = new_origin
+                    elif c == "$GENERATE":
+                        self._generate_line()
+                    else:
+                        raise dns.exception.SyntaxError(
+                            f"Unknown zone file directive '{c}'"
+                        )
+                    continue
+                self.tok.unget(token)
+                self._rr_line()
+        except dns.exception.SyntaxError as detail:
+            (filename, line_number) = self.tok.where()
+            if detail is None:
+                detail = "syntax error"
+            ex = dns.exception.SyntaxError(
+                "%s:%d: %s" % (filename, line_number, detail)
+            )
+            tb = sys.exc_info()[2]
+            raise ex.with_traceback(tb) from None
+
+
+class RRsetsReaderTransaction(dns.transaction.Transaction):
+    def __init__(self, manager, replacement, read_only):
+        assert not read_only
+        super().__init__(manager, replacement, read_only)
+        self.rdatasets = {}
+
+    def _get_rdataset(self, name, rdtype, covers):
+        return self.rdatasets.get((name, rdtype, covers))
+
+    def _get_node(self, name):
+        rdatasets = []
+        for (rdataset_name, _, _), rdataset in self.rdatasets.items():
+            if name == rdataset_name:
+                rdatasets.append(rdataset)
+        if len(rdatasets) == 0:
+            return None
+        node = dns.node.Node()
+        node.rdatasets = rdatasets
+        return node
+
+    def _put_rdataset(self, name, rdataset):
+        self.rdatasets[(name, rdataset.rdtype, rdataset.covers)] = rdataset
+
+    def _delete_name(self, name):
+        # First remove any changes involving the name
+        remove = []
+        for key in self.rdatasets:
+            if key[0] == name:
+                remove.append(key)
+        if len(remove) > 0:
+            for key in remove:
+                del self.rdatasets[key]
+
+    def _delete_rdataset(self, name, rdtype, covers):
+        try:
+            del self.rdatasets[(name, rdtype, covers)]
+        except KeyError:
+            pass
+
+    def _name_exists(self, name):
+        for n, _, _ in self.rdatasets:
+            if n == name:
+                return True
+        return False
+
+    def _changed(self):
+        return len(self.rdatasets) > 0
+
+    def _end_transaction(self, commit):
+        if commit and self._changed():
+            rrsets = []
+            for (name, _, _), rdataset in self.rdatasets.items():
+                rrset = dns.rrset.RRset(
+                    name, rdataset.rdclass, rdataset.rdtype, rdataset.covers
+                )
+                rrset.update(rdataset)
+                rrsets.append(rrset)
+            self.manager.set_rrsets(rrsets)
+
+    def _set_origin(self, origin):
+        pass
+
+    def _iterate_rdatasets(self):
+        raise NotImplementedError  # pragma: no cover
+
+    def _iterate_names(self):
+        raise NotImplementedError  # pragma: no cover
+
+
+class RRSetsReaderManager(dns.transaction.TransactionManager):
+    def __init__(
+        self, origin=dns.name.root, relativize=False, rdclass=dns.rdataclass.IN
+    ):
+        self.origin = origin
+        self.relativize = relativize
+        self.rdclass = rdclass
+        self.rrsets = []
+
+    def reader(self):  # pragma: no cover
+        raise NotImplementedError
+
+    def writer(self, replacement=False):
+        assert replacement is True
+        return RRsetsReaderTransaction(self, True, False)
+
+    def get_class(self):
+        return self.rdclass
+
+    def origin_information(self):
+        if self.relativize:
+            effective = dns.name.empty
+        else:
+            effective = self.origin
+        return (self.origin, self.relativize, effective)
+
+    def set_rrsets(self, rrsets):
+        self.rrsets = rrsets
+
+
+def read_rrsets(
+    text: Any,
+    name: Optional[Union[dns.name.Name, str]] = None,
+    ttl: Optional[int] = None,
+    rdclass: Optional[Union[dns.rdataclass.RdataClass, str]] = dns.rdataclass.IN,
+    default_rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
+    rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
+    default_ttl: Optional[Union[int, str]] = None,
+    idna_codec: Optional[dns.name.IDNACodec] = None,
+    origin: Optional[Union[dns.name.Name, str]] = dns.name.root,
+    relativize: bool = False,
+) -> List[dns.rrset.RRset]:
+    """Read one or more rrsets from the specified text, possibly subject
+    to restrictions.
+
+    *text*, a file object or a string, is the input to process.
+
+    *name*, a string, ``dns.name.Name``, or ``None``, is the owner name of
+    the rrset.  If not ``None``, then the owner name is "forced", and the
+    input must not specify an owner name.  If ``None``, then any owner names
+    are allowed and must be present in the input.
+
+    *ttl*, an ``int``, string, or None.  If not ``None``, the the TTL is
+    forced to be the specified value and the input must not specify a TTL.
+    If ``None``, then a TTL may be specified in the input.  If it is not
+    specified, then the *default_ttl* will be used.
+
+    *rdclass*, a ``dns.rdataclass.RdataClass``, string, or ``None``.  If
+    not ``None``, then the class is forced to the specified value, and the
+    input must not specify a class.  If ``None``, then the input may specify
+    a class that matches *default_rdclass*.  Note that it is not possible to
+    return rrsets with differing classes; specifying ``None`` for the class
+    simply allows the user to optionally type a class as that may be convenient
+    when cutting and pasting.
+
+    *default_rdclass*, a ``dns.rdataclass.RdataClass`` or string.  The class
+    of the returned rrsets.
+
+    *rdtype*, a ``dns.rdatatype.RdataType``, string, or ``None``.  If not
+    ``None``, then the type is forced to the specified value, and the
+    input must not specify a type.  If ``None``, then a type must be present
+    for each RR.
+
+    *default_ttl*, an ``int``, string, or ``None``.  If not ``None``, then if
+    the TTL is not forced and is not specified, then this value will be used.
+    if ``None``, then if the TTL is not forced an error will occur if the TTL
+    is not specified.
+
+    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
+    is used.  Note that codecs only apply to the owner name; dnspython does
+    not do IDNA for names in rdata, as there is no IDNA zonefile format.
+
+    *origin*, a string, ``dns.name.Name``, or ``None``, is the origin for any
+    relative names in the input, and also the origin to relativize to if
+    *relativize* is ``True``.
+
+    *relativize*, a bool.  If ``True``, names are relativized to the *origin*;
+    if ``False`` then any relative names in the input are made absolute by
+    appending the *origin*.
+    """
+    if isinstance(origin, str):
+        origin = dns.name.from_text(origin, dns.name.root, idna_codec)
+    if isinstance(name, str):
+        name = dns.name.from_text(name, origin, idna_codec)
+    if isinstance(ttl, str):
+        ttl = dns.ttl.from_text(ttl)
+    if isinstance(default_ttl, str):
+        default_ttl = dns.ttl.from_text(default_ttl)
+    if rdclass is not None:
+        rdclass = dns.rdataclass.RdataClass.make(rdclass)
+    else:
+        rdclass = None
+    default_rdclass = dns.rdataclass.RdataClass.make(default_rdclass)
+    if rdtype is not None:
+        rdtype = dns.rdatatype.RdataType.make(rdtype)
+    else:
+        rdtype = None
+    manager = RRSetsReaderManager(origin, relativize, default_rdclass)
+    with manager.writer(True) as txn:
+        tok = dns.tokenizer.Tokenizer(text, "<input>", idna_codec=idna_codec)
+        reader = Reader(
+            tok,
+            default_rdclass,
+            txn,
+            allow_directives=False,
+            force_name=name,
+            force_ttl=ttl,
+            force_rdclass=rdclass,
+            force_rdtype=rdtype,
+            default_ttl=default_ttl,
+        )
+        reader.read()
+    return manager.rrsets