about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/dns/dnssec.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/dns/dnssec.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/dns/dnssec.py')
-rw-r--r--.venv/lib/python3.12/site-packages/dns/dnssec.py1247
1 files changed, 1247 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/dns/dnssec.py b/.venv/lib/python3.12/site-packages/dns/dnssec.py
new file mode 100644
index 00000000..b69d0a12
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/dns/dnssec.py
@@ -0,0 +1,1247 @@
+# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
+
+# Copyright (C) 2003-2017 Nominum, Inc.
+#
+# Permission to use, copy, modify, and distribute this software and its
+# documentation for any purpose with or without fee is hereby granted,
+# provided that the above copyright notice and this permission notice
+# appear in all copies.
+#
+# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
+# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
+# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
+# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+"""Common DNSSEC-related functions and constants."""
+
+
+import base64
+import contextlib
+import functools
+import hashlib
+import struct
+import time
+from datetime import datetime
+from typing import Callable, Dict, List, Optional, Set, Tuple, Union, cast
+
+import dns._features
+import dns.exception
+import dns.name
+import dns.node
+import dns.rdata
+import dns.rdataclass
+import dns.rdataset
+import dns.rdatatype
+import dns.rrset
+import dns.transaction
+import dns.zone
+from dns.dnssectypes import Algorithm, DSDigest, NSEC3Hash
+from dns.exception import (  # pylint: disable=W0611
+    AlgorithmKeyMismatch,
+    DeniedByPolicy,
+    UnsupportedAlgorithm,
+    ValidationFailure,
+)
+from dns.rdtypes.ANY.CDNSKEY import CDNSKEY
+from dns.rdtypes.ANY.CDS import CDS
+from dns.rdtypes.ANY.DNSKEY import DNSKEY
+from dns.rdtypes.ANY.DS import DS
+from dns.rdtypes.ANY.NSEC import NSEC, Bitmap
+from dns.rdtypes.ANY.NSEC3PARAM import NSEC3PARAM
+from dns.rdtypes.ANY.RRSIG import RRSIG, sigtime_to_posixtime
+from dns.rdtypes.dnskeybase import Flag
+
+PublicKey = Union[
+    "GenericPublicKey",
+    "rsa.RSAPublicKey",
+    "ec.EllipticCurvePublicKey",
+    "ed25519.Ed25519PublicKey",
+    "ed448.Ed448PublicKey",
+]
+
+PrivateKey = Union[
+    "GenericPrivateKey",
+    "rsa.RSAPrivateKey",
+    "ec.EllipticCurvePrivateKey",
+    "ed25519.Ed25519PrivateKey",
+    "ed448.Ed448PrivateKey",
+]
+
+RRsetSigner = Callable[[dns.transaction.Transaction, dns.rrset.RRset], None]
+
+
+def algorithm_from_text(text: str) -> Algorithm:
+    """Convert text into a DNSSEC algorithm value.
+
+    *text*, a ``str``, the text to convert to into an algorithm value.
+
+    Returns an ``int``.
+    """
+
+    return Algorithm.from_text(text)
+
+
+def algorithm_to_text(value: Union[Algorithm, int]) -> str:
+    """Convert a DNSSEC algorithm value to text
+
+    *value*, a ``dns.dnssec.Algorithm``.
+
+    Returns a ``str``, the name of a DNSSEC algorithm.
+    """
+
+    return Algorithm.to_text(value)
+
+
+def to_timestamp(value: Union[datetime, str, float, int]) -> int:
+    """Convert various format to a timestamp"""
+    if isinstance(value, datetime):
+        return int(value.timestamp())
+    elif isinstance(value, str):
+        return sigtime_to_posixtime(value)
+    elif isinstance(value, float):
+        return int(value)
+    elif isinstance(value, int):
+        return value
+    else:
+        raise TypeError("Unsupported timestamp type")
+
+
+def key_id(key: Union[DNSKEY, CDNSKEY]) -> int:
+    """Return the key id (a 16-bit number) for the specified key.
+
+    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY``
+
+    Returns an ``int`` between 0 and 65535
+    """
+
+    rdata = key.to_wire()
+    assert rdata is not None  # for mypy
+    if key.algorithm == Algorithm.RSAMD5:
+        return (rdata[-3] << 8) + rdata[-2]
+    else:
+        total = 0
+        for i in range(len(rdata) // 2):
+            total += (rdata[2 * i] << 8) + rdata[2 * i + 1]
+        if len(rdata) % 2 != 0:
+            total += rdata[len(rdata) - 1] << 8
+        total += (total >> 16) & 0xFFFF
+        return total & 0xFFFF
+
+
+class Policy:
+    def __init__(self):
+        pass
+
+    def ok_to_sign(self, _: DNSKEY) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_validate(self, _: DNSKEY) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_create_ds(self, _: DSDigest) -> bool:  # pragma: no cover
+        return False
+
+    def ok_to_validate_ds(self, _: DSDigest) -> bool:  # pragma: no cover
+        return False
+
+
+class SimpleDeny(Policy):
+    def __init__(self, deny_sign, deny_validate, deny_create_ds, deny_validate_ds):
+        super().__init__()
+        self._deny_sign = deny_sign
+        self._deny_validate = deny_validate
+        self._deny_create_ds = deny_create_ds
+        self._deny_validate_ds = deny_validate_ds
+
+    def ok_to_sign(self, key: DNSKEY) -> bool:
+        return key.algorithm not in self._deny_sign
+
+    def ok_to_validate(self, key: DNSKEY) -> bool:
+        return key.algorithm not in self._deny_validate
+
+    def ok_to_create_ds(self, algorithm: DSDigest) -> bool:
+        return algorithm not in self._deny_create_ds
+
+    def ok_to_validate_ds(self, algorithm: DSDigest) -> bool:
+        return algorithm not in self._deny_validate_ds
+
+
+rfc_8624_policy = SimpleDeny(
+    {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1, Algorithm.ECCGOST},
+    {Algorithm.RSAMD5, Algorithm.DSA, Algorithm.DSANSEC3SHA1},
+    {DSDigest.NULL, DSDigest.SHA1, DSDigest.GOST},
+    {DSDigest.NULL},
+)
+
+allow_all_policy = SimpleDeny(set(), set(), set(), set())
+
+
+default_policy = rfc_8624_policy
+
+
+def make_ds(
+    name: Union[dns.name.Name, str],
+    key: dns.rdata.Rdata,
+    algorithm: Union[DSDigest, str],
+    origin: Optional[dns.name.Name] = None,
+    policy: Optional[Policy] = None,
+    validating: bool = False,
+) -> DS:
+    """Create a DS record for a DNSSEC key.
+
+    *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
+
+    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``,
+    the key the DS is about.
+
+    *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If *key* is a relative name,
+    then it will be made absolute using the specified origin.
+
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    *validating*, a ``bool``.  If ``True``, then policy is checked in
+    validating mode, i.e. "Is it ok to validate using this digest algorithm?".
+    Otherwise the policy is checked in creating mode, i.e. "Is it ok to create a DS with
+    this digest algorithm?".
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+
+    Raises ``DeniedByPolicy`` if the algorithm is denied by policy.
+
+    Returns a ``dns.rdtypes.ANY.DS.DS``
+    """
+
+    if policy is None:
+        policy = default_policy
+    try:
+        if isinstance(algorithm, str):
+            algorithm = DSDigest[algorithm.upper()]
+    except Exception:
+        raise UnsupportedAlgorithm(f'unsupported algorithm "{algorithm}"')
+    if validating:
+        check = policy.ok_to_validate_ds
+    else:
+        check = policy.ok_to_create_ds
+    if not check(algorithm):
+        raise DeniedByPolicy
+    if not isinstance(key, (DNSKEY, CDNSKEY)):
+        raise ValueError("key is not a DNSKEY/CDNSKEY")
+    if algorithm == DSDigest.SHA1:
+        dshash = hashlib.sha1()
+    elif algorithm == DSDigest.SHA256:
+        dshash = hashlib.sha256()
+    elif algorithm == DSDigest.SHA384:
+        dshash = hashlib.sha384()
+    else:
+        raise UnsupportedAlgorithm(f'unsupported algorithm "{algorithm}"')
+
+    if isinstance(name, str):
+        name = dns.name.from_text(name, origin)
+    wire = name.canonicalize().to_wire()
+    kwire = key.to_wire(origin=origin)
+    assert wire is not None and kwire is not None  # for mypy
+    dshash.update(wire)
+    dshash.update(kwire)
+    digest = dshash.digest()
+
+    dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, algorithm) + digest
+    ds = dns.rdata.from_wire(
+        dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0, len(dsrdata)
+    )
+    return cast(DS, ds)
+
+
+def make_cds(
+    name: Union[dns.name.Name, str],
+    key: dns.rdata.Rdata,
+    algorithm: Union[DSDigest, str],
+    origin: Optional[dns.name.Name] = None,
+) -> CDS:
+    """Create a CDS record for a DNSSEC key.
+
+    *name*, a ``dns.name.Name`` or ``str``, the owner name of the DS record.
+
+    *key*, a ``dns.rdtypes.ANY.DNSKEY.DNSKEY`` or ``dns.rdtypes.ANY.DNSKEY.CDNSKEY``,
+    the key the DS is about.
+
+    *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If *key* is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
+
+    Returns a ``dns.rdtypes.ANY.DS.CDS``
+    """
+
+    ds = make_ds(name, key, algorithm, origin)
+    return CDS(
+        rdclass=ds.rdclass,
+        rdtype=dns.rdatatype.CDS,
+        key_tag=ds.key_tag,
+        algorithm=ds.algorithm,
+        digest_type=ds.digest_type,
+        digest=ds.digest,
+    )
+
+
+def _find_candidate_keys(
+    keys: Dict[dns.name.Name, Union[dns.rdataset.Rdataset, dns.node.Node]], rrsig: RRSIG
+) -> Optional[List[DNSKEY]]:
+    value = keys.get(rrsig.signer)
+    if isinstance(value, dns.node.Node):
+        rdataset = value.get_rdataset(dns.rdataclass.IN, dns.rdatatype.DNSKEY)
+    else:
+        rdataset = value
+    if rdataset is None:
+        return None
+    return [
+        cast(DNSKEY, rd)
+        for rd in rdataset
+        if rd.algorithm == rrsig.algorithm
+        and key_id(rd) == rrsig.key_tag
+        and (rd.flags & Flag.ZONE) == Flag.ZONE  # RFC 4034 2.1.1
+        and rd.protocol == 3  # RFC 4034 2.1.2
+    ]
+
+
+def _get_rrname_rdataset(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+) -> Tuple[dns.name.Name, dns.rdataset.Rdataset]:
+    if isinstance(rrset, tuple):
+        return rrset[0], rrset[1]
+    else:
+        return rrset.name, rrset
+
+
+def _validate_signature(sig: bytes, data: bytes, key: DNSKEY) -> None:
+    # pylint: disable=possibly-used-before-assignment
+    public_cls = get_algorithm_cls_from_dnskey(key).public_cls
+    try:
+        public_key = public_cls.from_dnskey(key)
+    except ValueError:
+        raise ValidationFailure("invalid public key")
+    public_key.verify(sig, data)
+
+
+def _validate_rrsig(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    rrsig: RRSIG,
+    keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
+    origin: Optional[dns.name.Name] = None,
+    now: Optional[float] = None,
+    policy: Optional[Policy] = None,
+) -> None:
+    """Validate an RRset against a single signature rdata, throwing an
+    exception if validation is not successful.
+
+    *rrset*, the RRset to validate.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *rrsig*, a ``dns.rdata.Rdata``, the signature to validate.
+
+    *keys*, the key dictionary, used to find the DNSKEY associated
+    with a given name.  The dictionary is keyed by a
+    ``dns.name.Name``, and has ``dns.node.Node`` or
+    ``dns.rdataset.Rdataset`` values.
+
+    *origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative
+    names.
+
+    *now*, a ``float`` or ``None``, the time, in seconds since the epoch, to
+    use as the current time when validating.  If ``None``, the actual current
+    time is used.
+
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    Raises ``ValidationFailure`` if the signature is expired, not yet valid,
+    the public key is invalid, the algorithm is unknown, the verification
+    fails, etc.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by
+    dnspython but not implemented.
+    """
+
+    if policy is None:
+        policy = default_policy
+
+    candidate_keys = _find_candidate_keys(keys, rrsig)
+    if candidate_keys is None:
+        raise ValidationFailure("unknown key")
+
+    if now is None:
+        now = time.time()
+    if rrsig.expiration < now:
+        raise ValidationFailure("expired")
+    if rrsig.inception > now:
+        raise ValidationFailure("not yet valid")
+
+    data = _make_rrsig_signature_data(rrset, rrsig, origin)
+
+    # pylint: disable=possibly-used-before-assignment
+    for candidate_key in candidate_keys:
+        if not policy.ok_to_validate(candidate_key):
+            continue
+        try:
+            _validate_signature(rrsig.signature, data, candidate_key)
+            return
+        except (InvalidSignature, ValidationFailure):
+            # this happens on an individual validation failure
+            continue
+    # nothing verified -- raise failure:
+    raise ValidationFailure("verify failure")
+
+
+def _validate(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    rrsigset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    keys: Dict[dns.name.Name, Union[dns.node.Node, dns.rdataset.Rdataset]],
+    origin: Optional[dns.name.Name] = None,
+    now: Optional[float] = None,
+    policy: Optional[Policy] = None,
+) -> None:
+    """Validate an RRset against a signature RRset, throwing an exception
+    if none of the signatures validate.
+
+    *rrset*, the RRset to validate.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *rrsigset*, the signature RRset.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *keys*, the key dictionary, used to find the DNSKEY associated
+    with a given name.  The dictionary is keyed by a
+    ``dns.name.Name``, and has ``dns.node.Node`` or
+    ``dns.rdataset.Rdataset`` values.
+
+    *origin*, a ``dns.name.Name``, the origin to use for relative names;
+    defaults to None.
+
+    *now*, an ``int`` or ``None``, the time, in seconds since the epoch, to
+    use as the current time when validating.  If ``None``, the actual current
+    time is used.
+
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    Raises ``ValidationFailure`` if the signature is expired, not yet valid,
+    the public key is invalid, the algorithm is unknown, the verification
+    fails, etc.
+    """
+
+    if policy is None:
+        policy = default_policy
+
+    if isinstance(origin, str):
+        origin = dns.name.from_text(origin, dns.name.root)
+
+    if isinstance(rrset, tuple):
+        rrname = rrset[0]
+    else:
+        rrname = rrset.name
+
+    if isinstance(rrsigset, tuple):
+        rrsigname = rrsigset[0]
+        rrsigrdataset = rrsigset[1]
+    else:
+        rrsigname = rrsigset.name
+        rrsigrdataset = rrsigset
+
+    rrname = rrname.choose_relativity(origin)
+    rrsigname = rrsigname.choose_relativity(origin)
+    if rrname != rrsigname:
+        raise ValidationFailure("owner names do not match")
+
+    for rrsig in rrsigrdataset:
+        if not isinstance(rrsig, RRSIG):
+            raise ValidationFailure("expected an RRSIG")
+        try:
+            _validate_rrsig(rrset, rrsig, keys, origin, now, policy)
+            return
+        except (ValidationFailure, UnsupportedAlgorithm):
+            pass
+    raise ValidationFailure("no RRSIGs validated")
+
+
+def _sign(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    private_key: PrivateKey,
+    signer: dns.name.Name,
+    dnskey: DNSKEY,
+    inception: Optional[Union[datetime, str, int, float]] = None,
+    expiration: Optional[Union[datetime, str, int, float]] = None,
+    lifetime: Optional[int] = None,
+    verify: bool = False,
+    policy: Optional[Policy] = None,
+    origin: Optional[dns.name.Name] = None,
+    deterministic: bool = True,
+) -> RRSIG:
+    """Sign RRset using private key.
+
+    *rrset*, the RRset to validate.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *private_key*, the private key to use for signing, a
+    ``cryptography.hazmat.primitives.asymmetric`` private key class applicable
+    for DNSSEC.
+
+    *signer*, a ``dns.name.Name``, the Signer's name.
+
+    *dnskey*, a ``DNSKEY`` matching ``private_key``.
+
+    *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the
+    signature inception time.  If ``None``, the current time is used.  If a ``str``, the
+    format is "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX
+    epoch in text form; this is the same the RRSIG rdata's text form.
+    Values of type `int` or `float` are interpreted as seconds since the UNIX epoch.
+
+    *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+    expiration time.  If ``None``, the expiration time will be the inception time plus
+    the value of the *lifetime* parameter.  See the description of *inception* above
+    for how the various parameter types are interpreted.
+
+    *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds.  This
+    parameter is only meaningful if *expiration* is ``None``.
+
+    *verify*, a ``bool``.  If set to ``True``, the signer will verify signatures
+    after they are created; the default is ``False``.
+
+    *policy*, a ``dns.dnssec.Policy`` or ``None``.  If ``None``, the default policy,
+    ``dns.dnssec.default_policy`` is used; this policy defaults to that of RFC 8624.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If ``None``, the default, then all
+    names in the rrset (including its owner name) must be absolute; otherwise the
+    specified origin will be used to make names absolute when signing.
+
+    *deterministic*, a ``bool``. If ``True``, the default, use deterministic
+    (reproducible) signatures when supported by the algorithm used for signing.
+    Currently, this only affects ECDSA.
+
+    Raises ``DeniedByPolicy`` if the signature is denied by policy.
+    """
+
+    if policy is None:
+        policy = default_policy
+    if not policy.ok_to_sign(dnskey):
+        raise DeniedByPolicy
+
+    if isinstance(rrset, tuple):
+        rdclass = rrset[1].rdclass
+        rdtype = rrset[1].rdtype
+        rrname = rrset[0]
+        original_ttl = rrset[1].ttl
+    else:
+        rdclass = rrset.rdclass
+        rdtype = rrset.rdtype
+        rrname = rrset.name
+        original_ttl = rrset.ttl
+
+    if inception is not None:
+        rrsig_inception = to_timestamp(inception)
+    else:
+        rrsig_inception = int(time.time())
+
+    if expiration is not None:
+        rrsig_expiration = to_timestamp(expiration)
+    elif lifetime is not None:
+        rrsig_expiration = rrsig_inception + lifetime
+    else:
+        raise ValueError("expiration or lifetime must be specified")
+
+    # Derelativize now because we need a correct labels length for the
+    # rrsig_template.
+    if origin is not None:
+        rrname = rrname.derelativize(origin)
+    labels = len(rrname) - 1
+
+    # Adjust labels appropriately for wildcards.
+    if rrname.is_wild():
+        labels -= 1
+
+    rrsig_template = RRSIG(
+        rdclass=rdclass,
+        rdtype=dns.rdatatype.RRSIG,
+        type_covered=rdtype,
+        algorithm=dnskey.algorithm,
+        labels=labels,
+        original_ttl=original_ttl,
+        expiration=rrsig_expiration,
+        inception=rrsig_inception,
+        key_tag=key_id(dnskey),
+        signer=signer,
+        signature=b"",
+    )
+
+    data = dns.dnssec._make_rrsig_signature_data(rrset, rrsig_template, origin)
+
+    # pylint: disable=possibly-used-before-assignment
+    if isinstance(private_key, GenericPrivateKey):
+        signing_key = private_key
+    else:
+        try:
+            private_cls = get_algorithm_cls_from_dnskey(dnskey)
+            signing_key = private_cls(key=private_key)
+        except UnsupportedAlgorithm:
+            raise TypeError("Unsupported key algorithm")
+
+    signature = signing_key.sign(data, verify, deterministic)
+
+    return cast(RRSIG, rrsig_template.replace(signature=signature))
+
+
+def _make_rrsig_signature_data(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    rrsig: RRSIG,
+    origin: Optional[dns.name.Name] = None,
+) -> bytes:
+    """Create signature rdata.
+
+    *rrset*, the RRset to sign/validate.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *rrsig*, a ``dns.rdata.Rdata``, the signature to validate, or the
+    signature template used when signing.
+
+    *origin*, a ``dns.name.Name`` or ``None``, the origin to use for relative
+    names.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is recognized by
+    dnspython but not implemented.
+    """
+
+    if isinstance(origin, str):
+        origin = dns.name.from_text(origin, dns.name.root)
+
+    signer = rrsig.signer
+    if not signer.is_absolute():
+        if origin is None:
+            raise ValidationFailure("relative RR name without an origin specified")
+        signer = signer.derelativize(origin)
+
+    # For convenience, allow the rrset to be specified as a (name,
+    # rdataset) tuple as well as a proper rrset
+    rrname, rdataset = _get_rrname_rdataset(rrset)
+
+    data = b""
+    wire = rrsig.to_wire(origin=signer)
+    assert wire is not None  # for mypy
+    data += wire[:18]
+    data += rrsig.signer.to_digestable(signer)
+
+    # Derelativize the name before considering labels.
+    if not rrname.is_absolute():
+        if origin is None:
+            raise ValidationFailure("relative RR name without an origin specified")
+        rrname = rrname.derelativize(origin)
+
+    name_len = len(rrname)
+    if rrname.is_wild() and rrsig.labels != name_len - 2:
+        raise ValidationFailure("wild owner name has wrong label length")
+    if name_len - 1 < rrsig.labels:
+        raise ValidationFailure("owner name longer than RRSIG labels")
+    elif rrsig.labels < name_len - 1:
+        suffix = rrname.split(rrsig.labels + 1)[1]
+        rrname = dns.name.from_text("*", suffix)
+    rrnamebuf = rrname.to_digestable()
+    rrfixed = struct.pack("!HHI", rdataset.rdtype, rdataset.rdclass, rrsig.original_ttl)
+    rdatas = [rdata.to_digestable(origin) for rdata in rdataset]
+    for rdata in sorted(rdatas):
+        data += rrnamebuf
+        data += rrfixed
+        rrlen = struct.pack("!H", len(rdata))
+        data += rrlen
+        data += rdata
+
+    return data
+
+
+def _make_dnskey(
+    public_key: PublicKey,
+    algorithm: Union[int, str],
+    flags: int = Flag.ZONE,
+    protocol: int = 3,
+) -> DNSKEY:
+    """Convert a public key to DNSKEY Rdata
+
+    *public_key*, a ``PublicKey`` (``GenericPublicKey`` or
+    ``cryptography.hazmat.primitives.asymmetric``) to convert.
+
+    *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
+
+    *flags*: DNSKEY flags field as an integer.
+
+    *protocol*: DNSKEY protocol field as an integer.
+
+    Raises ``ValueError`` if the specified key algorithm parameters are not
+    unsupported, ``TypeError`` if the key type is unsupported,
+    `UnsupportedAlgorithm` if the algorithm is unknown and
+    `AlgorithmKeyMismatch` if the algorithm does not match the key type.
+
+    Return DNSKEY ``Rdata``.
+    """
+
+    algorithm = Algorithm.make(algorithm)
+
+    # pylint: disable=possibly-used-before-assignment
+    if isinstance(public_key, GenericPublicKey):
+        return public_key.to_dnskey(flags=flags, protocol=protocol)
+    else:
+        public_cls = get_algorithm_cls(algorithm).public_cls
+        return public_cls(key=public_key).to_dnskey(flags=flags, protocol=protocol)
+
+
+def _make_cdnskey(
+    public_key: PublicKey,
+    algorithm: Union[int, str],
+    flags: int = Flag.ZONE,
+    protocol: int = 3,
+) -> CDNSKEY:
+    """Convert a public key to CDNSKEY Rdata
+
+    *public_key*, the public key to convert, a
+    ``cryptography.hazmat.primitives.asymmetric`` public key class applicable
+    for DNSSEC.
+
+    *algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
+
+    *flags*: DNSKEY flags field as an integer.
+
+    *protocol*: DNSKEY protocol field as an integer.
+
+    Raises ``ValueError`` if the specified key algorithm parameters are not
+    unsupported, ``TypeError`` if the key type is unsupported,
+    `UnsupportedAlgorithm` if the algorithm is unknown and
+    `AlgorithmKeyMismatch` if the algorithm does not match the key type.
+
+    Return CDNSKEY ``Rdata``.
+    """
+
+    dnskey = _make_dnskey(public_key, algorithm, flags, protocol)
+
+    return CDNSKEY(
+        rdclass=dnskey.rdclass,
+        rdtype=dns.rdatatype.CDNSKEY,
+        flags=dnskey.flags,
+        protocol=dnskey.protocol,
+        algorithm=dnskey.algorithm,
+        key=dnskey.key,
+    )
+
+
+def nsec3_hash(
+    domain: Union[dns.name.Name, str],
+    salt: Optional[Union[str, bytes]],
+    iterations: int,
+    algorithm: Union[int, str],
+) -> str:
+    """
+    Calculate the NSEC3 hash, according to
+    https://tools.ietf.org/html/rfc5155#section-5
+
+    *domain*, a ``dns.name.Name`` or ``str``, the name to hash.
+
+    *salt*, a ``str``, ``bytes``, or ``None``, the hash salt.  If a
+    string, it is decoded as a hex string.
+
+    *iterations*, an ``int``, the number of iterations.
+
+    *algorithm*, a ``str`` or ``int``, the hash algorithm.
+    The only defined algorithm is SHA1.
+
+    Returns a ``str``, the encoded NSEC3 hash.
+    """
+
+    b32_conversion = str.maketrans(
+        "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567", "0123456789ABCDEFGHIJKLMNOPQRSTUV"
+    )
+
+    try:
+        if isinstance(algorithm, str):
+            algorithm = NSEC3Hash[algorithm.upper()]
+    except Exception:
+        raise ValueError("Wrong hash algorithm (only SHA1 is supported)")
+
+    if algorithm != NSEC3Hash.SHA1:
+        raise ValueError("Wrong hash algorithm (only SHA1 is supported)")
+
+    if salt is None:
+        salt_encoded = b""
+    elif isinstance(salt, str):
+        if len(salt) % 2 == 0:
+            salt_encoded = bytes.fromhex(salt)
+        else:
+            raise ValueError("Invalid salt length")
+    else:
+        salt_encoded = salt
+
+    if not isinstance(domain, dns.name.Name):
+        domain = dns.name.from_text(domain)
+    domain_encoded = domain.canonicalize().to_wire()
+    assert domain_encoded is not None
+
+    digest = hashlib.sha1(domain_encoded + salt_encoded).digest()
+    for _ in range(iterations):
+        digest = hashlib.sha1(digest + salt_encoded).digest()
+
+    output = base64.b32encode(digest).decode("utf-8")
+    output = output.translate(b32_conversion)
+
+    return output
+
+
+def make_ds_rdataset(
+    rrset: Union[dns.rrset.RRset, Tuple[dns.name.Name, dns.rdataset.Rdataset]],
+    algorithms: Set[Union[DSDigest, str]],
+    origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+    """Create a DS record from DNSKEY/CDNSKEY/CDS.
+
+    *rrset*, the RRset to create DS Rdataset for.  This can be a
+    ``dns.rrset.RRset`` or a (``dns.name.Name``, ``dns.rdataset.Rdataset``)
+    tuple.
+
+    *algorithms*, a set of ``str`` or ``int`` specifying the hash algorithms.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings. If the RRset is a CDS, only digest
+    algorithms matching algorithms are accepted.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if any of the algorithms are unknown and
+    ``ValueError`` if the given RRset is not usable.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    rrname, rdataset = _get_rrname_rdataset(rrset)
+
+    if rdataset.rdtype not in (
+        dns.rdatatype.DNSKEY,
+        dns.rdatatype.CDNSKEY,
+        dns.rdatatype.CDS,
+    ):
+        raise ValueError("rrset not a DNSKEY/CDNSKEY/CDS")
+
+    _algorithms = set()
+    for algorithm in algorithms:
+        try:
+            if isinstance(algorithm, str):
+                algorithm = DSDigest[algorithm.upper()]
+        except Exception:
+            raise UnsupportedAlgorithm(f'unsupported algorithm "{algorithm}"')
+        _algorithms.add(algorithm)
+
+    if rdataset.rdtype == dns.rdatatype.CDS:
+        res = []
+        for rdata in cds_rdataset_to_ds_rdataset(rdataset):
+            if rdata.digest_type in _algorithms:
+                res.append(rdata)
+        if len(res) == 0:
+            raise ValueError("no acceptable CDS rdata found")
+        return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+    res = []
+    for algorithm in _algorithms:
+        res.extend(dnskey_rdataset_to_cds_rdataset(rrname, rdataset, algorithm, origin))
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def cds_rdataset_to_ds_rdataset(
+    rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+    """Create a CDS record from DS.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+    Raises ``ValueError`` if the rdataset is not CDS.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype != dns.rdatatype.CDS:
+        raise ValueError("rdataset not a CDS")
+    res = []
+    for rdata in rdataset:
+        res.append(
+            CDS(
+                rdclass=rdata.rdclass,
+                rdtype=dns.rdatatype.DS,
+                key_tag=rdata.key_tag,
+                algorithm=rdata.algorithm,
+                digest_type=rdata.digest_type,
+                digest=rdata.digest,
+            )
+        )
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cds_rdataset(
+    name: Union[dns.name.Name, str],
+    rdataset: dns.rdataset.Rdataset,
+    algorithm: Union[DSDigest, str],
+    origin: Optional[dns.name.Name] = None,
+) -> dns.rdataset.Rdataset:
+    """Create a CDS record from DNSKEY/CDNSKEY.
+
+    *name*, a ``dns.name.Name`` or ``str``, the owner name of the CDS record.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create DS Rdataset for.
+
+    *algorithm*, a ``str`` or ``int`` specifying the hash algorithm.
+    The currently supported hashes are "SHA1", "SHA256", and "SHA384". Case
+    does not matter for these strings.
+
+    *origin*, a ``dns.name.Name`` or ``None``.  If `key` is a relative name,
+    then it will be made absolute using the specified origin.
+
+    Raises ``UnsupportedAlgorithm`` if the algorithm is unknown or
+    ``ValueError`` if the rdataset is not DNSKEY/CDNSKEY.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype not in (dns.rdatatype.DNSKEY, dns.rdatatype.CDNSKEY):
+        raise ValueError("rdataset not a DNSKEY/CDNSKEY")
+    res = []
+    for rdata in rdataset:
+        res.append(make_cds(name, rdata, algorithm, origin))
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def dnskey_rdataset_to_cdnskey_rdataset(
+    rdataset: dns.rdataset.Rdataset,
+) -> dns.rdataset.Rdataset:
+    """Create a CDNSKEY record from DNSKEY.
+
+    *rdataset*, a ``dns.rdataset.Rdataset``, to create CDNSKEY Rdataset for.
+
+    Returns a ``dns.rdataset.Rdataset``
+    """
+
+    if rdataset.rdtype != dns.rdatatype.DNSKEY:
+        raise ValueError("rdataset not a DNSKEY")
+    res = []
+    for rdata in rdataset:
+        res.append(
+            CDNSKEY(
+                rdclass=rdataset.rdclass,
+                rdtype=rdataset.rdtype,
+                flags=rdata.flags,
+                protocol=rdata.protocol,
+                algorithm=rdata.algorithm,
+                key=rdata.key,
+            )
+        )
+    return dns.rdataset.from_rdata_list(rdataset.ttl, res)
+
+
+def default_rrset_signer(
+    txn: dns.transaction.Transaction,
+    rrset: dns.rrset.RRset,
+    signer: dns.name.Name,
+    ksks: List[Tuple[PrivateKey, DNSKEY]],
+    zsks: List[Tuple[PrivateKey, DNSKEY]],
+    inception: Optional[Union[datetime, str, int, float]] = None,
+    expiration: Optional[Union[datetime, str, int, float]] = None,
+    lifetime: Optional[int] = None,
+    policy: Optional[Policy] = None,
+    origin: Optional[dns.name.Name] = None,
+    deterministic: bool = True,
+) -> None:
+    """Default RRset signer"""
+
+    if rrset.rdtype in set(
+        [
+            dns.rdatatype.RdataType.DNSKEY,
+            dns.rdatatype.RdataType.CDS,
+            dns.rdatatype.RdataType.CDNSKEY,
+        ]
+    ):
+        keys = ksks
+    else:
+        keys = zsks
+
+    for private_key, dnskey in keys:
+        rrsig = dns.dnssec.sign(
+            rrset=rrset,
+            private_key=private_key,
+            dnskey=dnskey,
+            inception=inception,
+            expiration=expiration,
+            lifetime=lifetime,
+            signer=signer,
+            policy=policy,
+            origin=origin,
+            deterministic=deterministic,
+        )
+        txn.add(rrset.name, rrset.ttl, rrsig)
+
+
+def sign_zone(
+    zone: dns.zone.Zone,
+    txn: Optional[dns.transaction.Transaction] = None,
+    keys: Optional[List[Tuple[PrivateKey, DNSKEY]]] = None,
+    add_dnskey: bool = True,
+    dnskey_ttl: Optional[int] = None,
+    inception: Optional[Union[datetime, str, int, float]] = None,
+    expiration: Optional[Union[datetime, str, int, float]] = None,
+    lifetime: Optional[int] = None,
+    nsec3: Optional[NSEC3PARAM] = None,
+    rrset_signer: Optional[RRsetSigner] = None,
+    policy: Optional[Policy] = None,
+    deterministic: bool = True,
+) -> None:
+    """Sign zone.
+
+    *zone*, a ``dns.zone.Zone``, the zone to sign.
+
+    *txn*, a ``dns.transaction.Transaction``, an optional transaction to use for
+    signing.
+
+    *keys*, a list of (``PrivateKey``, ``DNSKEY``) tuples, to use for signing. KSK/ZSK
+    roles are assigned automatically if the SEP flag is used, otherwise all RRsets are
+    signed by all keys.
+
+    *add_dnskey*, a ``bool``.  If ``True``, the default, all specified DNSKEYs are
+    automatically added to the zone on signing.
+
+    *dnskey_ttl*, a``int``, specifies the TTL for DNSKEY RRs. If not specified the TTL
+    of the existing DNSKEY RRset used or the TTL of the SOA RRset.
+
+    *inception*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+    inception time.  If ``None``, the current time is used.  If a ``str``, the format is
+    "YYYYMMDDHHMMSS" or alternatively the number of seconds since the UNIX epoch in text
+    form; this is the same the RRSIG rdata's text form. Values of type `int` or `float`
+    are interpreted as seconds since the UNIX epoch.
+
+    *expiration*, a ``datetime``, ``str``, ``int``, ``float`` or ``None``, the signature
+    expiration time.  If ``None``, the expiration time will be the inception time plus
+    the value of the *lifetime* parameter.  See the description of *inception* above for
+    how the various parameter types are interpreted.
+
+    *lifetime*, an ``int`` or ``None``, the signature lifetime in seconds.  This
+    parameter is only meaningful if *expiration* is ``None``.
+
+    *nsec3*, a ``NSEC3PARAM`` Rdata, configures signing using NSEC3. Not yet
+    implemented.
+
+    *rrset_signer*, a ``Callable``, an optional function for signing RRsets. The
+    function requires two arguments: transaction and RRset. If the not specified,
+    ``dns.dnssec.default_rrset_signer`` will be used.
+
+    *deterministic*, a ``bool``. If ``True``, the default, use deterministic
+    (reproducible) signatures when supported by the algorithm used for signing.
+    Currently, this only affects ECDSA.
+
+    Returns ``None``.
+    """
+
+    ksks = []
+    zsks = []
+
+    # if we have both KSKs and ZSKs, split by SEP flag. if not, sign all
+    # records with all keys
+    if keys:
+        for key in keys:
+            if key[1].flags & Flag.SEP:
+                ksks.append(key)
+            else:
+                zsks.append(key)
+        if not ksks:
+            ksks = keys
+        if not zsks:
+            zsks = keys
+    else:
+        keys = []
+
+    if txn:
+        cm: contextlib.AbstractContextManager = contextlib.nullcontext(txn)
+    else:
+        cm = zone.writer()
+
+    if zone.origin is None:
+        raise ValueError("no zone origin")
+
+    with cm as _txn:
+        if add_dnskey:
+            if dnskey_ttl is None:
+                dnskey = _txn.get(zone.origin, dns.rdatatype.DNSKEY)
+                if dnskey:
+                    dnskey_ttl = dnskey.ttl
+                else:
+                    soa = _txn.get(zone.origin, dns.rdatatype.SOA)
+                    dnskey_ttl = soa.ttl
+            for _, dnskey in keys:
+                _txn.add(zone.origin, dnskey_ttl, dnskey)
+
+        if nsec3:
+            raise NotImplementedError("Signing with NSEC3 not yet implemented")
+        else:
+            _rrset_signer = rrset_signer or functools.partial(
+                default_rrset_signer,
+                signer=zone.origin,
+                ksks=ksks,
+                zsks=zsks,
+                inception=inception,
+                expiration=expiration,
+                lifetime=lifetime,
+                policy=policy,
+                origin=zone.origin,
+                deterministic=deterministic,
+            )
+            return _sign_zone_nsec(zone, _txn, _rrset_signer)
+
+
+def _sign_zone_nsec(
+    zone: dns.zone.Zone,
+    txn: dns.transaction.Transaction,
+    rrset_signer: Optional[RRsetSigner] = None,
+) -> None:
+    """NSEC zone signer"""
+
+    def _txn_add_nsec(
+        txn: dns.transaction.Transaction,
+        name: dns.name.Name,
+        next_secure: Optional[dns.name.Name],
+        rdclass: dns.rdataclass.RdataClass,
+        ttl: int,
+        rrset_signer: Optional[RRsetSigner] = None,
+    ) -> None:
+        """NSEC zone signer helper"""
+        mandatory_types = set(
+            [dns.rdatatype.RdataType.RRSIG, dns.rdatatype.RdataType.NSEC]
+        )
+        node = txn.get_node(name)
+        if node and next_secure:
+            types = (
+                set([rdataset.rdtype for rdataset in node.rdatasets]) | mandatory_types
+            )
+            windows = Bitmap.from_rdtypes(list(types))
+            rrset = dns.rrset.from_rdata(
+                name,
+                ttl,
+                NSEC(
+                    rdclass=rdclass,
+                    rdtype=dns.rdatatype.RdataType.NSEC,
+                    next=next_secure,
+                    windows=windows,
+                ),
+            )
+            txn.add(rrset)
+            if rrset_signer:
+                rrset_signer(txn, rrset)
+
+    rrsig_ttl = zone.get_soa().minimum
+    delegation = None
+    last_secure = None
+
+    for name in sorted(txn.iterate_names()):
+        if delegation and name.is_subdomain(delegation):
+            # names below delegations are not secure
+            continue
+        elif txn.get(name, dns.rdatatype.NS) and name != zone.origin:
+            # inside delegation
+            delegation = name
+        else:
+            # outside delegation
+            delegation = None
+
+        if rrset_signer:
+            node = txn.get_node(name)
+            if node:
+                for rdataset in node.rdatasets:
+                    if rdataset.rdtype == dns.rdatatype.RRSIG:
+                        # do not sign RRSIGs
+                        continue
+                    elif delegation and rdataset.rdtype != dns.rdatatype.DS:
+                        # do not sign delegations except DS records
+                        continue
+                    else:
+                        rrset = dns.rrset.from_rdata(name, rdataset.ttl, *rdataset)
+                        rrset_signer(txn, rrset)
+
+        # We need "is not None" as the empty name is False because its length is 0.
+        if last_secure is not None:
+            _txn_add_nsec(txn, last_secure, name, zone.rdclass, rrsig_ttl, rrset_signer)
+        last_secure = name
+
+    if last_secure:
+        _txn_add_nsec(
+            txn, last_secure, zone.origin, zone.rdclass, rrsig_ttl, rrset_signer
+        )
+
+
+def _need_pyca(*args, **kwargs):
+    raise ImportError(
+        "DNSSEC validation requires python cryptography"
+    )  # pragma: no cover
+
+
+if dns._features.have("dnssec"):
+    from cryptography.exceptions import InvalidSignature
+    from cryptography.hazmat.primitives.asymmetric import dsa  # pylint: disable=W0611
+    from cryptography.hazmat.primitives.asymmetric import ec  # pylint: disable=W0611
+    from cryptography.hazmat.primitives.asymmetric import ed448  # pylint: disable=W0611
+    from cryptography.hazmat.primitives.asymmetric import rsa  # pylint: disable=W0611
+    from cryptography.hazmat.primitives.asymmetric import (  # pylint: disable=W0611
+        ed25519,
+    )
+
+    from dns.dnssecalgs import (  # pylint: disable=C0412
+        get_algorithm_cls,
+        get_algorithm_cls_from_dnskey,
+    )
+    from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey
+
+    validate = _validate  # type: ignore
+    validate_rrsig = _validate_rrsig  # type: ignore
+    sign = _sign
+    make_dnskey = _make_dnskey
+    make_cdnskey = _make_cdnskey
+    _have_pyca = True
+else:  # pragma: no cover
+    validate = _need_pyca
+    validate_rrsig = _need_pyca
+    sign = _need_pyca
+    make_dnskey = _need_pyca
+    make_cdnskey = _need_pyca
+    _have_pyca = False
+
+### BEGIN generated Algorithm constants
+
+RSAMD5 = Algorithm.RSAMD5
+DH = Algorithm.DH
+DSA = Algorithm.DSA
+ECC = Algorithm.ECC
+RSASHA1 = Algorithm.RSASHA1
+DSANSEC3SHA1 = Algorithm.DSANSEC3SHA1
+RSASHA1NSEC3SHA1 = Algorithm.RSASHA1NSEC3SHA1
+RSASHA256 = Algorithm.RSASHA256
+RSASHA512 = Algorithm.RSASHA512
+ECCGOST = Algorithm.ECCGOST
+ECDSAP256SHA256 = Algorithm.ECDSAP256SHA256
+ECDSAP384SHA384 = Algorithm.ECDSAP384SHA384
+ED25519 = Algorithm.ED25519
+ED448 = Algorithm.ED448
+INDIRECT = Algorithm.INDIRECT
+PRIVATEDNS = Algorithm.PRIVATEDNS
+PRIVATEOID = Algorithm.PRIVATEOID
+
+### END generated Algorithm constants