about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/update.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/update.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/update.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/update.py386
1 files changed, 386 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/update.py b/.venv/lib/python3.12/site-packages/dns/update.py
new file mode 100644
index 00000000..bf1157ac
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/update.py
@@ -0,0 +1,386 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2007, 2009-2011 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""DNS Dynamic Update Support"""
+
+from typing import Any, List, Optional, Union
+
+import dns.message
+import dns.name
+import dns.opcode
+import dns.rdata
+import dns.rdataclass
+import dns.rdataset
+import dns.rdatatype
+import dns.tsig
+
+
+class UpdateSection(dns.enum.IntEnum):
+    """Update sections"""
+
+    ZONE = 0
+    PREREQ = 1
+    UPDATE = 2
+    ADDITIONAL = 3
+
+    @classmethod
+    def _maximum(cls):
+        return 3
+
+
+class UpdateMessage(dns.message.Message):  # lgtm[py/missing-equals]
+    # ignore the mypy error here as we mean to use a different enum
+    _section_enum = UpdateSection  # type: ignore
+
+    def __init__(
+        self,
+        zone: Optional[Union[dns.name.Name, str]] = None,
+        rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
+        keyring: Optional[Any] = None,
+        keyname: Optional[dns.name.Name] = None,
+        keyalgorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
+        id: Optional[int] = None,
+    ):
+        """Initialize a new DNS Update object.
+
+        See the documentation of the Message class for a complete
+        description of the keyring dictionary.
+
+        *zone*, a ``dns.name.Name``, ``str``, or ``None``, the zone
+        which is being updated.  ``None`` should only be used by dnspython's
+        message constructors, as a zone is required for the convenience
+        methods like ``add()``, ``replace()``, etc.
+
+        *rdclass*, an ``int`` or ``str``, the class of the zone.
+
+        The *keyring*, *keyname*, and *keyalgorithm* parameters are passed to
+        ``use_tsig()``; see its documentation for details.
+        """
+        super().__init__(id=id)
+        self.flags |= dns.opcode.to_flags(dns.opcode.UPDATE)
+        if isinstance(zone, str):
+            zone = dns.name.from_text(zone)
+        self.origin = zone
+        rdclass = dns.rdataclass.RdataClass.make(rdclass)
+        self.zone_rdclass = rdclass
+        if self.origin:
+            self.find_rrset(
+                self.zone,
+                self.origin,
+                rdclass,
+                dns.rdatatype.SOA,
+                create=True,
+                force_unique=True,
+            )
+        if keyring is not None:
+            self.use_tsig(keyring, keyname, algorithm=keyalgorithm)
+
+    @property
+    def zone(self) -> List[dns.rrset.RRset]:
+        """The zone section."""
+        return self.sections[0]
+
+    @zone.setter
+    def zone(self, v):
+        self.sections[0] = v
+
+    @property
+    def prerequisite(self) -> List[dns.rrset.RRset]:
+        """The prerequisite section."""
+        return self.sections[1]
+
+    @prerequisite.setter
+    def prerequisite(self, v):
+        self.sections[1] = v
+
+    @property
+    def update(self) -> List[dns.rrset.RRset]:
+        """The update section."""
+        return self.sections[2]
+
+    @update.setter
+    def update(self, v):
+        self.sections[2] = v
+
+    def _add_rr(self, name, ttl, rd, deleting=None, section=None):
+        """Add a single RR to the update section."""
+
+        if section is None:
+            section = self.update
+        covers = rd.covers()
+        rrset = self.find_rrset(
+            section, name, self.zone_rdclass, rd.rdtype, covers, deleting, True, True
+        )
+        rrset.add(rd, ttl)
+
+    def _add(self, replace, section, name, *args):
+        """Add records.
+
+        *replace* is the replacement mode.  If ``False``,
+        RRs are added to an existing RRset; if ``True``, the RRset
+        is replaced with the specified contents.  The second
+        argument is the section to add to.  The third argument
+        is always a name.  The other arguments can be:
+
+                - rdataset...
+
+                - ttl, rdata...
+
+                - ttl, rdtype, string...
+        """
+
+        if isinstance(name, str):
+            name = dns.name.from_text(name, None)
+        if isinstance(args[0], dns.rdataset.Rdataset):
+            for rds in args:
+                if replace:
+                    self.delete(name, rds.rdtype)
+                for rd in rds:
+                    self._add_rr(name, rds.ttl, rd, section=section)
+        else:
+            args = list(args)
+            ttl = int(args.pop(0))
+            if isinstance(args[0], dns.rdata.Rdata):
+                if replace:
+                    self.delete(name, args[0].rdtype)
+                for rd in args:
+                    self._add_rr(name, ttl, rd, section=section)
+            else:
+                rdtype = dns.rdatatype.RdataType.make(args.pop(0))
+                if replace:
+                    self.delete(name, rdtype)
+                for s in args:
+                    rd = dns.rdata.from_text(self.zone_rdclass, rdtype, s, self.origin)
+                    self._add_rr(name, ttl, rd, section=section)
+
+    def add(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+        """Add records.
+
+        The first argument is always a name.  The other
+        arguments can be:
+
+                - rdataset...
+
+                - ttl, rdata...
+
+                - ttl, rdtype, string...
+        """
+
+        self._add(False, self.update, name, *args)
+
+    def delete(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+        """Delete records.
+
+        The first argument is always a name.  The other
+        arguments can be:
+
+                - *empty*
+
+                - rdataset...
+
+                - rdata...
+
+                - rdtype, [string...]
+        """
+
+        if isinstance(name, str):
+            name = dns.name.from_text(name, None)
+        if len(args) == 0:
+            self.find_rrset(
+                self.update,
+                name,
+                dns.rdataclass.ANY,
+                dns.rdatatype.ANY,
+                dns.rdatatype.NONE,
+                dns.rdataclass.ANY,
+                True,
+                True,
+            )
+        elif isinstance(args[0], dns.rdataset.Rdataset):
+            for rds in args:
+                for rd in rds:
+                    self._add_rr(name, 0, rd, dns.rdataclass.NONE)
+        else:
+            largs = list(args)
+            if isinstance(largs[0], dns.rdata.Rdata):
+                for rd in largs:
+                    self._add_rr(name, 0, rd, dns.rdataclass.NONE)
+            else:
+                rdtype = dns.rdatatype.RdataType.make(largs.pop(0))
+                if len(largs) == 0:
+                    self.find_rrset(
+                        self.update,
+                        name,
+                        self.zone_rdclass,
+                        rdtype,
+                        dns.rdatatype.NONE,
+                        dns.rdataclass.ANY,
+                        True,
+                        True,
+                    )
+                else:
+                    for s in largs:
+                        rd = dns.rdata.from_text(
+                            self.zone_rdclass,
+                            rdtype,
+                            s,  # type: ignore[arg-type]
+                            self.origin,
+                        )
+                        self._add_rr(name, 0, rd, dns.rdataclass.NONE)
+
+    def replace(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+        """Replace records.
+
+        The first argument is always a name.  The other
+        arguments can be:
+
+                - rdataset...
+
+                - ttl, rdata...
+
+                - ttl, rdtype, string...
+
+        Note that if you want to replace the entire node, you should do
+        a delete of the name followed by one or more calls to add.
+        """
+
+        self._add(True, self.update, name, *args)
+
+    def present(self, name: Union[dns.name.Name, str], *args: Any) -> None:
+        """Require that an owner name (and optionally an rdata type,
+        or specific rdataset) exists as a prerequisite to the
+        execution of the update.
+
+        The first argument is always a name.
+        The other arguments can be:
+
+                - rdataset...
+
+                - rdata...
+
+                - rdtype, string...
+        """
+
+        if isinstance(name, str):
+            name = dns.name.from_text(name, None)
+        if len(args) == 0:
+            self.find_rrset(
+                self.prerequisite,
+                name,
+                dns.rdataclass.ANY,
+                dns.rdatatype.ANY,
+                dns.rdatatype.NONE,
+                None,
+                True,
+                True,
+            )
+        elif (
+            isinstance(args[0], dns.rdataset.Rdataset)
+            or isinstance(args[0], dns.rdata.Rdata)
+            or len(args) > 1
+        ):
+            if not isinstance(args[0], dns.rdataset.Rdataset):
+                # Add a 0 TTL
+                largs = list(args)
+                largs.insert(0, 0)  # type: ignore[arg-type]
+                self._add(False, self.prerequisite, name, *largs)
+            else:
+                self._add(False, self.prerequisite, name, *args)
+        else:
+            rdtype = dns.rdatatype.RdataType.make(args[0])
+            self.find_rrset(
+                self.prerequisite,
+                name,
+                dns.rdataclass.ANY,
+                rdtype,
+                dns.rdatatype.NONE,
+                None,
+                True,
+                True,
+            )
+
+    def absent(
+        self,
+        name: Union[dns.name.Name, str],
+        rdtype: Optional[Union[dns.rdatatype.RdataType, str]] = None,
+    ) -> None:
+        """Require that an owner name (and optionally an rdata type) does
+        not exist as a prerequisite to the execution of the update."""
+
+        if isinstance(name, str):
+            name = dns.name.from_text(name, None)
+        if rdtype is None:
+            self.find_rrset(
+                self.prerequisite,
+                name,
+                dns.rdataclass.NONE,
+                dns.rdatatype.ANY,
+                dns.rdatatype.NONE,
+                None,
+                True,
+                True,
+            )
+        else:
+            rdtype = dns.rdatatype.RdataType.make(rdtype)
+            self.find_rrset(
+                self.prerequisite,
+                name,
+                dns.rdataclass.NONE,
+                rdtype,
+                dns.rdatatype.NONE,
+                None,
+                True,
+                True,
+            )
+
+    def _get_one_rr_per_rrset(self, value):
+        # Updates are always one_rr_per_rrset
+        return True
+
+    def _parse_rr_header(self, section, name, rdclass, rdtype):
+        deleting = None
+        empty = False
+        if section == UpdateSection.ZONE:
+            if (
+                dns.rdataclass.is_metaclass(rdclass)
+                or rdtype != dns.rdatatype.SOA
+                or self.zone
+            ):
+                raise dns.exception.FormError
+        else:
+            if not self.zone:
+                raise dns.exception.FormError
+            if rdclass in (dns.rdataclass.ANY, dns.rdataclass.NONE):
+                deleting = rdclass
+                rdclass = self.zone[0].rdclass
+                empty = (
+                    deleting == dns.rdataclass.ANY or section == UpdateSection.PREREQ
+                )
+        return (rdclass, rdtype, deleting, empty)
+
+
+# backwards compatibility
+Update = UpdateMessage
+
+### BEGIN generated UpdateSection constants
+
+ZONE = UpdateSection.ZONE
+PREREQ = UpdateSection.PREREQ
+UPDATE = UpdateSection.UPDATE
+ADDITIONAL = UpdateSection.ADDITIONAL
+
+### END generated UpdateSection constants