aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/dns/zone.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/zone.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/zone.py1434
1 files changed, 1434 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/zone.py b/.venv/lib/python3.12/site-packages/dns/zone.py
new file mode 100644
index 00000000..844919e4
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/zone.py
@@ -0,0 +1,1434 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS Zones."""
+
+import contextlib
+import io
+import os
+import struct
+from typing import (
+ Any,
+ Callable,
+ Iterable,
+ Iterator,
+ List,
+ MutableMapping,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+
+import dns.exception
+import dns.grange
+import dns.immutable
+import dns.name
+import dns.node
+import dns.rdata
+import dns.rdataclass
+import dns.rdataset
+import dns.rdatatype
+import dns.rdtypes.ANY.SOA
+import dns.rdtypes.ANY.ZONEMD
+import dns.rrset
+import dns.tokenizer
+import dns.transaction
+import dns.ttl
+import dns.zonefile
+from dns.zonetypes import DigestHashAlgorithm, DigestScheme, _digest_hashers
+
+
+class BadZone(dns.exception.DNSException):
+ """The DNS zone is malformed."""
+
+
+class NoSOA(BadZone):
+ """The DNS zone has no SOA RR at its origin."""
+
+
+class NoNS(BadZone):
+ """The DNS zone has no NS RRset at its origin."""
+
+
+class UnknownOrigin(BadZone):
+ """The DNS zone's origin is unknown."""
+
+
+class UnsupportedDigestScheme(dns.exception.DNSException):
+ """The zone digest's scheme is unsupported."""
+
+
+class UnsupportedDigestHashAlgorithm(dns.exception.DNSException):
+ """The zone digest's origin is unsupported."""
+
+
+class NoDigest(dns.exception.DNSException):
+ """The DNS zone has no ZONEMD RRset at its origin."""
+
+
+class DigestVerificationFailure(dns.exception.DNSException):
+ """The ZONEMD digest failed to verify."""
+
+
+def _validate_name(
+ name: dns.name.Name,
+ origin: Optional[dns.name.Name],
+ relativize: bool,
+) -> dns.name.Name:
+ # This name validation code is shared by Zone and Version
+ if origin is None:
+ # This should probably never happen as other code (e.g.
+ # _rr_line) will notice the lack of an origin before us, but
+ # we check just in case!
+ raise KeyError("no zone origin is defined")
+ if name.is_absolute():
+ if not name.is_subdomain(origin):
+ raise KeyError("name parameter must be a subdomain of the zone origin")
+ if relativize:
+ name = name.relativize(origin)
+ else:
+ # We have a relative name. Make sure that the derelativized name is
+ # not too long.
+ try:
+ abs_name = name.derelativize(origin)
+ except dns.name.NameTooLong:
+ # We map dns.name.NameTooLong to KeyError to be consistent with
+ # the other exceptions above.
+ raise KeyError("relative name too long for zone")
+ if not relativize:
+ # We have a relative name in a non-relative zone, so use the
+ # derelativized name.
+ name = abs_name
+ return name
+
+
+class Zone(dns.transaction.TransactionManager):
+ """A DNS zone.
+
+ A ``Zone`` is a mapping from names to nodes. The zone object may be
+ treated like a Python dictionary, e.g. ``zone[name]`` will retrieve
+ the node associated with that name. The *name* may be a
+ ``dns.name.Name object``, or it may be a string. In either case,
+ if the name is relative it is treated as relative to the origin of
+ the zone.
+ """
+
+ node_factory: Callable[[], dns.node.Node] = dns.node.Node
+ map_factory: Callable[[], MutableMapping[dns.name.Name, dns.node.Node]] = dict
+ writable_version_factory: Optional[Callable[[], "WritableVersion"]] = None
+ immutable_version_factory: Optional[Callable[[], "ImmutableVersion"]] = None
+
+ __slots__ = ["rdclass", "origin", "nodes", "relativize"]
+
+ def __init__(
+ self,
+ origin: Optional[Union[dns.name.Name, str]],
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+ relativize: bool = True,
+ ):
+ """Initialize a zone object.
+
+ *origin* is the origin of the zone. It may be a ``dns.name.Name``,
+ a ``str``, or ``None``. If ``None``, then the zone's origin will
+ be set by the first ``$ORIGIN`` line in a zone file.
+
+ *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
+
+ *relativize*, a ``bool``, determine's whether domain names are
+ relativized to the zone's origin. The default is ``True``.
+ """
+
+ if origin is not None:
+ if isinstance(origin, str):
+ origin = dns.name.from_text(origin)
+ elif not isinstance(origin, dns.name.Name):
+ raise ValueError("origin parameter must be convertible to a DNS name")
+ if not origin.is_absolute():
+ raise ValueError("origin parameter must be an absolute name")
+ self.origin = origin
+ self.rdclass = rdclass
+ self.nodes: MutableMapping[dns.name.Name, dns.node.Node] = self.map_factory()
+ self.relativize = relativize
+
+ def __eq__(self, other):
+ """Two zones are equal if they have the same origin, class, and
+ nodes.
+
+ Returns a ``bool``.
+ """
+
+ if not isinstance(other, Zone):
+ return False
+ if (
+ self.rdclass != other.rdclass
+ or self.origin != other.origin
+ or self.nodes != other.nodes
+ ):
+ return False
+ return True
+
+ def __ne__(self, other):
+ """Are two zones not equal?
+
+ Returns a ``bool``.
+ """
+
+ return not self.__eq__(other)
+
+ def _validate_name(self, name: Union[dns.name.Name, str]) -> dns.name.Name:
+ # Note that any changes in this method should have corresponding changes
+ # made in the Version _validate_name() method.
+ if isinstance(name, str):
+ name = dns.name.from_text(name, None)
+ elif not isinstance(name, dns.name.Name):
+ raise KeyError("name parameter must be convertible to a DNS name")
+ return _validate_name(name, self.origin, self.relativize)
+
+ def __getitem__(self, key):
+ key = self._validate_name(key)
+ return self.nodes[key]
+
+ def __setitem__(self, key, value):
+ key = self._validate_name(key)
+ self.nodes[key] = value
+
+ def __delitem__(self, key):
+ key = self._validate_name(key)
+ del self.nodes[key]
+
+ def __iter__(self):
+ return self.nodes.__iter__()
+
+ def keys(self):
+ return self.nodes.keys()
+
+ def values(self):
+ return self.nodes.values()
+
+ def items(self):
+ return self.nodes.items()
+
+ def get(self, key):
+ key = self._validate_name(key)
+ return self.nodes.get(key)
+
+ def __contains__(self, key):
+ key = self._validate_name(key)
+ return key in self.nodes
+
+ def find_node(
+ self, name: Union[dns.name.Name, str], create: bool = False
+ ) -> dns.node.Node:
+ """Find a node in the zone, possibly creating it.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Raises ``KeyError`` if the name is not known and create was
+ not specified, or if the name was not a subdomain of the origin.
+
+ Returns a ``dns.node.Node``.
+ """
+
+ name = self._validate_name(name)
+ node = self.nodes.get(name)
+ if node is None:
+ if not create:
+ raise KeyError
+ node = self.node_factory()
+ self.nodes[name] = node
+ return node
+
+ def get_node(
+ self, name: Union[dns.name.Name, str], create: bool = False
+ ) -> Optional[dns.node.Node]:
+ """Get a node in the zone, possibly creating it.
+
+ This method is like ``find_node()``, except it returns None instead
+ of raising an exception if the node does not exist and creation
+ has not been requested.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Returns a ``dns.node.Node`` or ``None``.
+ """
+
+ try:
+ node = self.find_node(name, create)
+ except KeyError:
+ node = None
+ return node
+
+ def delete_node(self, name: Union[dns.name.Name, str]) -> None:
+ """Delete the specified node if it exists.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ It is not an error if the node does not exist.
+ """
+
+ name = self._validate_name(name)
+ if name in self.nodes:
+ del self.nodes[name]
+
+ def find_rdataset(
+ self,
+ name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ create: bool = False,
+ ) -> dns.rdataset.Rdataset:
+ """Look for an rdataset with the specified name and type in the zone,
+ and return an rdataset encapsulating it.
+
+ The rdataset returned is not a copy; changes to it will change
+ the zone.
+
+ KeyError is raised if the name or type are not found.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdatatype.RdataType`` or ``str`` the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Raises ``KeyError`` if the name is not known and create was
+ not specified, or if the name was not a subdomain of the origin.
+
+ Returns a ``dns.rdataset.Rdataset``.
+ """
+
+ name = self._validate_name(name)
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ covers = dns.rdatatype.RdataType.make(covers)
+ node = self.find_node(name, create)
+ return node.find_rdataset(self.rdclass, rdtype, covers, create)
+
+ def get_rdataset(
+ self,
+ name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ create: bool = False,
+ ) -> Optional[dns.rdataset.Rdataset]:
+ """Look for an rdataset with the specified name and type in the zone.
+
+ This method is like ``find_rdataset()``, except it returns None instead
+ of raising an exception if the rdataset does not exist and creation
+ has not been requested.
+
+ The rdataset returned is not a copy; changes to it will change
+ the zone.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Raises ``KeyError`` if the name is not known and create was
+ not specified, or if the name was not a subdomain of the origin.
+
+ Returns a ``dns.rdataset.Rdataset`` or ``None``.
+ """
+
+ try:
+ rdataset = self.find_rdataset(name, rdtype, covers, create)
+ except KeyError:
+ rdataset = None
+ return rdataset
+
+ def delete_rdataset(
+ self,
+ name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ ) -> None:
+ """Delete the rdataset matching *rdtype* and *covers*, if it
+ exists at the node specified by *name*.
+
+ It is not an error if the node does not exist, or if there is no matching
+ rdataset at the node.
+
+ If the node has no rdatasets after the deletion, it will itself be deleted.
+
+ *name*: the name of the node to find. The value may be a ``dns.name.Name`` or a
+ ``str``. If absolute, the name must be a subdomain of the zone's origin. If
+ ``zone.relativize`` is ``True``, then the name will be relativized.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdatatype.RdataType`` or ``str`` or ``None``, the covered
+ type. Usually this value is ``dns.rdatatype.NONE``, but if the rdtype is
+ ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``, then the covers value will be
+ the rdata type the SIG/RRSIG covers. The library treats the SIG and RRSIG types
+ as if they were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA). This
+ makes RRSIGs much easier to work with than if RRSIGs covering different rdata
+ types were aggregated into a single RRSIG rdataset.
+ """
+
+ name = self._validate_name(name)
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ covers = dns.rdatatype.RdataType.make(covers)
+ node = self.get_node(name)
+ if node is not None:
+ node.delete_rdataset(self.rdclass, rdtype, covers)
+ if len(node) == 0:
+ self.delete_node(name)
+
+ def replace_rdataset(
+ self, name: Union[dns.name.Name, str], replacement: dns.rdataset.Rdataset
+ ) -> None:
+ """Replace an rdataset at name.
+
+ It is not an error if there is no rdataset matching I{replacement}.
+
+ Ownership of the *replacement* object is transferred to the zone;
+ in other words, this method does not store a copy of *replacement*
+ at the node, it stores *replacement* itself.
+
+ If the node does not exist, it is created.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *replacement*, a ``dns.rdataset.Rdataset``, the replacement rdataset.
+ """
+
+ if replacement.rdclass != self.rdclass:
+ raise ValueError("replacement.rdclass != zone.rdclass")
+ node = self.find_node(name, True)
+ node.replace_rdataset(replacement)
+
+ def find_rrset(
+ self,
+ name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ ) -> dns.rrset.RRset:
+ """Look for an rdataset with the specified name and type in the zone,
+ and return an RRset encapsulating it.
+
+ This method is less efficient than the similar
+ ``find_rdataset()`` because it creates an RRset instead of
+ returning the matching rdataset. It may be more convenient
+ for some uses since it returns an object which binds the owner
+ name to the rdataset.
+
+ This method may not be used to create new nodes or rdatasets;
+ use ``find_rdataset`` instead.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdatatype.RdataType`` or ``str``, the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Raises ``KeyError`` if the name is not known and create was
+ not specified, or if the name was not a subdomain of the origin.
+
+ Returns a ``dns.rrset.RRset`` or ``None``.
+ """
+
+ vname = self._validate_name(name)
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ covers = dns.rdatatype.RdataType.make(covers)
+ rdataset = self.nodes[vname].find_rdataset(self.rdclass, rdtype, covers)
+ rrset = dns.rrset.RRset(vname, self.rdclass, rdtype, covers)
+ rrset.update(rdataset)
+ return rrset
+
+ def get_rrset(
+ self,
+ name: Union[dns.name.Name, str],
+ rdtype: Union[dns.rdatatype.RdataType, str],
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ ) -> Optional[dns.rrset.RRset]:
+ """Look for an rdataset with the specified name and type in the zone,
+ and return an RRset encapsulating it.
+
+ This method is less efficient than the similar ``get_rdataset()``
+ because it creates an RRset instead of returning the matching
+ rdataset. It may be more convenient for some uses since it
+ returns an object which binds the owner name to the rdataset.
+
+ This method may not be used to create new nodes or rdatasets;
+ use ``get_rdataset()`` instead.
+
+ *name*: the name of the node to find.
+ The value may be a ``dns.name.Name`` or a ``str``. If absolute, the
+ name must be a subdomain of the zone's origin. If ``zone.relativize``
+ is ``True``, then the name will be relativized.
+
+ *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+
+ *create*, a ``bool``. If true, the node will be created if it does
+ not exist.
+
+ Returns a ``dns.rrset.RRset`` or ``None``.
+ """
+
+ try:
+ rrset = self.find_rrset(name, rdtype, covers)
+ except KeyError:
+ rrset = None
+ return rrset
+
+ def iterate_rdatasets(
+ self,
+ rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ ) -> Iterator[Tuple[dns.name.Name, dns.rdataset.Rdataset]]:
+ """Return a generator which yields (name, rdataset) tuples for
+ all rdatasets in the zone which have the specified *rdtype*
+ and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
+ then all rdatasets will be matched.
+
+ *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+ """
+
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ covers = dns.rdatatype.RdataType.make(covers)
+ for name, node in self.items():
+ for rds in node:
+ if rdtype == dns.rdatatype.ANY or (
+ rds.rdtype == rdtype and rds.covers == covers
+ ):
+ yield (name, rds)
+
+ def iterate_rdatas(
+ self,
+ rdtype: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.ANY,
+ covers: Union[dns.rdatatype.RdataType, str] = dns.rdatatype.NONE,
+ ) -> Iterator[Tuple[dns.name.Name, int, dns.rdata.Rdata]]:
+ """Return a generator which yields (name, ttl, rdata) tuples for
+ all rdatas in the zone which have the specified *rdtype*
+ and *covers*. If *rdtype* is ``dns.rdatatype.ANY``, the default,
+ then all rdatas will be matched.
+
+ *rdtype*, a ``dns.rdataset.Rdataset`` or ``str``, the rdata type desired.
+
+ *covers*, a ``dns.rdataset.Rdataset`` or ``str``, the covered type.
+ Usually this value is ``dns.rdatatype.NONE``, but if the
+ rdtype is ``dns.rdatatype.SIG`` or ``dns.rdatatype.RRSIG``,
+ then the covers value will be the rdata type the SIG/RRSIG
+ covers. The library treats the SIG and RRSIG types as if they
+ were a family of types, e.g. RRSIG(A), RRSIG(NS), RRSIG(SOA).
+ This makes RRSIGs much easier to work with than if RRSIGs
+ covering different rdata types were aggregated into a single
+ RRSIG rdataset.
+ """
+
+ rdtype = dns.rdatatype.RdataType.make(rdtype)
+ covers = dns.rdatatype.RdataType.make(covers)
+ for name, node in self.items():
+ for rds in node:
+ if rdtype == dns.rdatatype.ANY or (
+ rds.rdtype == rdtype and rds.covers == covers
+ ):
+ for rdata in rds:
+ yield (name, rds.ttl, rdata)
+
+ def to_file(
+ self,
+ f: Any,
+ sorted: bool = True,
+ relativize: bool = True,
+ nl: Optional[str] = None,
+ want_comments: bool = False,
+ want_origin: bool = False,
+ ) -> None:
+ """Write a zone to a file.
+
+ *f*, a file or `str`. If *f* is a string, it is treated
+ as the name of a file to open.
+
+ *sorted*, a ``bool``. If True, the default, then the file
+ will be written with the names sorted in DNSSEC order from
+ least to greatest. Otherwise the names will be written in
+ whatever order they happen to have in the zone's dictionary.
+
+ *relativize*, a ``bool``. If True, the default, then domain
+ names in the output will be relativized to the zone's origin
+ if possible.
+
+ *nl*, a ``str`` or None. The end of line string. If not
+ ``None``, the output will use the platform's native
+ end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
+
+ *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
+ as part of writing the file. If ``False``, the default, do not
+ emit them.
+
+ *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
+ the start of the file. If ``False``, the default, do not emit
+ one.
+ """
+
+ if isinstance(f, str):
+ cm: contextlib.AbstractContextManager = open(f, "wb")
+ else:
+ cm = contextlib.nullcontext(f)
+ with cm as f:
+ # must be in this way, f.encoding may contain None, or even
+ # attribute may not be there
+ file_enc = getattr(f, "encoding", None)
+ if file_enc is None:
+ file_enc = "utf-8"
+
+ if nl is None:
+ # binary mode, '\n' is not enough
+ nl_b = os.linesep.encode(file_enc)
+ nl = "\n"
+ elif isinstance(nl, str):
+ nl_b = nl.encode(file_enc)
+ else:
+ nl_b = nl
+ nl = nl.decode()
+
+ if want_origin:
+ assert self.origin is not None
+ l = "$ORIGIN " + self.origin.to_text()
+ l_b = l.encode(file_enc)
+ try:
+ f.write(l_b)
+ f.write(nl_b)
+ except TypeError: # textual mode
+ f.write(l)
+ f.write(nl)
+
+ if sorted:
+ names = list(self.keys())
+ names.sort()
+ else:
+ names = self.keys()
+ for n in names:
+ l = self[n].to_text(
+ n,
+ origin=self.origin,
+ relativize=relativize,
+ want_comments=want_comments,
+ )
+ l_b = l.encode(file_enc)
+
+ try:
+ f.write(l_b)
+ f.write(nl_b)
+ except TypeError: # textual mode
+ f.write(l)
+ f.write(nl)
+
+ def to_text(
+ self,
+ sorted: bool = True,
+ relativize: bool = True,
+ nl: Optional[str] = None,
+ want_comments: bool = False,
+ want_origin: bool = False,
+ ) -> str:
+ """Return a zone's text as though it were written to a file.
+
+ *sorted*, a ``bool``. If True, the default, then the file
+ will be written with the names sorted in DNSSEC order from
+ least to greatest. Otherwise the names will be written in
+ whatever order they happen to have in the zone's dictionary.
+
+ *relativize*, a ``bool``. If True, the default, then domain
+ names in the output will be relativized to the zone's origin
+ if possible.
+
+ *nl*, a ``str`` or None. The end of line string. If not
+ ``None``, the output will use the platform's native
+ end-of-line marker (i.e. LF on POSIX, CRLF on Windows).
+
+ *want_comments*, a ``bool``. If ``True``, emit end-of-line comments
+ as part of writing the file. If ``False``, the default, do not
+ emit them.
+
+ *want_origin*, a ``bool``. If ``True``, emit a $ORIGIN line at
+ the start of the output. If ``False``, the default, do not emit
+ one.
+
+ Returns a ``str``.
+ """
+ temp_buffer = io.StringIO()
+ self.to_file(temp_buffer, sorted, relativize, nl, want_comments, want_origin)
+ return_value = temp_buffer.getvalue()
+ temp_buffer.close()
+ return return_value
+
+ def check_origin(self) -> None:
+ """Do some simple checking of the zone's origin.
+
+ Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
+
+ Raises ``dns.zone.NoNS`` if there is no NS RRset.
+
+ Raises ``KeyError`` if there is no origin node.
+ """
+ if self.relativize:
+ name = dns.name.empty
+ else:
+ assert self.origin is not None
+ name = self.origin
+ if self.get_rdataset(name, dns.rdatatype.SOA) is None:
+ raise NoSOA
+ if self.get_rdataset(name, dns.rdatatype.NS) is None:
+ raise NoNS
+
+ def get_soa(
+ self, txn: Optional[dns.transaction.Transaction] = None
+ ) -> dns.rdtypes.ANY.SOA.SOA:
+ """Get the zone SOA rdata.
+
+ Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
+
+ Returns a ``dns.rdtypes.ANY.SOA.SOA`` Rdata.
+ """
+ if self.relativize:
+ origin_name = dns.name.empty
+ else:
+ if self.origin is None:
+ # get_soa() has been called very early, and there must not be
+ # an SOA if there is no origin.
+ raise NoSOA
+ origin_name = self.origin
+ soa: Optional[dns.rdataset.Rdataset]
+ if txn:
+ soa = txn.get(origin_name, dns.rdatatype.SOA)
+ else:
+ soa = self.get_rdataset(origin_name, dns.rdatatype.SOA)
+ if soa is None:
+ raise NoSOA
+ return soa[0]
+
+ def _compute_digest(
+ self,
+ hash_algorithm: DigestHashAlgorithm,
+ scheme: DigestScheme = DigestScheme.SIMPLE,
+ ) -> bytes:
+ hashinfo = _digest_hashers.get(hash_algorithm)
+ if not hashinfo:
+ raise UnsupportedDigestHashAlgorithm
+ if scheme != DigestScheme.SIMPLE:
+ raise UnsupportedDigestScheme
+
+ if self.relativize:
+ origin_name = dns.name.empty
+ else:
+ assert self.origin is not None
+ origin_name = self.origin
+ hasher = hashinfo()
+ for name, node in sorted(self.items()):
+ rrnamebuf = name.to_digestable(self.origin)
+ for rdataset in sorted(node, key=lambda rds: (rds.rdtype, rds.covers)):
+ if name == origin_name and dns.rdatatype.ZONEMD in (
+ rdataset.rdtype,
+ rdataset.covers,
+ ):
+ continue
+ rrfixed = struct.pack(
+ "!HHI", rdataset.rdtype, rdataset.rdclass, rdataset.ttl
+ )
+ rdatas = [rdata.to_digestable(self.origin) for rdata in rdataset]
+ for rdata in sorted(rdatas):
+ rrlen = struct.pack("!H", len(rdata))
+ hasher.update(rrnamebuf + rrfixed + rrlen + rdata)
+ return hasher.digest()
+
+ def compute_digest(
+ self,
+ hash_algorithm: DigestHashAlgorithm,
+ scheme: DigestScheme = DigestScheme.SIMPLE,
+ ) -> dns.rdtypes.ANY.ZONEMD.ZONEMD:
+ serial = self.get_soa().serial
+ digest = self._compute_digest(hash_algorithm, scheme)
+ return dns.rdtypes.ANY.ZONEMD.ZONEMD(
+ self.rdclass, dns.rdatatype.ZONEMD, serial, scheme, hash_algorithm, digest
+ )
+
+ def verify_digest(
+ self, zonemd: Optional[dns.rdtypes.ANY.ZONEMD.ZONEMD] = None
+ ) -> None:
+ digests: Union[dns.rdataset.Rdataset, List[dns.rdtypes.ANY.ZONEMD.ZONEMD]]
+ if zonemd:
+ digests = [zonemd]
+ else:
+ assert self.origin is not None
+ rds = self.get_rdataset(self.origin, dns.rdatatype.ZONEMD)
+ if rds is None:
+ raise NoDigest
+ digests = rds
+ for digest in digests:
+ try:
+ computed = self._compute_digest(digest.hash_algorithm, digest.scheme)
+ if computed == digest.digest:
+ return
+ except Exception:
+ pass
+ raise DigestVerificationFailure
+
+ # TransactionManager methods
+
+ def reader(self) -> "Transaction":
+ return Transaction(self, False, Version(self, 1, self.nodes, self.origin))
+
+ def writer(self, replacement: bool = False) -> "Transaction":
+ txn = Transaction(self, replacement)
+ txn._setup_version()
+ return txn
+
+ def origin_information(
+ self,
+ ) -> Tuple[Optional[dns.name.Name], bool, Optional[dns.name.Name]]:
+ effective: Optional[dns.name.Name]
+ if self.relativize:
+ effective = dns.name.empty
+ else:
+ effective = self.origin
+ return (self.origin, self.relativize, effective)
+
+ def get_class(self):
+ return self.rdclass
+
+ # Transaction methods
+
+ def _end_read(self, txn):
+ pass
+
+ def _end_write(self, txn):
+ pass
+
+ def _commit_version(self, _, version, origin):
+ self.nodes = version.nodes
+ if self.origin is None:
+ self.origin = origin
+
+ def _get_next_version_id(self):
+ # Versions are ephemeral and all have id 1
+ return 1
+
+
+# These classes used to be in dns.versioned, but have moved here so we can use
+# the copy-on-write transaction mechanism for both kinds of zones. In a
+# regular zone, the version only exists during the transaction, and the nodes
+# are regular dns.node.Nodes.
+
+# A node with a version id.
+
+
+class VersionedNode(dns.node.Node): # lgtm[py/missing-equals]
+ __slots__ = ["id"]
+
+ def __init__(self):
+ super().__init__()
+ # A proper id will get set by the Version
+ self.id = 0
+
+
+@dns.immutable.immutable
+class ImmutableVersionedNode(VersionedNode):
+ def __init__(self, node):
+ super().__init__()
+ self.id = node.id
+ self.rdatasets = tuple(
+ [dns.rdataset.ImmutableRdataset(rds) for rds in node.rdatasets]
+ )
+
+ def find_rdataset(
+ self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
+ create: bool = False,
+ ) -> dns.rdataset.Rdataset:
+ if create:
+ raise TypeError("immutable")
+ return super().find_rdataset(rdclass, rdtype, covers, False)
+
+ def get_rdataset(
+ self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
+ create: bool = False,
+ ) -> Optional[dns.rdataset.Rdataset]:
+ if create:
+ raise TypeError("immutable")
+ return super().get_rdataset(rdclass, rdtype, covers, False)
+
+ def delete_rdataset(
+ self,
+ rdclass: dns.rdataclass.RdataClass,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
+ ) -> None:
+ raise TypeError("immutable")
+
+ def replace_rdataset(self, replacement: dns.rdataset.Rdataset) -> None:
+ raise TypeError("immutable")
+
+ def is_immutable(self) -> bool:
+ return True
+
+
+class Version:
+ def __init__(
+ self,
+ zone: Zone,
+ id: int,
+ nodes: Optional[MutableMapping[dns.name.Name, dns.node.Node]] = None,
+ origin: Optional[dns.name.Name] = None,
+ ):
+ self.zone = zone
+ self.id = id
+ if nodes is not None:
+ self.nodes = nodes
+ else:
+ self.nodes = zone.map_factory()
+ self.origin = origin
+
+ def _validate_name(self, name: dns.name.Name) -> dns.name.Name:
+ return _validate_name(name, self.origin, self.zone.relativize)
+
+ def get_node(self, name: dns.name.Name) -> Optional[dns.node.Node]:
+ name = self._validate_name(name)
+ return self.nodes.get(name)
+
+ def get_rdataset(
+ self,
+ name: dns.name.Name,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType,
+ ) -> Optional[dns.rdataset.Rdataset]:
+ node = self.get_node(name)
+ if node is None:
+ return None
+ return node.get_rdataset(self.zone.rdclass, rdtype, covers)
+
+ def keys(self):
+ return self.nodes.keys()
+
+ def items(self):
+ return self.nodes.items()
+
+
+class WritableVersion(Version):
+ def __init__(self, zone: Zone, replacement: bool = False):
+ # The zone._versions_lock must be held by our caller in a versioned
+ # zone.
+ id = zone._get_next_version_id()
+ super().__init__(zone, id)
+ if not replacement:
+ # We copy the map, because that gives us a simple and thread-safe
+ # way of doing versions, and we have a garbage collector to help
+ # us. We only make new node objects if we actually change the
+ # node.
+ self.nodes.update(zone.nodes)
+ # We have to copy the zone origin as it may be None in the first
+ # version, and we don't want to mutate the zone until we commit.
+ self.origin = zone.origin
+ self.changed: Set[dns.name.Name] = set()
+
+ def _maybe_cow(self, name: dns.name.Name) -> dns.node.Node:
+ name = self._validate_name(name)
+ node = self.nodes.get(name)
+ if node is None or name not in self.changed:
+ new_node = self.zone.node_factory()
+ if hasattr(new_node, "id"):
+ # We keep doing this for backwards compatibility, as earlier
+ # code used new_node.id != self.id for the "do we need to CoW?"
+ # test. Now we use the changed set as this works with both
+ # regular zones and versioned zones.
+ #
+ # We ignore the mypy error as this is safe but it doesn't see it.
+ new_node.id = self.id # type: ignore
+ if node is not None:
+ # moo! copy on write!
+ new_node.rdatasets.extend(node.rdatasets)
+ self.nodes[name] = new_node
+ self.changed.add(name)
+ return new_node
+ else:
+ return node
+
+ def delete_node(self, name: dns.name.Name) -> None:
+ name = self._validate_name(name)
+ if name in self.nodes:
+ del self.nodes[name]
+ self.changed.add(name)
+
+ def put_rdataset(
+ self, name: dns.name.Name, rdataset: dns.rdataset.Rdataset
+ ) -> None:
+ node = self._maybe_cow(name)
+ node.replace_rdataset(rdataset)
+
+ def delete_rdataset(
+ self,
+ name: dns.name.Name,
+ rdtype: dns.rdatatype.RdataType,
+ covers: dns.rdatatype.RdataType,
+ ) -> None:
+ node = self._maybe_cow(name)
+ node.delete_rdataset(self.zone.rdclass, rdtype, covers)
+ if len(node) == 0:
+ del self.nodes[name]
+
+
+@dns.immutable.immutable
+class ImmutableVersion(Version):
+ def __init__(self, version: WritableVersion):
+ # We tell super() that it's a replacement as we don't want it
+ # to copy the nodes, as we're about to do that with an
+ # immutable Dict.
+ super().__init__(version.zone, True)
+ # set the right id!
+ self.id = version.id
+ # keep the origin
+ self.origin = version.origin
+ # Make changed nodes immutable
+ for name in version.changed:
+ node = version.nodes.get(name)
+ # it might not exist if we deleted it in the version
+ if node:
+ version.nodes[name] = ImmutableVersionedNode(node)
+ # We're changing the type of the nodes dictionary here on purpose, so
+ # we ignore the mypy error.
+ self.nodes = dns.immutable.Dict(
+ version.nodes, True, self.zone.map_factory
+ ) # type: ignore
+
+
+class Transaction(dns.transaction.Transaction):
+ def __init__(self, zone, replacement, version=None, make_immutable=False):
+ read_only = version is not None
+ super().__init__(zone, replacement, read_only)
+ self.version = version
+ self.make_immutable = make_immutable
+
+ @property
+ def zone(self):
+ return self.manager
+
+ def _setup_version(self):
+ assert self.version is None
+ factory = self.manager.writable_version_factory
+ if factory is None:
+ factory = WritableVersion
+ self.version = factory(self.zone, self.replacement)
+
+ def _get_rdataset(self, name, rdtype, covers):
+ return self.version.get_rdataset(name, rdtype, covers)
+
+ def _put_rdataset(self, name, rdataset):
+ assert not self.read_only
+ self.version.put_rdataset(name, rdataset)
+
+ def _delete_name(self, name):
+ assert not self.read_only
+ self.version.delete_node(name)
+
+ def _delete_rdataset(self, name, rdtype, covers):
+ assert not self.read_only
+ self.version.delete_rdataset(name, rdtype, covers)
+
+ def _name_exists(self, name):
+ return self.version.get_node(name) is not None
+
+ def _changed(self):
+ if self.read_only:
+ return False
+ else:
+ return len(self.version.changed) > 0
+
+ def _end_transaction(self, commit):
+ if self.read_only:
+ self.zone._end_read(self)
+ elif commit and len(self.version.changed) > 0:
+ if self.make_immutable:
+ factory = self.manager.immutable_version_factory
+ if factory is None:
+ factory = ImmutableVersion
+ version = factory(self.version)
+ else:
+ version = self.version
+ self.zone._commit_version(self, version, self.version.origin)
+ else:
+ # rollback
+ self.zone._end_write(self)
+
+ def _set_origin(self, origin):
+ if self.version.origin is None:
+ self.version.origin = origin
+
+ def _iterate_rdatasets(self):
+ for name, node in self.version.items():
+ for rdataset in node:
+ yield (name, rdataset)
+
+ def _iterate_names(self):
+ return self.version.keys()
+
+ def _get_node(self, name):
+ return self.version.get_node(name)
+
+ def _origin_information(self):
+ (absolute, relativize, effective) = self.manager.origin_information()
+ if absolute is None and self.version.origin is not None:
+ # No origin has been committed yet, but we've learned one as part of
+ # this txn. Use it.
+ absolute = self.version.origin
+ if relativize:
+ effective = dns.name.empty
+ else:
+ effective = absolute
+ return (absolute, relativize, effective)
+
+
+def _from_text(
+ text: Any,
+ origin: Optional[Union[dns.name.Name, str]] = None,
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+ relativize: bool = True,
+ zone_factory: Any = Zone,
+ filename: Optional[str] = None,
+ allow_include: bool = False,
+ check_origin: bool = True,
+ idna_codec: Optional[dns.name.IDNACodec] = None,
+ allow_directives: Union[bool, Iterable[str]] = True,
+) -> Zone:
+ # See the comments for the public APIs from_text() and from_file() for
+ # details.
+
+ # 'text' can also be a file, but we don't publish that fact
+ # since it's an implementation detail. The official file
+ # interface is from_file().
+
+ if filename is None:
+ filename = "<string>"
+ zone = zone_factory(origin, rdclass, relativize=relativize)
+ with zone.writer(True) as txn:
+ tok = dns.tokenizer.Tokenizer(text, filename, idna_codec=idna_codec)
+ reader = dns.zonefile.Reader(
+ tok,
+ rdclass,
+ txn,
+ allow_include=allow_include,
+ allow_directives=allow_directives,
+ )
+ try:
+ reader.read()
+ except dns.zonefile.UnknownOrigin:
+ # for backwards compatibility
+ raise dns.zone.UnknownOrigin
+ # Now that we're done reading, do some basic checking of the zone.
+ if check_origin:
+ zone.check_origin()
+ return zone
+
+
+def from_text(
+ text: str,
+ origin: Optional[Union[dns.name.Name, str]] = None,
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+ relativize: bool = True,
+ zone_factory: Any = Zone,
+ filename: Optional[str] = None,
+ allow_include: bool = False,
+ check_origin: bool = True,
+ idna_codec: Optional[dns.name.IDNACodec] = None,
+ allow_directives: Union[bool, Iterable[str]] = True,
+) -> Zone:
+ """Build a zone object from a zone file format string.
+
+ *text*, a ``str``, the zone file format input.
+
+ *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
+ of the zone; if not specified, the first ``$ORIGIN`` statement in the
+ zone file will determine the origin of the zone.
+
+ *rdclass*, a ``dns.rdataclass.RdataClass``, the zone's rdata class; the default is
+ class IN.
+
+ *relativize*, a ``bool``, determine's whether domain names are
+ relativized to the zone's origin. The default is ``True``.
+
+ *zone_factory*, the zone factory to use or ``None``. If ``None``, then
+ ``dns.zone.Zone`` will be used. The value may be any class or callable
+ that returns a subclass of ``dns.zone.Zone``.
+
+ *filename*, a ``str`` or ``None``, the filename to emit when
+ describing where an error occurred; the default is ``'<string>'``.
+
+ *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
+ directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
+ will raise a ``SyntaxError`` exception.
+
+ *check_origin*, a ``bool``. If ``True``, the default, then sanity
+ checks of the origin node will be made by calling the zone's
+ ``check_origin()`` method.
+
+ *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+ encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
+ is used.
+
+ *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
+ then directives are permitted, and the *allow_include* parameter controls whether
+ ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
+ processing is done and any directive-like text will be treated as a regular owner
+ name. If a non-empty iterable, then only the listed directives (including the
+ ``$``) are allowed.
+
+ Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
+
+ Raises ``dns.zone.NoNS`` if there is no NS RRset.
+
+ Raises ``KeyError`` if there is no origin node.
+
+ Returns a subclass of ``dns.zone.Zone``.
+ """
+ return _from_text(
+ text,
+ origin,
+ rdclass,
+ relativize,
+ zone_factory,
+ filename,
+ allow_include,
+ check_origin,
+ idna_codec,
+ allow_directives,
+ )
+
+
+def from_file(
+ f: Any,
+ origin: Optional[Union[dns.name.Name, str]] = None,
+ rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+ relativize: bool = True,
+ zone_factory: Any = Zone,
+ filename: Optional[str] = None,
+ allow_include: bool = True,
+ check_origin: bool = True,
+ idna_codec: Optional[dns.name.IDNACodec] = None,
+ allow_directives: Union[bool, Iterable[str]] = True,
+) -> Zone:
+ """Read a zone file and build a zone object.
+
+ *f*, a file or ``str``. If *f* is a string, it is treated
+ as the name of a file to open.
+
+ *origin*, a ``dns.name.Name``, a ``str``, or ``None``. The origin
+ of the zone; if not specified, the first ``$ORIGIN`` statement in the
+ zone file will determine the origin of the zone.
+
+ *rdclass*, an ``int``, the zone's rdata class; the default is class IN.
+
+ *relativize*, a ``bool``, determine's whether domain names are
+ relativized to the zone's origin. The default is ``True``.
+
+ *zone_factory*, the zone factory to use or ``None``. If ``None``, then
+ ``dns.zone.Zone`` will be used. The value may be any class or callable
+ that returns a subclass of ``dns.zone.Zone``.
+
+ *filename*, a ``str`` or ``None``, the filename to emit when
+ describing where an error occurred; the default is ``'<string>'``.
+
+ *allow_include*, a ``bool``. If ``True``, the default, then ``$INCLUDE``
+ directives are permitted. If ``False``, then encoutering a ``$INCLUDE``
+ will raise a ``SyntaxError`` exception.
+
+ *check_origin*, a ``bool``. If ``True``, the default, then sanity
+ checks of the origin node will be made by calling the zone's
+ ``check_origin()`` method.
+
+ *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
+ encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
+ is used.
+
+ *allow_directives*, a ``bool`` or an iterable of `str`. If ``True``, the default,
+ then directives are permitted, and the *allow_include* parameter controls whether
+ ``$INCLUDE`` is permitted. If ``False`` or an empty iterable, then no directive
+ processing is done and any directive-like text will be treated as a regular owner
+ name. If a non-empty iterable, then only the listed directives (including the
+ ``$``) are allowed.
+
+ Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
+
+ Raises ``dns.zone.NoNS`` if there is no NS RRset.
+
+ Raises ``KeyError`` if there is no origin node.
+
+ Returns a subclass of ``dns.zone.Zone``.
+ """
+
+ if isinstance(f, str):
+ if filename is None:
+ filename = f
+ cm: contextlib.AbstractContextManager = open(f)
+ else:
+ cm = contextlib.nullcontext(f)
+ with cm as f:
+ return _from_text(
+ f,
+ origin,
+ rdclass,
+ relativize,
+ zone_factory,
+ filename,
+ allow_include,
+ check_origin,
+ idna_codec,
+ allow_directives,
+ )
+ assert False # make mypy happy lgtm[py/unreachable-statement]
+
+
+def from_xfr(
+ xfr: Any,
+ zone_factory: Any = Zone,
+ relativize: bool = True,
+ check_origin: bool = True,
+) -> Zone:
+ """Convert the output of a zone transfer generator into a zone object.
+
+ *xfr*, a generator of ``dns.message.Message`` objects, typically
+ ``dns.query.xfr()``.
+
+ *relativize*, a ``bool``, determine's whether domain names are
+ relativized to the zone's origin. The default is ``True``.
+ It is essential that the relativize setting matches the one specified
+ to the generator.
+
+ *check_origin*, a ``bool``. If ``True``, the default, then sanity
+ checks of the origin node will be made by calling the zone's
+ ``check_origin()`` method.
+
+ Raises ``dns.zone.NoSOA`` if there is no SOA RRset.
+
+ Raises ``dns.zone.NoNS`` if there is no NS RRset.
+
+ Raises ``KeyError`` if there is no origin node.
+
+ Raises ``ValueError`` if no messages are yielded by the generator.
+
+ Returns a subclass of ``dns.zone.Zone``.
+ """
+
+ z = None
+ for r in xfr:
+ if z is None:
+ if relativize:
+ origin = r.origin
+ else:
+ origin = r.answer[0].name
+ rdclass = r.answer[0].rdclass
+ z = zone_factory(origin, rdclass, relativize=relativize)
+ for rrset in r.answer:
+ znode = z.nodes.get(rrset.name)
+ if not znode:
+ znode = z.node_factory()
+ z.nodes[rrset.name] = znode
+ zrds = znode.find_rdataset(rrset.rdclass, rrset.rdtype, rrset.covers, True)
+ zrds.update_ttl(rrset.ttl)
+ for rd in rrset:
+ zrds.add(rd)
+ if z is None:
+ raise ValueError("empty transfer")
+ if check_origin:
+ z.check_origin()
+ return z