diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/rsa')
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/__init__.py | 60 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/asn1.py | 52 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/cli.py | 321 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/common.py | 184 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/core.py | 53 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/key.py | 858 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/parallel.py | 96 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/pem.py | 134 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/pkcs1.py | 485 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/pkcs1_v2.py | 100 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/prime.py | 198 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/py.typed | 1 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/randnum.py | 95 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/transform.py | 72 | ||||
-rw-r--r-- | .venv/lib/python3.12/site-packages/rsa/util.py | 97 |
15 files changed, 2806 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/rsa/__init__.py b/.venv/lib/python3.12/site-packages/rsa/__init__.py new file mode 100644 index 00000000..d0185fe9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/__init__.py @@ -0,0 +1,60 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""RSA module + +Module for calculating large primes, and RSA encryption, decryption, signing +and verification. Includes generating public and private keys. + +WARNING: this implementation does not use compression of the cleartext input to +prevent repetitions, or other common security improvements. Use with care. + +""" + +from rsa.key import newkeys, PrivateKey, PublicKey +from rsa.pkcs1 import ( + encrypt, + decrypt, + sign, + verify, + DecryptionError, + VerificationError, + find_signature_hash, + sign_hash, + compute_hash, +) + +__author__ = "Sybren Stuvel, Barry Mead and Yesudeep Mangalapilly" +__date__ = "2022-07-20" +__version__ = "4.9" + +# Do doctest if we're run directly +if __name__ == "__main__": + import doctest + + doctest.testmod() + +__all__ = [ + "newkeys", + "encrypt", + "decrypt", + "sign", + "verify", + "PublicKey", + "PrivateKey", + "DecryptionError", + "VerificationError", + "find_signature_hash", + "compute_hash", + "sign_hash", +] diff --git a/.venv/lib/python3.12/site-packages/rsa/asn1.py b/.venv/lib/python3.12/site-packages/rsa/asn1.py new file mode 100644 index 00000000..b91806fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/asn1.py @@ -0,0 +1,52 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ASN.1 definitions. + +Not all ASN.1-handling code use these definitions, but when it does, they should be here. +""" + +from pyasn1.type import univ, namedtype, tag + + +class PubKeyHeader(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType("oid", univ.ObjectIdentifier()), + namedtype.NamedType("parameters", univ.Null()), + ) + + +class OpenSSLPubKey(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType("header", PubKeyHeader()), + # This little hack (the implicit tag) allows us to get a Bit String as Octet String + namedtype.NamedType( + "key", + univ.OctetString().subtype(implicitTag=tag.Tag(tagClass=0, tagFormat=0, tagId=3)), + ), + ) + + +class AsnPubKey(univ.Sequence): + """ASN.1 contents of DER encoded public key: + + RSAPublicKey ::= SEQUENCE { + modulus INTEGER, -- n + publicExponent INTEGER, -- e + """ + + componentType = namedtype.NamedTypes( + namedtype.NamedType("modulus", univ.Integer()), + namedtype.NamedType("publicExponent", univ.Integer()), + ) diff --git a/.venv/lib/python3.12/site-packages/rsa/cli.py b/.venv/lib/python3.12/site-packages/rsa/cli.py new file mode 100644 index 00000000..5a6b6506 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/cli.py @@ -0,0 +1,321 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Commandline scripts. + +These scripts are called by the executables defined in setup.py. +""" + +import abc +import sys +import typing +import optparse + +import rsa +import rsa.key +import rsa.pkcs1 + +HASH_METHODS = sorted(rsa.pkcs1.HASH_METHODS.keys()) +Indexable = typing.Union[typing.Tuple, typing.List[str]] + + +def keygen() -> None: + """Key generator.""" + + # Parse the CLI options + parser = optparse.OptionParser( + usage="usage: %prog [options] keysize", + description='Generates a new RSA key pair of "keysize" bits.', + ) + + parser.add_option( + "--pubout", + type="string", + help="Output filename for the public key. The public key is " + "not saved if this option is not present. You can use " + "pyrsa-priv2pub to create the public key file later.", + ) + + parser.add_option( + "-o", + "--out", + type="string", + help="Output filename for the private key. The key is " + "written to stdout if this option is not present.", + ) + + parser.add_option( + "--form", + help="key format of the private and public keys - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) + + (cli, cli_args) = parser.parse_args(sys.argv[1:]) + + if len(cli_args) != 1: + parser.print_help() + raise SystemExit(1) + + try: + keysize = int(cli_args[0]) + except ValueError as ex: + parser.print_help() + print("Not a valid number: %s" % cli_args[0], file=sys.stderr) + raise SystemExit(1) from ex + + print("Generating %i-bit key" % keysize, file=sys.stderr) + (pub_key, priv_key) = rsa.newkeys(keysize) + + # Save public key + if cli.pubout: + print("Writing public key to %s" % cli.pubout, file=sys.stderr) + data = pub_key.save_pkcs1(format=cli.form) + with open(cli.pubout, "wb") as outfile: + outfile.write(data) + + # Save private key + data = priv_key.save_pkcs1(format=cli.form) + + if cli.out: + print("Writing private key to %s" % cli.out, file=sys.stderr) + with open(cli.out, "wb") as outfile: + outfile.write(data) + else: + print("Writing private key to stdout", file=sys.stderr) + sys.stdout.buffer.write(data) + + +class CryptoOperation(metaclass=abc.ABCMeta): + """CLI callable that operates with input, output, and a key.""" + + keyname = "public" # or 'private' + usage = "usage: %%prog [options] %(keyname)s_key" + description = "" + operation = "decrypt" + operation_past = "decrypted" + operation_progressive = "decrypting" + input_help = "Name of the file to %(operation)s. Reads from stdin if " "not specified." + output_help = ( + "Name of the file to write the %(operation_past)s file " + "to. Written to stdout if this option is not present." + ) + expected_cli_args = 1 + has_output = True + + key_class = rsa.PublicKey # type: typing.Type[rsa.key.AbstractKey] + + def __init__(self) -> None: + self.usage = self.usage % self.__class__.__dict__ + self.input_help = self.input_help % self.__class__.__dict__ + self.output_help = self.output_help % self.__class__.__dict__ + + @abc.abstractmethod + def perform_operation( + self, indata: bytes, key: rsa.key.AbstractKey, cli_args: Indexable + ) -> typing.Any: + """Performs the program's operation. + + Implement in a subclass. + + :returns: the data to write to the output. + """ + + def __call__(self) -> None: + """Runs the program.""" + + (cli, cli_args) = self.parse_cli() + + key = self.read_key(cli_args[0], cli.keyform) + + indata = self.read_infile(cli.input) + + print(self.operation_progressive.title(), file=sys.stderr) + outdata = self.perform_operation(indata, key, cli_args) + + if self.has_output: + self.write_outfile(outdata, cli.output) + + def parse_cli(self) -> typing.Tuple[optparse.Values, typing.List[str]]: + """Parse the CLI options + + :returns: (cli_opts, cli_args) + """ + + parser = optparse.OptionParser(usage=self.usage, description=self.description) + + parser.add_option("-i", "--input", type="string", help=self.input_help) + + if self.has_output: + parser.add_option("-o", "--output", type="string", help=self.output_help) + + parser.add_option( + "--keyform", + help="Key format of the %s key - default PEM" % self.keyname, + choices=("PEM", "DER"), + default="PEM", + ) + + (cli, cli_args) = parser.parse_args(sys.argv[1:]) + + if len(cli_args) != self.expected_cli_args: + parser.print_help() + raise SystemExit(1) + + return cli, cli_args + + def read_key(self, filename: str, keyform: str) -> rsa.key.AbstractKey: + """Reads a public or private key.""" + + print("Reading %s key from %s" % (self.keyname, filename), file=sys.stderr) + with open(filename, "rb") as keyfile: + keydata = keyfile.read() + + return self.key_class.load_pkcs1(keydata, keyform) + + def read_infile(self, inname: str) -> bytes: + """Read the input file""" + + if inname: + print("Reading input from %s" % inname, file=sys.stderr) + with open(inname, "rb") as infile: + return infile.read() + + print("Reading input from stdin", file=sys.stderr) + return sys.stdin.buffer.read() + + def write_outfile(self, outdata: bytes, outname: str) -> None: + """Write the output file""" + + if outname: + print("Writing output to %s" % outname, file=sys.stderr) + with open(outname, "wb") as outfile: + outfile.write(outdata) + else: + print("Writing output to stdout", file=sys.stderr) + sys.stdout.buffer.write(outdata) + + +class EncryptOperation(CryptoOperation): + """Encrypts a file.""" + + keyname = "public" + description = ( + "Encrypts a file. The file must be shorter than the key " "length in order to be encrypted." + ) + operation = "encrypt" + operation_past = "encrypted" + operation_progressive = "encrypting" + + def perform_operation( + self, indata: bytes, pub_key: rsa.key.AbstractKey, cli_args: Indexable = () + ) -> bytes: + """Encrypts files.""" + assert isinstance(pub_key, rsa.key.PublicKey) + return rsa.encrypt(indata, pub_key) + + +class DecryptOperation(CryptoOperation): + """Decrypts a file.""" + + keyname = "private" + description = ( + "Decrypts a file. The original file must be shorter than " + "the key length in order to have been encrypted." + ) + operation = "decrypt" + operation_past = "decrypted" + operation_progressive = "decrypting" + key_class = rsa.PrivateKey + + def perform_operation( + self, indata: bytes, priv_key: rsa.key.AbstractKey, cli_args: Indexable = () + ) -> bytes: + """Decrypts files.""" + assert isinstance(priv_key, rsa.key.PrivateKey) + return rsa.decrypt(indata, priv_key) + + +class SignOperation(CryptoOperation): + """Signs a file.""" + + keyname = "private" + usage = "usage: %%prog [options] private_key hash_method" + description = ( + "Signs a file, outputs the signature. Choose the hash " + "method from %s" % ", ".join(HASH_METHODS) + ) + operation = "sign" + operation_past = "signature" + operation_progressive = "Signing" + key_class = rsa.PrivateKey + expected_cli_args = 2 + + output_help = ( + "Name of the file to write the signature to. Written " + "to stdout if this option is not present." + ) + + def perform_operation( + self, indata: bytes, priv_key: rsa.key.AbstractKey, cli_args: Indexable + ) -> bytes: + """Signs files.""" + assert isinstance(priv_key, rsa.key.PrivateKey) + + hash_method = cli_args[1] + if hash_method not in HASH_METHODS: + raise SystemExit("Invalid hash method, choose one of %s" % ", ".join(HASH_METHODS)) + + return rsa.sign(indata, priv_key, hash_method) + + +class VerifyOperation(CryptoOperation): + """Verify a signature.""" + + keyname = "public" + usage = "usage: %%prog [options] public_key signature_file" + description = ( + "Verifies a signature, exits with status 0 upon success, " + "prints an error message and exits with status 1 upon error." + ) + operation = "verify" + operation_past = "verified" + operation_progressive = "Verifying" + key_class = rsa.PublicKey + expected_cli_args = 2 + has_output = False + + def perform_operation( + self, indata: bytes, pub_key: rsa.key.AbstractKey, cli_args: Indexable + ) -> None: + """Verifies files.""" + assert isinstance(pub_key, rsa.key.PublicKey) + + signature_file = cli_args[1] + + with open(signature_file, "rb") as sigfile: + signature = sigfile.read() + + try: + rsa.verify(indata, signature, pub_key) + except rsa.VerificationError as ex: + raise SystemExit("Verification failed.") from ex + + print("Verification OK", file=sys.stderr) + + +encrypt = EncryptOperation() +decrypt = DecryptOperation() +sign = SignOperation() +verify = VerifyOperation() diff --git a/.venv/lib/python3.12/site-packages/rsa/common.py b/.venv/lib/python3.12/site-packages/rsa/common.py new file mode 100644 index 00000000..520dfec6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/common.py @@ -0,0 +1,184 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Common functionality shared by several modules.""" + +import typing + + +class NotRelativePrimeError(ValueError): + def __init__(self, a: int, b: int, d: int, msg: str = "") -> None: + super().__init__(msg or "%d and %d are not relatively prime, divider=%i" % (a, b, d)) + self.a = a + self.b = b + self.d = d + + +def bit_size(num: int) -> int: + """ + Number of bits needed to represent a integer excluding any prefix + 0 bits. + + Usage:: + + >>> bit_size(1023) + 10 + >>> bit_size(1024) + 11 + >>> bit_size(1025) + 11 + + :param num: + Integer value. If num is 0, returns 0. Only the absolute value of the + number is considered. Therefore, signed integers will be abs(num) + before the number's bit length is determined. + :returns: + Returns the number of bits in the integer. + """ + + try: + return num.bit_length() + except AttributeError as ex: + raise TypeError("bit_size(num) only supports integers, not %r" % type(num)) from ex + + +def byte_size(number: int) -> int: + """ + Returns the number of bytes required to hold a specific long number. + + The number of bytes is rounded up. + + Usage:: + + >>> byte_size(1 << 1023) + 128 + >>> byte_size((1 << 1024) - 1) + 128 + >>> byte_size(1 << 1024) + 129 + + :param number: + An unsigned integer + :returns: + The number of bytes required to hold a specific long number. + """ + if number == 0: + return 1 + return ceil_div(bit_size(number), 8) + + +def ceil_div(num: int, div: int) -> int: + """ + Returns the ceiling function of a division between `num` and `div`. + + Usage:: + + >>> ceil_div(100, 7) + 15 + >>> ceil_div(100, 10) + 10 + >>> ceil_div(1, 4) + 1 + + :param num: Division's numerator, a number + :param div: Division's divisor, a number + + :return: Rounded up result of the division between the parameters. + """ + quanta, mod = divmod(num, div) + if mod: + quanta += 1 + return quanta + + +def extended_gcd(a: int, b: int) -> typing.Tuple[int, int, int]: + """Returns a tuple (r, i, j) such that r = gcd(a, b) = ia + jb""" + # r = gcd(a,b) i = multiplicitive inverse of a mod b + # or j = multiplicitive inverse of b mod a + # Neg return values for i or j are made positive mod b or a respectively + # Iterateive Version is faster and uses much less stack space + x = 0 + y = 1 + lx = 1 + ly = 0 + oa = a # Remember original a/b to remove + ob = b # negative values from return results + while b != 0: + q = a // b + (a, b) = (b, a % b) + (x, lx) = ((lx - (q * x)), x) + (y, ly) = ((ly - (q * y)), y) + if lx < 0: + lx += ob # If neg wrap modulo original b + if ly < 0: + ly += oa # If neg wrap modulo original a + return a, lx, ly # Return only positive values + + +def inverse(x: int, n: int) -> int: + """Returns the inverse of x % n under multiplication, a.k.a x^-1 (mod n) + + >>> inverse(7, 4) + 3 + >>> (inverse(143, 4) * 143) % 4 + 1 + """ + + (divider, inv, _) = extended_gcd(x, n) + + if divider != 1: + raise NotRelativePrimeError(x, n, divider) + + return inv + + +def crt(a_values: typing.Iterable[int], modulo_values: typing.Iterable[int]) -> int: + """Chinese Remainder Theorem. + + Calculates x such that x = a[i] (mod m[i]) for each i. + + :param a_values: the a-values of the above equation + :param modulo_values: the m-values of the above equation + :returns: x such that x = a[i] (mod m[i]) for each i + + + >>> crt([2, 3], [3, 5]) + 8 + + >>> crt([2, 3, 2], [3, 5, 7]) + 23 + + >>> crt([2, 3, 0], [7, 11, 15]) + 135 + """ + + m = 1 + x = 0 + + for modulo in modulo_values: + m *= modulo + + for (m_i, a_i) in zip(modulo_values, a_values): + M_i = m // m_i + inv = inverse(M_i, m_i) + + x = (x + a_i * M_i * inv) % m + + return x + + +if __name__ == "__main__": + import doctest + + doctest.testmod() diff --git a/.venv/lib/python3.12/site-packages/rsa/core.py b/.venv/lib/python3.12/site-packages/rsa/core.py new file mode 100644 index 00000000..729123b6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/core.py @@ -0,0 +1,53 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Core mathematical operations. + +This is the actual core RSA implementation, which is only defined +mathematically on integers. +""" + + +def assert_int(var: int, name: str) -> None: + if isinstance(var, int): + return + + raise TypeError("%s should be an integer, not %s" % (name, var.__class__)) + + +def encrypt_int(message: int, ekey: int, n: int) -> int: + """Encrypts a message using encryption key 'ekey', working modulo n""" + + assert_int(message, "message") + assert_int(ekey, "ekey") + assert_int(n, "n") + + if message < 0: + raise ValueError("Only non-negative numbers are supported") + + if message > n: + raise OverflowError("The message %i is too long for n=%i" % (message, n)) + + return pow(message, ekey, n) + + +def decrypt_int(cyphertext: int, dkey: int, n: int) -> int: + """Decrypts a cypher text using the decryption key 'dkey', working modulo n""" + + assert_int(cyphertext, "cyphertext") + assert_int(dkey, "dkey") + assert_int(n, "n") + + message = pow(cyphertext, dkey, n) + return message diff --git a/.venv/lib/python3.12/site-packages/rsa/key.py b/.venv/lib/python3.12/site-packages/rsa/key.py new file mode 100644 index 00000000..b6b17f91 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/key.py @@ -0,0 +1,858 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""RSA key generation code. + +Create new keys with the newkeys() function. It will give you a PublicKey and a +PrivateKey object. + +Loading and saving keys requires the pyasn1 module. This module is imported as +late as possible, such that other functionality will remain working in absence +of pyasn1. + +.. note:: + + Storing public and private keys via the `pickle` module is possible. + However, it is insecure to load a key from an untrusted source. + The pickle module is not secure against erroneous or maliciously + constructed data. Never unpickle data received from an untrusted + or unauthenticated source. + +""" + +import threading +import typing +import warnings + +import rsa.prime +import rsa.pem +import rsa.common +import rsa.randnum +import rsa.core + + +DEFAULT_EXPONENT = 65537 + + +T = typing.TypeVar("T", bound="AbstractKey") + + +class AbstractKey: + """Abstract superclass for private and public keys.""" + + __slots__ = ("n", "e", "blindfac", "blindfac_inverse", "mutex") + + def __init__(self, n: int, e: int) -> None: + self.n = n + self.e = e + + # These will be computed properly on the first call to blind(). + self.blindfac = self.blindfac_inverse = -1 + + # Used to protect updates to the blinding factor in multi-threaded + # environments. + self.mutex = threading.Lock() + + @classmethod + def _load_pkcs1_pem(cls: typing.Type[T], keyfile: bytes) -> T: + """Loads a key in PKCS#1 PEM format, implement in a subclass. + + :param keyfile: contents of a PEM-encoded file that contains + the public key. + :type keyfile: bytes + + :return: the loaded key + :rtype: AbstractKey + """ + + @classmethod + def _load_pkcs1_der(cls: typing.Type[T], keyfile: bytes) -> T: + """Loads a key in PKCS#1 PEM format, implement in a subclass. + + :param keyfile: contents of a DER-encoded file that contains + the public key. + :type keyfile: bytes + + :return: the loaded key + :rtype: AbstractKey + """ + + def _save_pkcs1_pem(self) -> bytes: + """Saves the key in PKCS#1 PEM format, implement in a subclass. + + :returns: the PEM-encoded key. + :rtype: bytes + """ + + def _save_pkcs1_der(self) -> bytes: + """Saves the key in PKCS#1 DER format, implement in a subclass. + + :returns: the DER-encoded key. + :rtype: bytes + """ + + @classmethod + def load_pkcs1(cls: typing.Type[T], keyfile: bytes, format: str = "PEM") -> T: + """Loads a key in PKCS#1 DER or PEM format. + + :param keyfile: contents of a DER- or PEM-encoded file that contains + the key. + :type keyfile: bytes + :param format: the format of the file to load; 'PEM' or 'DER' + :type format: str + + :return: the loaded key + :rtype: AbstractKey + """ + + methods = { + "PEM": cls._load_pkcs1_pem, + "DER": cls._load_pkcs1_der, + } + + method = cls._assert_format_exists(format, methods) + return method(keyfile) + + @staticmethod + def _assert_format_exists( + file_format: str, methods: typing.Mapping[str, typing.Callable] + ) -> typing.Callable: + """Checks whether the given file format exists in 'methods'.""" + + try: + return methods[file_format] + except KeyError as ex: + formats = ", ".join(sorted(methods.keys())) + raise ValueError( + "Unsupported format: %r, try one of %s" % (file_format, formats) + ) from ex + + def save_pkcs1(self, format: str = "PEM") -> bytes: + """Saves the key in PKCS#1 DER or PEM format. + + :param format: the format to save; 'PEM' or 'DER' + :type format: str + :returns: the DER- or PEM-encoded key. + :rtype: bytes + """ + + methods = { + "PEM": self._save_pkcs1_pem, + "DER": self._save_pkcs1_der, + } + + method = self._assert_format_exists(format, methods) + return method() + + def blind(self, message: int) -> typing.Tuple[int, int]: + """Performs blinding on the message. + + :param message: the message, as integer, to blind. + :param r: the random number to blind with. + :return: tuple (the blinded message, the inverse of the used blinding factor) + + The blinding is such that message = unblind(decrypt(blind(encrypt(message))). + + See https://en.wikipedia.org/wiki/Blinding_%28cryptography%29 + """ + blindfac, blindfac_inverse = self._update_blinding_factor() + blinded = (message * pow(blindfac, self.e, self.n)) % self.n + return blinded, blindfac_inverse + + def unblind(self, blinded: int, blindfac_inverse: int) -> int: + """Performs blinding on the message using random number 'blindfac_inverse'. + + :param blinded: the blinded message, as integer, to unblind. + :param blindfac: the factor to unblind with. + :return: the original message. + + The blinding is such that message = unblind(decrypt(blind(encrypt(message))). + + See https://en.wikipedia.org/wiki/Blinding_%28cryptography%29 + """ + return (blindfac_inverse * blinded) % self.n + + def _initial_blinding_factor(self) -> int: + for _ in range(1000): + blind_r = rsa.randnum.randint(self.n - 1) + if rsa.prime.are_relatively_prime(self.n, blind_r): + return blind_r + raise RuntimeError("unable to find blinding factor") + + def _update_blinding_factor(self) -> typing.Tuple[int, int]: + """Update blinding factors. + + Computing a blinding factor is expensive, so instead this function + does this once, then updates the blinding factor as per section 9 + of 'A Timing Attack against RSA with the Chinese Remainder Theorem' + by Werner Schindler. + See https://tls.mbed.org/public/WSchindler-RSA_Timing_Attack.pdf + + :return: the new blinding factor and its inverse. + """ + + with self.mutex: + if self.blindfac < 0: + # Compute initial blinding factor, which is rather slow to do. + self.blindfac = self._initial_blinding_factor() + self.blindfac_inverse = rsa.common.inverse(self.blindfac, self.n) + else: + # Reuse previous blinding factor. + self.blindfac = pow(self.blindfac, 2, self.n) + self.blindfac_inverse = pow(self.blindfac_inverse, 2, self.n) + + return self.blindfac, self.blindfac_inverse + + +class PublicKey(AbstractKey): + """Represents a public RSA key. + + This key is also known as the 'encryption key'. It contains the 'n' and 'e' + values. + + Supports attributes as well as dictionary-like access. Attribute access is + faster, though. + + >>> PublicKey(5, 3) + PublicKey(5, 3) + + >>> key = PublicKey(5, 3) + >>> key.n + 5 + >>> key['n'] + 5 + >>> key.e + 3 + >>> key['e'] + 3 + + """ + + __slots__ = () + + def __getitem__(self, key: str) -> int: + return getattr(self, key) + + def __repr__(self) -> str: + return "PublicKey(%i, %i)" % (self.n, self.e) + + def __getstate__(self) -> typing.Tuple[int, int]: + """Returns the key as tuple for pickling.""" + return self.n, self.e + + def __setstate__(self, state: typing.Tuple[int, int]) -> None: + """Sets the key from tuple.""" + self.n, self.e = state + AbstractKey.__init__(self, self.n, self.e) + + def __eq__(self, other: typing.Any) -> bool: + if other is None: + return False + + if not isinstance(other, PublicKey): + return False + + return self.n == other.n and self.e == other.e + + def __ne__(self, other: typing.Any) -> bool: + return not (self == other) + + def __hash__(self) -> int: + return hash((self.n, self.e)) + + @classmethod + def _load_pkcs1_der(cls, keyfile: bytes) -> "PublicKey": + """Loads a key in PKCS#1 DER format. + + :param keyfile: contents of a DER-encoded file that contains the public + key. + :return: a PublicKey object + + First let's construct a DER encoded key: + + >>> import base64 + >>> b64der = 'MAwCBQCNGmYtAgMBAAE=' + >>> der = base64.standard_b64decode(b64der) + + This loads the file: + + >>> PublicKey._load_pkcs1_der(der) + PublicKey(2367317549, 65537) + + """ + + from pyasn1.codec.der import decoder + from rsa.asn1 import AsnPubKey + + (priv, _) = decoder.decode(keyfile, asn1Spec=AsnPubKey()) + return cls(n=int(priv["modulus"]), e=int(priv["publicExponent"])) + + def _save_pkcs1_der(self) -> bytes: + """Saves the public key in PKCS#1 DER format. + + :returns: the DER-encoded public key. + :rtype: bytes + """ + + from pyasn1.codec.der import encoder + from rsa.asn1 import AsnPubKey + + # Create the ASN object + asn_key = AsnPubKey() + asn_key.setComponentByName("modulus", self.n) + asn_key.setComponentByName("publicExponent", self.e) + + return encoder.encode(asn_key) + + @classmethod + def _load_pkcs1_pem(cls, keyfile: bytes) -> "PublicKey": + """Loads a PKCS#1 PEM-encoded public key file. + + The contents of the file before the "-----BEGIN RSA PUBLIC KEY-----" and + after the "-----END RSA PUBLIC KEY-----" lines is ignored. + + :param keyfile: contents of a PEM-encoded file that contains the public + key. + :return: a PublicKey object + """ + + der = rsa.pem.load_pem(keyfile, "RSA PUBLIC KEY") + return cls._load_pkcs1_der(der) + + def _save_pkcs1_pem(self) -> bytes: + """Saves a PKCS#1 PEM-encoded public key file. + + :return: contents of a PEM-encoded file that contains the public key. + :rtype: bytes + """ + + der = self._save_pkcs1_der() + return rsa.pem.save_pem(der, "RSA PUBLIC KEY") + + @classmethod + def load_pkcs1_openssl_pem(cls, keyfile: bytes) -> "PublicKey": + """Loads a PKCS#1.5 PEM-encoded public key file from OpenSSL. + + These files can be recognised in that they start with BEGIN PUBLIC KEY + rather than BEGIN RSA PUBLIC KEY. + + The contents of the file before the "-----BEGIN PUBLIC KEY-----" and + after the "-----END PUBLIC KEY-----" lines is ignored. + + :param keyfile: contents of a PEM-encoded file that contains the public + key, from OpenSSL. + :type keyfile: bytes + :return: a PublicKey object + """ + + der = rsa.pem.load_pem(keyfile, "PUBLIC KEY") + return cls.load_pkcs1_openssl_der(der) + + @classmethod + def load_pkcs1_openssl_der(cls, keyfile: bytes) -> "PublicKey": + """Loads a PKCS#1 DER-encoded public key file from OpenSSL. + + :param keyfile: contents of a DER-encoded file that contains the public + key, from OpenSSL. + :return: a PublicKey object + """ + + from rsa.asn1 import OpenSSLPubKey + from pyasn1.codec.der import decoder + from pyasn1.type import univ + + (keyinfo, _) = decoder.decode(keyfile, asn1Spec=OpenSSLPubKey()) + + if keyinfo["header"]["oid"] != univ.ObjectIdentifier("1.2.840.113549.1.1.1"): + raise TypeError("This is not a DER-encoded OpenSSL-compatible public key") + + return cls._load_pkcs1_der(keyinfo["key"][1:]) + + +class PrivateKey(AbstractKey): + """Represents a private RSA key. + + This key is also known as the 'decryption key'. It contains the 'n', 'e', + 'd', 'p', 'q' and other values. + + Supports attributes as well as dictionary-like access. Attribute access is + faster, though. + + >>> PrivateKey(3247, 65537, 833, 191, 17) + PrivateKey(3247, 65537, 833, 191, 17) + + exp1, exp2 and coef will be calculated: + + >>> pk = PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + >>> pk.exp1 + 55063 + >>> pk.exp2 + 10095 + >>> pk.coef + 50797 + + """ + + __slots__ = ("d", "p", "q", "exp1", "exp2", "coef") + + def __init__(self, n: int, e: int, d: int, p: int, q: int) -> None: + AbstractKey.__init__(self, n, e) + self.d = d + self.p = p + self.q = q + + # Calculate exponents and coefficient. + self.exp1 = int(d % (p - 1)) + self.exp2 = int(d % (q - 1)) + self.coef = rsa.common.inverse(q, p) + + def __getitem__(self, key: str) -> int: + return getattr(self, key) + + def __repr__(self) -> str: + return "PrivateKey(%i, %i, %i, %i, %i)" % ( + self.n, + self.e, + self.d, + self.p, + self.q, + ) + + def __getstate__(self) -> typing.Tuple[int, int, int, int, int, int, int, int]: + """Returns the key as tuple for pickling.""" + return self.n, self.e, self.d, self.p, self.q, self.exp1, self.exp2, self.coef + + def __setstate__(self, state: typing.Tuple[int, int, int, int, int, int, int, int]) -> None: + """Sets the key from tuple.""" + self.n, self.e, self.d, self.p, self.q, self.exp1, self.exp2, self.coef = state + AbstractKey.__init__(self, self.n, self.e) + + def __eq__(self, other: typing.Any) -> bool: + if other is None: + return False + + if not isinstance(other, PrivateKey): + return False + + return ( + self.n == other.n + and self.e == other.e + and self.d == other.d + and self.p == other.p + and self.q == other.q + and self.exp1 == other.exp1 + and self.exp2 == other.exp2 + and self.coef == other.coef + ) + + def __ne__(self, other: typing.Any) -> bool: + return not (self == other) + + def __hash__(self) -> int: + return hash((self.n, self.e, self.d, self.p, self.q, self.exp1, self.exp2, self.coef)) + + def blinded_decrypt(self, encrypted: int) -> int: + """Decrypts the message using blinding to prevent side-channel attacks. + + :param encrypted: the encrypted message + :type encrypted: int + + :returns: the decrypted message + :rtype: int + """ + + # Blinding and un-blinding should be using the same factor + blinded, blindfac_inverse = self.blind(encrypted) + + # Instead of using the core functionality, use the Chinese Remainder + # Theorem and be 2-4x faster. This the same as: + # + # decrypted = rsa.core.decrypt_int(blinded, self.d, self.n) + s1 = pow(blinded, self.exp1, self.p) + s2 = pow(blinded, self.exp2, self.q) + h = ((s1 - s2) * self.coef) % self.p + decrypted = s2 + self.q * h + + return self.unblind(decrypted, blindfac_inverse) + + def blinded_encrypt(self, message: int) -> int: + """Encrypts the message using blinding to prevent side-channel attacks. + + :param message: the message to encrypt + :type message: int + + :returns: the encrypted message + :rtype: int + """ + + blinded, blindfac_inverse = self.blind(message) + encrypted = rsa.core.encrypt_int(blinded, self.d, self.n) + return self.unblind(encrypted, blindfac_inverse) + + @classmethod + def _load_pkcs1_der(cls, keyfile: bytes) -> "PrivateKey": + """Loads a key in PKCS#1 DER format. + + :param keyfile: contents of a DER-encoded file that contains the private + key. + :type keyfile: bytes + :return: a PrivateKey object + + First let's construct a DER encoded key: + + >>> import base64 + >>> b64der = 'MC4CAQACBQDeKYlRAgMBAAECBQDHn4npAgMA/icCAwDfxwIDANcXAgInbwIDAMZt' + >>> der = base64.standard_b64decode(b64der) + + This loads the file: + + >>> PrivateKey._load_pkcs1_der(der) + PrivateKey(3727264081, 65537, 3349121513, 65063, 57287) + + """ + + from pyasn1.codec.der import decoder + + (priv, _) = decoder.decode(keyfile) + + # ASN.1 contents of DER encoded private key: + # + # RSAPrivateKey ::= SEQUENCE { + # version Version, + # modulus INTEGER, -- n + # publicExponent INTEGER, -- e + # privateExponent INTEGER, -- d + # prime1 INTEGER, -- p + # prime2 INTEGER, -- q + # exponent1 INTEGER, -- d mod (p-1) + # exponent2 INTEGER, -- d mod (q-1) + # coefficient INTEGER, -- (inverse of q) mod p + # otherPrimeInfos OtherPrimeInfos OPTIONAL + # } + + if priv[0] != 0: + raise ValueError("Unable to read this file, version %s != 0" % priv[0]) + + as_ints = map(int, priv[1:6]) + key = cls(*as_ints) + + exp1, exp2, coef = map(int, priv[6:9]) + + if (key.exp1, key.exp2, key.coef) != (exp1, exp2, coef): + warnings.warn( + "You have provided a malformed keyfile. Either the exponents " + "or the coefficient are incorrect. Using the correct values " + "instead.", + UserWarning, + ) + + return key + + def _save_pkcs1_der(self) -> bytes: + """Saves the private key in PKCS#1 DER format. + + :returns: the DER-encoded private key. + :rtype: bytes + """ + + from pyasn1.type import univ, namedtype + from pyasn1.codec.der import encoder + + class AsnPrivKey(univ.Sequence): + componentType = namedtype.NamedTypes( + namedtype.NamedType("version", univ.Integer()), + namedtype.NamedType("modulus", univ.Integer()), + namedtype.NamedType("publicExponent", univ.Integer()), + namedtype.NamedType("privateExponent", univ.Integer()), + namedtype.NamedType("prime1", univ.Integer()), + namedtype.NamedType("prime2", univ.Integer()), + namedtype.NamedType("exponent1", univ.Integer()), + namedtype.NamedType("exponent2", univ.Integer()), + namedtype.NamedType("coefficient", univ.Integer()), + ) + + # Create the ASN object + asn_key = AsnPrivKey() + asn_key.setComponentByName("version", 0) + asn_key.setComponentByName("modulus", self.n) + asn_key.setComponentByName("publicExponent", self.e) + asn_key.setComponentByName("privateExponent", self.d) + asn_key.setComponentByName("prime1", self.p) + asn_key.setComponentByName("prime2", self.q) + asn_key.setComponentByName("exponent1", self.exp1) + asn_key.setComponentByName("exponent2", self.exp2) + asn_key.setComponentByName("coefficient", self.coef) + + return encoder.encode(asn_key) + + @classmethod + def _load_pkcs1_pem(cls, keyfile: bytes) -> "PrivateKey": + """Loads a PKCS#1 PEM-encoded private key file. + + The contents of the file before the "-----BEGIN RSA PRIVATE KEY-----" and + after the "-----END RSA PRIVATE KEY-----" lines is ignored. + + :param keyfile: contents of a PEM-encoded file that contains the private + key. + :type keyfile: bytes + :return: a PrivateKey object + """ + + der = rsa.pem.load_pem(keyfile, b"RSA PRIVATE KEY") + return cls._load_pkcs1_der(der) + + def _save_pkcs1_pem(self) -> bytes: + """Saves a PKCS#1 PEM-encoded private key file. + + :return: contents of a PEM-encoded file that contains the private key. + :rtype: bytes + """ + + der = self._save_pkcs1_der() + return rsa.pem.save_pem(der, b"RSA PRIVATE KEY") + + +def find_p_q( + nbits: int, + getprime_func: typing.Callable[[int], int] = rsa.prime.getprime, + accurate: bool = True, +) -> typing.Tuple[int, int]: + """Returns a tuple of two different primes of nbits bits each. + + The resulting p * q has exactly 2 * nbits bits, and the returned p and q + will not be equal. + + :param nbits: the number of bits in each of p and q. + :param getprime_func: the getprime function, defaults to + :py:func:`rsa.prime.getprime`. + + *Introduced in Python-RSA 3.1* + + :param accurate: whether to enable accurate mode or not. + :returns: (p, q), where p > q + + >>> (p, q) = find_p_q(128) + >>> from rsa import common + >>> common.bit_size(p * q) + 256 + + When not in accurate mode, the number of bits can be slightly less + + >>> (p, q) = find_p_q(128, accurate=False) + >>> from rsa import common + >>> common.bit_size(p * q) <= 256 + True + >>> common.bit_size(p * q) > 240 + True + + """ + + total_bits = nbits * 2 + + # Make sure that p and q aren't too close or the factoring programs can + # factor n. + shift = nbits // 16 + pbits = nbits + shift + qbits = nbits - shift + + # Choose the two initial primes + p = getprime_func(pbits) + q = getprime_func(qbits) + + def is_acceptable(p: int, q: int) -> bool: + """Returns True iff p and q are acceptable: + + - p and q differ + - (p * q) has the right nr of bits (when accurate=True) + """ + + if p == q: + return False + + if not accurate: + return True + + # Make sure we have just the right amount of bits + found_size = rsa.common.bit_size(p * q) + return total_bits == found_size + + # Keep choosing other primes until they match our requirements. + change_p = False + while not is_acceptable(p, q): + # Change p on one iteration and q on the other + if change_p: + p = getprime_func(pbits) + else: + q = getprime_func(qbits) + + change_p = not change_p + + # We want p > q as described on + # http://www.di-mgt.com.au/rsa_alg.html#crt + return max(p, q), min(p, q) + + +def calculate_keys_custom_exponent(p: int, q: int, exponent: int) -> typing.Tuple[int, int]: + """Calculates an encryption and a decryption key given p, q and an exponent, + and returns them as a tuple (e, d) + + :param p: the first large prime + :param q: the second large prime + :param exponent: the exponent for the key; only change this if you know + what you're doing, as the exponent influences how difficult your + private key can be cracked. A very common choice for e is 65537. + :type exponent: int + + """ + + phi_n = (p - 1) * (q - 1) + + try: + d = rsa.common.inverse(exponent, phi_n) + except rsa.common.NotRelativePrimeError as ex: + raise rsa.common.NotRelativePrimeError( + exponent, + phi_n, + ex.d, + msg="e (%d) and phi_n (%d) are not relatively prime (divider=%i)" + % (exponent, phi_n, ex.d), + ) from ex + + if (exponent * d) % phi_n != 1: + raise ValueError( + "e (%d) and d (%d) are not mult. inv. modulo " "phi_n (%d)" % (exponent, d, phi_n) + ) + + return exponent, d + + +def calculate_keys(p: int, q: int) -> typing.Tuple[int, int]: + """Calculates an encryption and a decryption key given p and q, and + returns them as a tuple (e, d) + + :param p: the first large prime + :param q: the second large prime + + :return: tuple (e, d) with the encryption and decryption exponents. + """ + + return calculate_keys_custom_exponent(p, q, DEFAULT_EXPONENT) + + +def gen_keys( + nbits: int, + getprime_func: typing.Callable[[int], int], + accurate: bool = True, + exponent: int = DEFAULT_EXPONENT, +) -> typing.Tuple[int, int, int, int]: + """Generate RSA keys of nbits bits. Returns (p, q, e, d). + + Note: this can take a long time, depending on the key size. + + :param nbits: the total number of bits in ``p`` and ``q``. Both ``p`` and + ``q`` will use ``nbits/2`` bits. + :param getprime_func: either :py:func:`rsa.prime.getprime` or a function + with similar signature. + :param exponent: the exponent for the key; only change this if you know + what you're doing, as the exponent influences how difficult your + private key can be cracked. A very common choice for e is 65537. + :type exponent: int + """ + + # Regenerate p and q values, until calculate_keys doesn't raise a + # ValueError. + while True: + (p, q) = find_p_q(nbits // 2, getprime_func, accurate) + try: + (e, d) = calculate_keys_custom_exponent(p, q, exponent=exponent) + break + except ValueError: + pass + + return p, q, e, d + + +def newkeys( + nbits: int, + accurate: bool = True, + poolsize: int = 1, + exponent: int = DEFAULT_EXPONENT, +) -> typing.Tuple[PublicKey, PrivateKey]: + """Generates public and private keys, and returns them as (pub, priv). + + The public key is also known as the 'encryption key', and is a + :py:class:`rsa.PublicKey` object. The private key is also known as the + 'decryption key' and is a :py:class:`rsa.PrivateKey` object. + + :param nbits: the number of bits required to store ``n = p*q``. + :param accurate: when True, ``n`` will have exactly the number of bits you + asked for. However, this makes key generation much slower. When False, + `n`` may have slightly less bits. + :param poolsize: the number of processes to use to generate the prime + numbers. If set to a number > 1, a parallel algorithm will be used. + This requires Python 2.6 or newer. + :param exponent: the exponent for the key; only change this if you know + what you're doing, as the exponent influences how difficult your + private key can be cracked. A very common choice for e is 65537. + :type exponent: int + + :returns: a tuple (:py:class:`rsa.PublicKey`, :py:class:`rsa.PrivateKey`) + + The ``poolsize`` parameter was added in *Python-RSA 3.1* and requires + Python 2.6 or newer. + + """ + + if nbits < 16: + raise ValueError("Key too small") + + if poolsize < 1: + raise ValueError("Pool size (%i) should be >= 1" % poolsize) + + # Determine which getprime function to use + if poolsize > 1: + from rsa import parallel + + def getprime_func(nbits: int) -> int: + return parallel.getprime(nbits, poolsize=poolsize) + + else: + getprime_func = rsa.prime.getprime + + # Generate the key components + (p, q, e, d) = gen_keys(nbits, getprime_func, accurate=accurate, exponent=exponent) + + # Create the key objects + n = p * q + + return (PublicKey(n, e), PrivateKey(n, e, d, p, q)) + + +__all__ = ["PublicKey", "PrivateKey", "newkeys"] + +if __name__ == "__main__": + import doctest + + try: + for count in range(100): + (failures, tests) = doctest.testmod() + if failures: + break + + if (count % 10 == 0 and count) or count == 1: + print("%i times" % count) + except KeyboardInterrupt: + print("Aborted") + else: + print("Doctests done") diff --git a/.venv/lib/python3.12/site-packages/rsa/parallel.py b/.venv/lib/python3.12/site-packages/rsa/parallel.py new file mode 100644 index 00000000..ab992480 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/parallel.py @@ -0,0 +1,96 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions for parallel computation on multiple cores. + +Introduced in Python-RSA 3.1. + +.. note:: + + Requires Python 2.6 or newer. + +""" + +import multiprocessing as mp +from multiprocessing.connection import Connection + +import rsa.prime +import rsa.randnum + + +def _find_prime(nbits: int, pipe: Connection) -> None: + while True: + integer = rsa.randnum.read_random_odd_int(nbits) + + # Test for primeness + if rsa.prime.is_prime(integer): + pipe.send(integer) + return + + +def getprime(nbits: int, poolsize: int) -> int: + """Returns a prime number that can be stored in 'nbits' bits. + + Works in multiple threads at the same time. + + >>> p = getprime(128, 3) + >>> rsa.prime.is_prime(p-1) + False + >>> rsa.prime.is_prime(p) + True + >>> rsa.prime.is_prime(p+1) + False + + >>> from rsa import common + >>> common.bit_size(p) == 128 + True + + """ + + (pipe_recv, pipe_send) = mp.Pipe(duplex=False) + + # Create processes + try: + procs = [mp.Process(target=_find_prime, args=(nbits, pipe_send)) for _ in range(poolsize)] + # Start processes + for p in procs: + p.start() + + result = pipe_recv.recv() + finally: + pipe_recv.close() + pipe_send.close() + + # Terminate processes + for p in procs: + p.terminate() + + return result + + +__all__ = ["getprime"] + +if __name__ == "__main__": + print("Running doctests 1000x or until failure") + import doctest + + for count in range(100): + (failures, tests) = doctest.testmod() + if failures: + break + + if count % 10 == 0 and count: + print("%i times" % count) + + print("Doctests done") diff --git a/.venv/lib/python3.12/site-packages/rsa/pem.py b/.venv/lib/python3.12/site-packages/rsa/pem.py new file mode 100644 index 00000000..eb9c0446 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/pem.py @@ -0,0 +1,134 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions that load and write PEM-encoded files.""" + +import base64 +import typing + +# Should either be ASCII strings or bytes. +FlexiText = typing.Union[str, bytes] + + +def _markers(pem_marker: FlexiText) -> typing.Tuple[bytes, bytes]: + """ + Returns the start and end PEM markers, as bytes. + """ + + if not isinstance(pem_marker, bytes): + pem_marker = pem_marker.encode("ascii") + + return ( + b"-----BEGIN " + pem_marker + b"-----", + b"-----END " + pem_marker + b"-----", + ) + + +def _pem_lines(contents: bytes, pem_start: bytes, pem_end: bytes) -> typing.Iterator[bytes]: + """Generator over PEM lines between pem_start and pem_end.""" + + in_pem_part = False + seen_pem_start = False + + for line in contents.splitlines(): + line = line.strip() + + # Skip empty lines + if not line: + continue + + # Handle start marker + if line == pem_start: + if in_pem_part: + raise ValueError('Seen start marker "%r" twice' % pem_start) + + in_pem_part = True + seen_pem_start = True + continue + + # Skip stuff before first marker + if not in_pem_part: + continue + + # Handle end marker + if in_pem_part and line == pem_end: + in_pem_part = False + break + + # Load fields + if b":" in line: + continue + + yield line + + # Do some sanity checks + if not seen_pem_start: + raise ValueError('No PEM start marker "%r" found' % pem_start) + + if in_pem_part: + raise ValueError('No PEM end marker "%r" found' % pem_end) + + +def load_pem(contents: FlexiText, pem_marker: FlexiText) -> bytes: + """Loads a PEM file. + + :param contents: the contents of the file to interpret + :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY' + when your file has '-----BEGIN RSA PRIVATE KEY-----' and + '-----END RSA PRIVATE KEY-----' markers. + + :return: the base64-decoded content between the start and end markers. + + @raise ValueError: when the content is invalid, for example when the start + marker cannot be found. + + """ + + # We want bytes, not text. If it's text, it can be converted to ASCII bytes. + if not isinstance(contents, bytes): + contents = contents.encode("ascii") + + (pem_start, pem_end) = _markers(pem_marker) + pem_lines = [line for line in _pem_lines(contents, pem_start, pem_end)] + + # Base64-decode the contents + pem = b"".join(pem_lines) + return base64.standard_b64decode(pem) + + +def save_pem(contents: bytes, pem_marker: FlexiText) -> bytes: + """Saves a PEM file. + + :param contents: the contents to encode in PEM format + :param pem_marker: the marker of the PEM content, such as 'RSA PRIVATE KEY' + when your file has '-----BEGIN RSA PRIVATE KEY-----' and + '-----END RSA PRIVATE KEY-----' markers. + + :return: the base64-encoded content between the start and end markers, as bytes. + + """ + + (pem_start, pem_end) = _markers(pem_marker) + + b64 = base64.standard_b64encode(contents).replace(b"\n", b"") + pem_lines = [pem_start] + + for block_start in range(0, len(b64), 64): + block = b64[block_start : block_start + 64] + pem_lines.append(block) + + pem_lines.append(pem_end) + pem_lines.append(b"") + + return b"\n".join(pem_lines) diff --git a/.venv/lib/python3.12/site-packages/rsa/pkcs1.py b/.venv/lib/python3.12/site-packages/rsa/pkcs1.py new file mode 100644 index 00000000..b7f54220 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/pkcs1.py @@ -0,0 +1,485 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions for PKCS#1 version 1.5 encryption and signing + +This module implements certain functionality from PKCS#1 version 1.5. For a +very clear example, read http://www.di-mgt.com.au/rsa_alg.html#pkcs1schemes + +At least 8 bytes of random padding is used when encrypting a message. This makes +these methods much more secure than the ones in the ``rsa`` module. + +WARNING: this module leaks information when decryption fails. The exceptions +that are raised contain the Python traceback information, which can be used to +deduce where in the process the failure occurred. DO NOT PASS SUCH INFORMATION +to your users. +""" + +import hashlib +import os +import sys +import typing +from hmac import compare_digest + +from . import common, transform, core, key + +if typing.TYPE_CHECKING: + HashType = hashlib._Hash +else: + HashType = typing.Any + +# ASN.1 codes that describe the hash algorithm used. +HASH_ASN1 = { + "MD5": b"\x30\x20\x30\x0c\x06\x08\x2a\x86\x48\x86\xf7\x0d\x02\x05\x05\x00\x04\x10", + "SHA-1": b"\x30\x21\x30\x09\x06\x05\x2b\x0e\x03\x02\x1a\x05\x00\x04\x14", + "SHA-224": b"\x30\x2d\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x04\x05\x00\x04\x1c", + "SHA-256": b"\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x01\x05\x00\x04\x20", + "SHA-384": b"\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x02\x05\x00\x04\x30", + "SHA-512": b"\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x03\x05\x00\x04\x40", +} + +HASH_METHODS: typing.Dict[str, typing.Callable[[], HashType]] = { + "MD5": hashlib.md5, + "SHA-1": hashlib.sha1, + "SHA-224": hashlib.sha224, + "SHA-256": hashlib.sha256, + "SHA-384": hashlib.sha384, + "SHA-512": hashlib.sha512, +} +"""Hash methods supported by this library.""" + + +if sys.version_info >= (3, 6): + # Python 3.6 introduced SHA3 support. + HASH_ASN1.update( + { + "SHA3-256": b"\x30\x31\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x08\x05\x00\x04\x20", + "SHA3-384": b"\x30\x41\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x09\x05\x00\x04\x30", + "SHA3-512": b"\x30\x51\x30\x0d\x06\x09\x60\x86\x48\x01\x65\x03\x04\x02\x0a\x05\x00\x04\x40", + } + ) + + HASH_METHODS.update( + { + "SHA3-256": hashlib.sha3_256, + "SHA3-384": hashlib.sha3_384, + "SHA3-512": hashlib.sha3_512, + } + ) + + +class CryptoError(Exception): + """Base class for all exceptions in this module.""" + + +class DecryptionError(CryptoError): + """Raised when decryption fails.""" + + +class VerificationError(CryptoError): + """Raised when verification fails.""" + + +def _pad_for_encryption(message: bytes, target_length: int) -> bytes: + r"""Pads the message for encryption, returning the padded message. + + :return: 00 02 RANDOM_DATA 00 MESSAGE + + >>> block = _pad_for_encryption(b'hello', 16) + >>> len(block) + 16 + >>> block[0:2] + b'\x00\x02' + >>> block[-6:] + b'\x00hello' + + """ + + max_msglength = target_length - 11 + msglength = len(message) + + if msglength > max_msglength: + raise OverflowError( + "%i bytes needed for message, but there is only" + " space for %i" % (msglength, max_msglength) + ) + + # Get random padding + padding = b"" + padding_length = target_length - msglength - 3 + + # We remove 0-bytes, so we'll end up with less padding than we've asked for, + # so keep adding data until we're at the correct length. + while len(padding) < padding_length: + needed_bytes = padding_length - len(padding) + + # Always read at least 8 bytes more than we need, and trim off the rest + # after removing the 0-bytes. This increases the chance of getting + # enough bytes, especially when needed_bytes is small + new_padding = os.urandom(needed_bytes + 5) + new_padding = new_padding.replace(b"\x00", b"") + padding = padding + new_padding[:needed_bytes] + + assert len(padding) == padding_length + + return b"".join([b"\x00\x02", padding, b"\x00", message]) + + +def _pad_for_signing(message: bytes, target_length: int) -> bytes: + r"""Pads the message for signing, returning the padded message. + + The padding is always a repetition of FF bytes. + + :return: 00 01 PADDING 00 MESSAGE + + >>> block = _pad_for_signing(b'hello', 16) + >>> len(block) + 16 + >>> block[0:2] + b'\x00\x01' + >>> block[-6:] + b'\x00hello' + >>> block[2:-6] + b'\xff\xff\xff\xff\xff\xff\xff\xff' + + """ + + max_msglength = target_length - 11 + msglength = len(message) + + if msglength > max_msglength: + raise OverflowError( + "%i bytes needed for message, but there is only" + " space for %i" % (msglength, max_msglength) + ) + + padding_length = target_length - msglength - 3 + + return b"".join([b"\x00\x01", padding_length * b"\xff", b"\x00", message]) + + +def encrypt(message: bytes, pub_key: key.PublicKey) -> bytes: + """Encrypts the given message using PKCS#1 v1.5 + + :param message: the message to encrypt. Must be a byte string no longer than + ``k-11`` bytes, where ``k`` is the number of bytes needed to encode + the ``n`` component of the public key. + :param pub_key: the :py:class:`rsa.PublicKey` to encrypt with. + :raise OverflowError: when the message is too large to fit in the padded + block. + + >>> from rsa import key, common + >>> (pub_key, priv_key) = key.newkeys(256) + >>> message = b'hello' + >>> crypto = encrypt(message, pub_key) + + The crypto text should be just as long as the public key 'n' component: + + >>> len(crypto) == common.byte_size(pub_key.n) + True + + """ + + keylength = common.byte_size(pub_key.n) + padded = _pad_for_encryption(message, keylength) + + payload = transform.bytes2int(padded) + encrypted = core.encrypt_int(payload, pub_key.e, pub_key.n) + block = transform.int2bytes(encrypted, keylength) + + return block + + +def decrypt(crypto: bytes, priv_key: key.PrivateKey) -> bytes: + r"""Decrypts the given message using PKCS#1 v1.5 + + The decryption is considered 'failed' when the resulting cleartext doesn't + start with the bytes 00 02, or when the 00 byte between the padding and + the message cannot be found. + + :param crypto: the crypto text as returned by :py:func:`rsa.encrypt` + :param priv_key: the :py:class:`rsa.PrivateKey` to decrypt with. + :raise DecryptionError: when the decryption fails. No details are given as + to why the code thinks the decryption fails, as this would leak + information about the private key. + + + >>> import rsa + >>> (pub_key, priv_key) = rsa.newkeys(256) + + It works with strings: + + >>> crypto = encrypt(b'hello', pub_key) + >>> decrypt(crypto, priv_key) + b'hello' + + And with binary data: + + >>> crypto = encrypt(b'\x00\x00\x00\x00\x01', pub_key) + >>> decrypt(crypto, priv_key) + b'\x00\x00\x00\x00\x01' + + Altering the encrypted information will *likely* cause a + :py:class:`rsa.pkcs1.DecryptionError`. If you want to be *sure*, use + :py:func:`rsa.sign`. + + + .. warning:: + + Never display the stack trace of a + :py:class:`rsa.pkcs1.DecryptionError` exception. It shows where in the + code the exception occurred, and thus leaks information about the key. + It's only a tiny bit of information, but every bit makes cracking the + keys easier. + + >>> crypto = encrypt(b'hello', pub_key) + >>> crypto = crypto[0:5] + b'X' + crypto[6:] # change a byte + >>> decrypt(crypto, priv_key) + Traceback (most recent call last): + ... + rsa.pkcs1.DecryptionError: Decryption failed + + """ + + blocksize = common.byte_size(priv_key.n) + encrypted = transform.bytes2int(crypto) + decrypted = priv_key.blinded_decrypt(encrypted) + cleartext = transform.int2bytes(decrypted, blocksize) + + # Detect leading zeroes in the crypto. These are not reflected in the + # encrypted value (as leading zeroes do not influence the value of an + # integer). This fixes CVE-2020-13757. + if len(crypto) > blocksize: + # This is operating on public information, so doesn't need to be constant-time. + raise DecryptionError("Decryption failed") + + # If we can't find the cleartext marker, decryption failed. + cleartext_marker_bad = not compare_digest(cleartext[:2], b"\x00\x02") + + # Find the 00 separator between the padding and the message + sep_idx = cleartext.find(b"\x00", 2) + + # sep_idx indicates the position of the `\x00` separator that separates the + # padding from the actual message. The padding should be at least 8 bytes + # long (see https://tools.ietf.org/html/rfc8017#section-7.2.2 step 3), which + # means the separator should be at least at index 10 (because of the + # `\x00\x02` marker that precedes it). + sep_idx_bad = sep_idx < 10 + + anything_bad = cleartext_marker_bad | sep_idx_bad + if anything_bad: + raise DecryptionError("Decryption failed") + + return cleartext[sep_idx + 1 :] + + +def sign_hash(hash_value: bytes, priv_key: key.PrivateKey, hash_method: str) -> bytes: + """Signs a precomputed hash with the private key. + + Hashes the message, then signs the hash with the given key. This is known + as a "detached signature", because the message itself isn't altered. + + :param hash_value: A precomputed hash to sign (ignores message). + :param priv_key: the :py:class:`rsa.PrivateKey` to sign with + :param hash_method: the hash method used on the message. Use 'MD5', 'SHA-1', + 'SHA-224', SHA-256', 'SHA-384' or 'SHA-512'. + :return: a message signature block. + :raise OverflowError: if the private key is too small to contain the + requested hash. + + """ + + # Get the ASN1 code for this hash method + if hash_method not in HASH_ASN1: + raise ValueError("Invalid hash method: %s" % hash_method) + asn1code = HASH_ASN1[hash_method] + + # Encrypt the hash with the private key + cleartext = asn1code + hash_value + keylength = common.byte_size(priv_key.n) + padded = _pad_for_signing(cleartext, keylength) + + payload = transform.bytes2int(padded) + encrypted = priv_key.blinded_encrypt(payload) + block = transform.int2bytes(encrypted, keylength) + + return block + + +def sign(message: bytes, priv_key: key.PrivateKey, hash_method: str) -> bytes: + """Signs the message with the private key. + + Hashes the message, then signs the hash with the given key. This is known + as a "detached signature", because the message itself isn't altered. + + :param message: the message to sign. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param priv_key: the :py:class:`rsa.PrivateKey` to sign with + :param hash_method: the hash method used on the message. Use 'MD5', 'SHA-1', + 'SHA-224', SHA-256', 'SHA-384' or 'SHA-512'. + :return: a message signature block. + :raise OverflowError: if the private key is too small to contain the + requested hash. + + """ + + msg_hash = compute_hash(message, hash_method) + return sign_hash(msg_hash, priv_key, hash_method) + + +def verify(message: bytes, signature: bytes, pub_key: key.PublicKey) -> str: + """Verifies that the signature matches the message. + + The hash method is detected automatically from the signature. + + :param message: the signed message. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param signature: the signature block, as created with :py:func:`rsa.sign`. + :param pub_key: the :py:class:`rsa.PublicKey` of the person signing the message. + :raise VerificationError: when the signature doesn't match the message. + :returns: the name of the used hash. + + """ + + keylength = common.byte_size(pub_key.n) + encrypted = transform.bytes2int(signature) + decrypted = core.decrypt_int(encrypted, pub_key.e, pub_key.n) + clearsig = transform.int2bytes(decrypted, keylength) + + # Get the hash method + method_name = _find_method_hash(clearsig) + message_hash = compute_hash(message, method_name) + + # Reconstruct the expected padded hash + cleartext = HASH_ASN1[method_name] + message_hash + expected = _pad_for_signing(cleartext, keylength) + + if len(signature) != keylength: + raise VerificationError("Verification failed") + + # Compare with the signed one + if expected != clearsig: + raise VerificationError("Verification failed") + + return method_name + + +def find_signature_hash(signature: bytes, pub_key: key.PublicKey) -> str: + """Returns the hash name detected from the signature. + + If you also want to verify the message, use :py:func:`rsa.verify()` instead. + It also returns the name of the used hash. + + :param signature: the signature block, as created with :py:func:`rsa.sign`. + :param pub_key: the :py:class:`rsa.PublicKey` of the person signing the message. + :returns: the name of the used hash. + """ + + keylength = common.byte_size(pub_key.n) + encrypted = transform.bytes2int(signature) + decrypted = core.decrypt_int(encrypted, pub_key.e, pub_key.n) + clearsig = transform.int2bytes(decrypted, keylength) + + return _find_method_hash(clearsig) + + +def yield_fixedblocks(infile: typing.BinaryIO, blocksize: int) -> typing.Iterator[bytes]: + """Generator, yields each block of ``blocksize`` bytes in the input file. + + :param infile: file to read and separate in blocks. + :param blocksize: block size in bytes. + :returns: a generator that yields the contents of each block + """ + + while True: + block = infile.read(blocksize) + + read_bytes = len(block) + if read_bytes == 0: + break + + yield block + + if read_bytes < blocksize: + break + + +def compute_hash(message: typing.Union[bytes, typing.BinaryIO], method_name: str) -> bytes: + """Returns the message digest. + + :param message: the signed message. Can be an 8-bit string or a file-like + object. If ``message`` has a ``read()`` method, it is assumed to be a + file-like object. + :param method_name: the hash method, must be a key of + :py:const:`rsa.pkcs1.HASH_METHODS`. + + """ + + if method_name not in HASH_METHODS: + raise ValueError("Invalid hash method: %s" % method_name) + + method = HASH_METHODS[method_name] + hasher = method() + + if isinstance(message, bytes): + hasher.update(message) + else: + assert hasattr(message, "read") and hasattr(message.read, "__call__") + # read as 1K blocks + for block in yield_fixedblocks(message, 1024): + hasher.update(block) + + return hasher.digest() + + +def _find_method_hash(clearsig: bytes) -> str: + """Finds the hash method. + + :param clearsig: full padded ASN1 and hash. + :return: the used hash method. + :raise VerificationFailed: when the hash method cannot be found + """ + + for (hashname, asn1code) in HASH_ASN1.items(): + if asn1code in clearsig: + return hashname + + raise VerificationError("Verification failed") + + +__all__ = [ + "encrypt", + "decrypt", + "sign", + "verify", + "DecryptionError", + "VerificationError", + "CryptoError", +] + +if __name__ == "__main__": + print("Running doctests 1000x or until failure") + import doctest + + for count in range(1000): + (failures, tests) = doctest.testmod() + if failures: + break + + if count % 100 == 0 and count: + print("%i times" % count) + + print("Doctests done") diff --git a/.venv/lib/python3.12/site-packages/rsa/pkcs1_v2.py b/.venv/lib/python3.12/site-packages/rsa/pkcs1_v2.py new file mode 100644 index 00000000..cc2c3170 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/pkcs1_v2.py @@ -0,0 +1,100 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions for PKCS#1 version 2 encryption and signing + +This module implements certain functionality from PKCS#1 version 2. Main +documentation is RFC 2437: https://tools.ietf.org/html/rfc2437 +""" + +from rsa import ( + common, + pkcs1, + transform, +) + + +def mgf1(seed: bytes, length: int, hasher: str = "SHA-1") -> bytes: + """ + MGF1 is a Mask Generation Function based on a hash function. + + A mask generation function takes an octet string of variable length and a + desired output length as input, and outputs an octet string of the desired + length. The plaintext-awareness of RSAES-OAEP relies on the random nature of + the output of the mask generation function, which in turn relies on the + random nature of the underlying hash. + + :param bytes seed: seed from which mask is generated, an octet string + :param int length: intended length in octets of the mask, at most 2^32(hLen) + :param str hasher: hash function (hLen denotes the length in octets of the hash + function output) + + :return: mask, an octet string of length `length` + :rtype: bytes + + :raise OverflowError: when `length` is too large for the specified `hasher` + :raise ValueError: when specified `hasher` is invalid + """ + + try: + hash_length = pkcs1.HASH_METHODS[hasher]().digest_size + except KeyError as ex: + raise ValueError( + "Invalid `hasher` specified. Please select one of: {hash_list}".format( + hash_list=", ".join(sorted(pkcs1.HASH_METHODS.keys())) + ) + ) from ex + + # If l > 2^32(hLen), output "mask too long" and stop. + if length > (2 ** 32 * hash_length): + raise OverflowError( + "Desired length should be at most 2**32 times the hasher's output " + "length ({hash_length} for {hasher} function)".format( + hash_length=hash_length, + hasher=hasher, + ) + ) + + # Looping `counter` from 0 to ceil(l / hLen)-1, build `output` based on the + # hashes formed by (`seed` + C), being `C` an octet string of length 4 + # generated by converting `counter` with the primitive I2OSP + output = b"".join( + pkcs1.compute_hash( + seed + transform.int2bytes(counter, fill_size=4), + method_name=hasher, + ) + for counter in range(common.ceil_div(length, hash_length) + 1) + ) + + # Output the leading `length` octets of `output` as the octet string mask. + return output[:length] + + +__all__ = [ + "mgf1", +] + +if __name__ == "__main__": + print("Running doctests 1000x or until failure") + import doctest + + for count in range(1000): + (failures, tests) = doctest.testmod() + if failures: + break + + if count % 100 == 0 and count: + print("%i times" % count) + + print("Doctests done") diff --git a/.venv/lib/python3.12/site-packages/rsa/prime.py b/.venv/lib/python3.12/site-packages/rsa/prime.py new file mode 100644 index 00000000..481c4d77 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/prime.py @@ -0,0 +1,198 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Numerical functions related to primes. + +Implementation based on the book Algorithm Design by Michael T. Goodrich and +Roberto Tamassia, 2002. +""" + +import rsa.common +import rsa.randnum + +__all__ = ["getprime", "are_relatively_prime"] + + +def gcd(p: int, q: int) -> int: + """Returns the greatest common divisor of p and q + + >>> gcd(48, 180) + 12 + """ + + while q != 0: + (p, q) = (q, p % q) + return p + + +def get_primality_testing_rounds(number: int) -> int: + """Returns minimum number of rounds for Miller-Rabing primality testing, + based on number bitsize. + + According to NIST FIPS 186-4, Appendix C, Table C.3, minimum number of + rounds of M-R testing, using an error probability of 2 ** (-100), for + different p, q bitsizes are: + * p, q bitsize: 512; rounds: 7 + * p, q bitsize: 1024; rounds: 4 + * p, q bitsize: 1536; rounds: 3 + See: http://nvlpubs.nist.gov/nistpubs/FIPS/NIST.FIPS.186-4.pdf + """ + + # Calculate number bitsize. + bitsize = rsa.common.bit_size(number) + # Set number of rounds. + if bitsize >= 1536: + return 3 + if bitsize >= 1024: + return 4 + if bitsize >= 512: + return 7 + # For smaller bitsizes, set arbitrary number of rounds. + return 10 + + +def miller_rabin_primality_testing(n: int, k: int) -> bool: + """Calculates whether n is composite (which is always correct) or prime + (which theoretically is incorrect with error probability 4**-k), by + applying Miller-Rabin primality testing. + + For reference and implementation example, see: + https://en.wikipedia.org/wiki/Miller%E2%80%93Rabin_primality_test + + :param n: Integer to be tested for primality. + :type n: int + :param k: Number of rounds (witnesses) of Miller-Rabin testing. + :type k: int + :return: False if the number is composite, True if it's probably prime. + :rtype: bool + """ + + # prevent potential infinite loop when d = 0 + if n < 2: + return False + + # Decompose (n - 1) to write it as (2 ** r) * d + # While d is even, divide it by 2 and increase the exponent. + d = n - 1 + r = 0 + + while not (d & 1): + r += 1 + d >>= 1 + + # Test k witnesses. + for _ in range(k): + # Generate random integer a, where 2 <= a <= (n - 2) + a = rsa.randnum.randint(n - 3) + 1 + + x = pow(a, d, n) + if x == 1 or x == n - 1: + continue + + for _ in range(r - 1): + x = pow(x, 2, n) + if x == 1: + # n is composite. + return False + if x == n - 1: + # Exit inner loop and continue with next witness. + break + else: + # If loop doesn't break, n is composite. + return False + + return True + + +def is_prime(number: int) -> bool: + """Returns True if the number is prime, and False otherwise. + + >>> is_prime(2) + True + >>> is_prime(42) + False + >>> is_prime(41) + True + """ + + # Check for small numbers. + if number < 10: + return number in {2, 3, 5, 7} + + # Check for even numbers. + if not (number & 1): + return False + + # Calculate minimum number of rounds. + k = get_primality_testing_rounds(number) + + # Run primality testing with (minimum + 1) rounds. + return miller_rabin_primality_testing(number, k + 1) + + +def getprime(nbits: int) -> int: + """Returns a prime number that can be stored in 'nbits' bits. + + >>> p = getprime(128) + >>> is_prime(p-1) + False + >>> is_prime(p) + True + >>> is_prime(p+1) + False + + >>> from rsa import common + >>> common.bit_size(p) == 128 + True + """ + + assert nbits > 3 # the loop will hang on too small numbers + + while True: + integer = rsa.randnum.read_random_odd_int(nbits) + + # Test for primeness + if is_prime(integer): + return integer + + # Retry if not prime + + +def are_relatively_prime(a: int, b: int) -> bool: + """Returns True if a and b are relatively prime, and False if they + are not. + + >>> are_relatively_prime(2, 3) + True + >>> are_relatively_prime(2, 4) + False + """ + + d = gcd(a, b) + return d == 1 + + +if __name__ == "__main__": + print("Running doctests 1000x or until failure") + import doctest + + for count in range(1000): + (failures, tests) = doctest.testmod() + if failures: + break + + if count % 100 == 0 and count: + print("%i times" % count) + + print("Doctests done") diff --git a/.venv/lib/python3.12/site-packages/rsa/py.typed b/.venv/lib/python3.12/site-packages/rsa/py.typed new file mode 100644 index 00000000..bfd62201 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561. The rsa package uses inline types. diff --git a/.venv/lib/python3.12/site-packages/rsa/randnum.py b/.venv/lib/python3.12/site-packages/rsa/randnum.py new file mode 100644 index 00000000..bb255c57 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/randnum.py @@ -0,0 +1,95 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Functions for generating random numbers.""" + +# Source inspired by code by Yesudeep Mangalapilly <yesudeep@gmail.com> + +import os +import struct + +from rsa import common, transform + + +def read_random_bits(nbits: int) -> bytes: + """Reads 'nbits' random bits. + + If nbits isn't a whole number of bytes, an extra byte will be appended with + only the lower bits set. + """ + + nbytes, rbits = divmod(nbits, 8) + + # Get the random bytes + randomdata = os.urandom(nbytes) + + # Add the remaining random bits + if rbits > 0: + randomvalue = ord(os.urandom(1)) + randomvalue >>= 8 - rbits + randomdata = struct.pack("B", randomvalue) + randomdata + + return randomdata + + +def read_random_int(nbits: int) -> int: + """Reads a random integer of approximately nbits bits.""" + + randomdata = read_random_bits(nbits) + value = transform.bytes2int(randomdata) + + # Ensure that the number is large enough to just fill out the required + # number of bits. + value |= 1 << (nbits - 1) + + return value + + +def read_random_odd_int(nbits: int) -> int: + """Reads a random odd integer of approximately nbits bits. + + >>> read_random_odd_int(512) & 1 + 1 + """ + + value = read_random_int(nbits) + + # Make sure it's odd + return value | 1 + + +def randint(maxvalue: int) -> int: + """Returns a random integer x with 1 <= x <= maxvalue + + May take a very long time in specific situations. If maxvalue needs N bits + to store, the closer maxvalue is to (2 ** N) - 1, the faster this function + is. + """ + + bit_size = common.bit_size(maxvalue) + + tries = 0 + while True: + value = read_random_int(bit_size) + if value <= maxvalue: + break + + if tries % 10 == 0 and tries: + # After a lot of tries to get the right number of bits but still + # smaller than maxvalue, decrease the number of bits by 1. That'll + # dramatically increase the chances to get a large enough number. + bit_size -= 1 + tries += 1 + + return value diff --git a/.venv/lib/python3.12/site-packages/rsa/transform.py b/.venv/lib/python3.12/site-packages/rsa/transform.py new file mode 100644 index 00000000..76ca8499 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/transform.py @@ -0,0 +1,72 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Data transformation functions. + +From bytes to a number, number to bytes, etc. +""" + +import math + + +def bytes2int(raw_bytes: bytes) -> int: + r"""Converts a list of bytes or an 8-bit string to an integer. + + When using unicode strings, encode it to some encoding like UTF8 first. + + >>> (((128 * 256) + 64) * 256) + 15 + 8405007 + >>> bytes2int(b'\x80@\x0f') + 8405007 + + """ + return int.from_bytes(raw_bytes, "big", signed=False) + + +def int2bytes(number: int, fill_size: int = 0) -> bytes: + """ + Convert an unsigned integer to bytes (big-endian):: + + Does not preserve leading zeros if you don't specify a fill size. + + :param number: + Integer value + :param fill_size: + If the optional fill size is given the length of the resulting + byte string is expected to be the fill size and will be padded + with prefix zero bytes to satisfy that length. + :returns: + Raw bytes (base-256 representation). + :raises: + ``OverflowError`` when fill_size is given and the number takes up more + bytes than fit into the block. This requires the ``overflow`` + argument to this function to be set to ``False`` otherwise, no + error will be raised. + """ + + if number < 0: + raise ValueError("Number must be an unsigned integer: %d" % number) + + bytes_required = max(1, math.ceil(number.bit_length() / 8)) + + if fill_size > 0: + return number.to_bytes(fill_size, "big") + + return number.to_bytes(bytes_required, "big") + + +if __name__ == "__main__": + import doctest + + doctest.testmod() diff --git a/.venv/lib/python3.12/site-packages/rsa/util.py b/.venv/lib/python3.12/site-packages/rsa/util.py new file mode 100644 index 00000000..8a598cca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/rsa/util.py @@ -0,0 +1,97 @@ +# Copyright 2011 Sybren A. Stüvel <sybren@stuvel.eu> +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Utility functions.""" + +import sys +from optparse import OptionParser + +import rsa.key + + +def private_to_public() -> None: + """Reads a private key and outputs the corresponding public key.""" + + # Parse the CLI options + parser = OptionParser( + usage="usage: %prog [options]", + description="Reads a private key and outputs the " + "corresponding public key. Both private and public keys use " + "the format described in PKCS#1 v1.5", + ) + + parser.add_option( + "-i", + "--input", + dest="infilename", + type="string", + help="Input filename. Reads from stdin if not specified", + ) + parser.add_option( + "-o", + "--output", + dest="outfilename", + type="string", + help="Output filename. Writes to stdout of not specified", + ) + + parser.add_option( + "--inform", + dest="inform", + help="key format of input - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) + + parser.add_option( + "--outform", + dest="outform", + help="key format of output - default PEM", + choices=("PEM", "DER"), + default="PEM", + ) + + (cli, cli_args) = parser.parse_args(sys.argv) + + # Read the input data + if cli.infilename: + print( + "Reading private key from %s in %s format" % (cli.infilename, cli.inform), + file=sys.stderr, + ) + with open(cli.infilename, "rb") as infile: + in_data = infile.read() + else: + print("Reading private key from stdin in %s format" % cli.inform, file=sys.stderr) + in_data = sys.stdin.read().encode("ascii") + + assert type(in_data) == bytes, type(in_data) + + # Take the public fields and create a public key + priv_key = rsa.key.PrivateKey.load_pkcs1(in_data, cli.inform) + pub_key = rsa.key.PublicKey(priv_key.n, priv_key.e) + + # Save to the output file + out_data = pub_key.save_pkcs1(cli.outform) + + if cli.outfilename: + print( + "Writing public key to %s in %s format" % (cli.outfilename, cli.outform), + file=sys.stderr, + ) + with open(cli.outfilename, "wb") as outfile: + outfile.write(out_data) + else: + print("Writing public key to stdout in %s format" % cli.outform, file=sys.stderr) + sys.stdout.write(out_data.decode("ascii")) |