about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/core/providers/crypto
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/providers/crypto
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/crypto')
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py9
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py195
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py181
3 files changed, 385 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py
new file mode 100644
index 00000000..e509f990
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py
@@ -0,0 +1,9 @@
+from .bcrypt import BcryptCryptoConfig, BCryptCryptoProvider
+from .nacl import NaClCryptoConfig, NaClCryptoProvider
+
+__all__ = [
+    "BCryptCryptoProvider",
+    "BcryptCryptoConfig",
+    "NaClCryptoConfig",
+    "NaClCryptoProvider",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py
new file mode 100644
index 00000000..9c39977c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py
@@ -0,0 +1,195 @@
+import base64
+import logging
+import os
+from abc import ABC
+from datetime import datetime, timezone
+from typing import Optional, Tuple
+
+import bcrypt
+import jwt
+import nacl.encoding
+import nacl.exceptions
+import nacl.signing
+import nacl.utils
+
+from core.base import CryptoConfig, CryptoProvider
+
+DEFAULT_BCRYPT_SECRET_KEY = "wNFbczH3QhUVcPALwtWZCPi0lrDlGV3P1DPRVEQCPbM"  # Replace or load from env or secrets manager
+
+
+class BcryptCryptoConfig(CryptoConfig):
+    provider: str = "bcrypt"
+    # Number of rounds for bcrypt (increasing this makes hashing slower but more secure)
+    bcrypt_rounds: int = 12
+    secret_key: Optional[str] = None
+    api_key_bytes: int = 32  # Length of raw API keys
+
+    @property
+    def supported_providers(self) -> list[str]:
+        return ["bcrypt"]
+
+    def validate_config(self) -> None:
+        super().validate_config()
+        if self.provider not in self.supported_providers:
+            raise ValueError(f"Unsupported crypto provider: {self.provider}")
+        if self.bcrypt_rounds < 4 or self.bcrypt_rounds > 31:
+            raise ValueError("bcrypt_rounds must be between 4 and 31")
+
+    def verify_password(
+        self, plain_password: str, hashed_password: str
+    ) -> bool:
+        try:
+            # First try to decode as base64 (new format)
+            stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+        except Exception:
+            # If that fails, treat as raw bcrypt hash (old format)
+            stored_hash = hashed_password.encode("utf-8")
+
+        return bcrypt.checkpw(plain_password.encode("utf-8"), stored_hash)
+
+
+class BCryptCryptoProvider(CryptoProvider, ABC):
+    def __init__(self, config: BcryptCryptoConfig):
+        if not isinstance(config, BcryptCryptoConfig):
+            raise ValueError(
+                "BcryptCryptoProvider must be initialized with a BcryptCryptoConfig"
+            )
+        logging.info("Initializing BcryptCryptoProvider")
+        super().__init__(config)
+        self.config: BcryptCryptoConfig = config
+
+        # Load the secret key for JWT
+        # No fallback defaults: fail if not provided
+        self.secret_key = (
+            config.secret_key
+            or os.getenv("R2R_SECRET_KEY")
+            or DEFAULT_BCRYPT_SECRET_KEY
+        )
+        if not self.secret_key:
+            raise ValueError(
+                "No secret key provided for BcryptCryptoProvider."
+            )
+
+    def get_password_hash(self, password: str) -> str:
+        # Bcrypt expects bytes
+        password_bytes = password.encode("utf-8")
+        hashed = bcrypt.hashpw(
+            password_bytes, bcrypt.gensalt(rounds=self.config.bcrypt_rounds)
+        )
+        return base64.b64encode(hashed).decode("utf-8")
+
+    def verify_password(
+        self, plain_password: str, hashed_password: str
+    ) -> bool:
+        try:
+            # First try to decode as base64 (new format)
+            stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+            if not stored_hash.startswith(b"$2b$"):  # Valid bcrypt hash prefix
+                stored_hash = hashed_password.encode("utf-8")
+        except Exception:
+            # Otherwise raw bcrypt hash (old format)
+            stored_hash = hashed_password.encode("utf-8")
+
+        try:
+            return bcrypt.checkpw(plain_password.encode("utf-8"), stored_hash)
+        except ValueError as e:
+            if "Invalid salt" in str(e):
+                # If it's an invalid salt, the hash format is wrong - try the other format
+                try:
+                    stored_hash = (
+                        hashed_password
+                        if isinstance(hashed_password, bytes)
+                        else hashed_password.encode("utf-8")
+                    )
+                    return bcrypt.checkpw(
+                        plain_password.encode("utf-8"), stored_hash
+                    )
+                except ValueError:
+                    return False
+            raise
+
+    def generate_verification_code(self, length: int = 32) -> str:
+        random_bytes = nacl.utils.random(length)
+        return base64.urlsafe_b64encode(random_bytes)[:length].decode("utf-8")
+
+    def generate_signing_keypair(self) -> Tuple[str, str, str]:
+        signing_key = nacl.signing.SigningKey.generate()
+        verify_key = signing_key.verify_key
+
+        # Generate unique key_id
+        key_entropy = nacl.utils.random(16)
+        key_id = f"sk_{base64.urlsafe_b64encode(key_entropy).decode()}"
+
+        private_key = base64.b64encode(bytes(signing_key)).decode()
+        public_key = base64.b64encode(bytes(verify_key)).decode()
+        return key_id, private_key, public_key
+
+    def sign_request(self, private_key: str, data: str) -> str:
+        try:
+            key_bytes = base64.b64decode(private_key)
+            signing_key = nacl.signing.SigningKey(key_bytes)
+            signature = signing_key.sign(data.encode())
+            return base64.b64encode(signature.signature).decode()
+        except Exception as e:
+            raise ValueError(
+                f"Invalid private key or signing error: {str(e)}"
+            ) from e
+
+    def verify_request_signature(
+        self, public_key: str, signature: str, data: str
+    ) -> bool:
+        try:
+            key_bytes = base64.b64decode(public_key)
+            verify_key = nacl.signing.VerifyKey(key_bytes)
+            signature_bytes = base64.b64decode(signature)
+            verify_key.verify(data.encode(), signature_bytes)
+            return True
+        except (nacl.exceptions.BadSignatureError, ValueError):
+            return False
+
+    def generate_api_key(self) -> Tuple[str, str]:
+        # Similar approach as with NaCl provider:
+        key_id_bytes = nacl.utils.random(16)
+        key_id = f"key_{base64.urlsafe_b64encode(key_id_bytes).decode()}"
+
+        # Generate raw API key
+        raw_api_key = base64.urlsafe_b64encode(
+            nacl.utils.random(self.config.api_key_bytes)
+        ).decode()
+        return key_id, raw_api_key
+
+    def hash_api_key(self, raw_api_key: str) -> str:
+        # Hash with bcrypt
+        hashed = bcrypt.hashpw(
+            raw_api_key.encode("utf-8"),
+            bcrypt.gensalt(rounds=self.config.bcrypt_rounds),
+        )
+        return base64.b64encode(hashed).decode("utf-8")
+
+    def verify_api_key(self, raw_api_key: str, hashed_key: str) -> bool:
+        stored_hash = base64.b64decode(hashed_key.encode("utf-8"))
+        return bcrypt.checkpw(raw_api_key.encode("utf-8"), stored_hash)
+
+    def generate_secure_token(self, data: dict, expiry: datetime) -> str:
+        now = datetime.now(timezone.utc)
+        to_encode = {
+            **data,
+            "exp": expiry.timestamp(),
+            "iat": now.timestamp(),
+            "nbf": now.timestamp(),
+            "jti": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+            "nonce": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+        }
+        return jwt.encode(to_encode, self.secret_key, algorithm="HS256")
+
+    def verify_secure_token(self, token: str) -> Optional[dict]:
+        try:
+            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
+            exp = payload.get("exp")
+            if exp is None or datetime.fromtimestamp(
+                exp, tz=timezone.utc
+            ) < datetime.now(timezone.utc):
+                return None
+            return payload
+        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+            return None
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py
new file mode 100644
index 00000000..63232565
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py
@@ -0,0 +1,181 @@
+import base64
+import logging
+import os
+import string
+from datetime import datetime, timezone
+from typing import Optional, Tuple
+
+import jwt
+import nacl.encoding
+import nacl.exceptions
+import nacl.pwhash
+import nacl.signing
+from nacl.exceptions import BadSignatureError
+from nacl.pwhash import argon2i
+
+from core.base import CryptoConfig, CryptoProvider
+
+DEFAULT_NACL_SECRET_KEY = "wNFbczH3QhUVcPALwtWZCPi0lrDlGV3P1DPRVEQCPbM"  # Replace or load from env or secrets manager
+
+
+def encode_bytes_readable(random_bytes: bytes, chars: str) -> str:
+    """Convert random bytes to a readable string using the given character
+    set."""
+    # Each byte gives us 8 bits of randomness
+    # We use modulo to map each byte to our character set
+    result = []
+    for byte in random_bytes:
+        # Use modulo to map the byte (0-255) to our character set length
+        idx = byte % len(chars)
+        result.append(chars[idx])
+    return "".join(result)
+
+
+class NaClCryptoConfig(CryptoConfig):
+    provider: str = "nacl"
+    # Interactive parameters for password ops (fast)
+    ops_limit: int = argon2i.OPSLIMIT_MIN
+    mem_limit: int = argon2i.MEMLIMIT_MIN
+    # Sensitive parameters for API key generation (slow but more secure)
+    api_ops_limit: int = argon2i.OPSLIMIT_INTERACTIVE
+    api_mem_limit: int = argon2i.MEMLIMIT_INTERACTIVE
+    api_key_bytes: int = 32
+    secret_key: Optional[str] = None
+
+
+class NaClCryptoProvider(CryptoProvider):
+    def __init__(self, config: NaClCryptoConfig):
+        if not isinstance(config, NaClCryptoConfig):
+            raise ValueError(
+                "NaClCryptoProvider must be initialized with a NaClCryptoConfig"
+            )
+        super().__init__(config)
+        self.config: NaClCryptoConfig = config
+        logging.info("Initializing NaClCryptoProvider")
+
+        # Securely load the secret key for JWT
+        # Priority: config.secret_key > environment variable > default
+        self.secret_key = (
+            config.secret_key
+            or os.getenv("R2R_SECRET_KEY")
+            or DEFAULT_NACL_SECRET_KEY
+        )
+
+    def get_password_hash(self, password: str) -> str:
+        password_bytes = password.encode("utf-8")
+        hashed = nacl.pwhash.argon2i.str(
+            password_bytes,
+            opslimit=self.config.ops_limit,
+            memlimit=self.config.mem_limit,
+        )
+        return base64.b64encode(hashed).decode("utf-8")
+
+    def verify_password(
+        self, plain_password: str, hashed_password: str
+    ) -> bool:
+        try:
+            stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+            nacl.pwhash.verify(stored_hash, plain_password.encode("utf-8"))
+            return True
+        except nacl.exceptions.InvalidkeyError:
+            return False
+
+    def generate_verification_code(self, length: int = 32) -> str:
+        random_bytes = nacl.utils.random(length)
+        return base64.urlsafe_b64encode(random_bytes)[:length].decode("utf-8")
+
+    def generate_api_key(self) -> Tuple[str, str]:
+        # Define our character set (excluding ambiguous characters)
+        chars = string.ascii_letters.replace("l", "").replace("I", "").replace(
+            "O", ""
+        ) + string.digits.replace("0", "").replace("1", "")
+
+        # Generate a unique key_id
+        key_id_bytes = nacl.utils.random(16)  # 16 random bytes
+        key_id = f"pk_{encode_bytes_readable(key_id_bytes, chars)}"
+
+        # Generate a high-entropy API key
+        raw_api_key = f"sk_{encode_bytes_readable(nacl.utils.random(self.config.api_key_bytes), chars)}"
+
+        # The caller will store the hashed version in the database
+        return key_id, raw_api_key
+
+    def hash_api_key(self, raw_api_key: str) -> str:
+        hashed = nacl.pwhash.argon2i.str(
+            raw_api_key.encode("utf-8"),
+            opslimit=self.config.api_ops_limit,
+            memlimit=self.config.api_mem_limit,
+        )
+        return base64.b64encode(hashed).decode("utf-8")
+
+    def verify_api_key(self, raw_api_key: str, hashed_key: str) -> bool:
+        try:
+            stored_hash = base64.b64decode(hashed_key.encode("utf-8"))
+            nacl.pwhash.verify(stored_hash, raw_api_key.encode("utf-8"))
+            return True
+        except nacl.exceptions.InvalidkeyError:
+            return False
+
+    def sign_request(self, private_key: str, data: str) -> str:
+        try:
+            key_bytes = base64.b64decode(private_key)
+            signing_key = nacl.signing.SigningKey(key_bytes)
+            signature = signing_key.sign(data.encode())
+            return base64.b64encode(signature.signature).decode()
+        except Exception as e:
+            raise ValueError(
+                f"Invalid private key or signing error: {str(e)}"
+            ) from e
+
+    def verify_request_signature(
+        self, public_key: str, signature: str, data: str
+    ) -> bool:
+        try:
+            key_bytes = base64.b64decode(public_key)
+            verify_key = nacl.signing.VerifyKey(key_bytes)
+            signature_bytes = base64.b64decode(signature)
+            verify_key.verify(data.encode(), signature_bytes)
+            return True
+        except (BadSignatureError, ValueError):
+            return False
+
+    def generate_secure_token(self, data: dict, expiry: datetime) -> str:
+        """Generate a secure token using JWT with HS256.
+
+        The secret_key is used for symmetrical signing.
+        """
+        now = datetime.now(timezone.utc)
+        to_encode = {
+            **data,
+            "exp": expiry.timestamp(),
+            "iat": now.timestamp(),
+            "nbf": now.timestamp(),
+            "jti": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+            "nonce": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+        }
+
+        return jwt.encode(to_encode, self.secret_key, algorithm="HS256")
+
+    def verify_secure_token(self, token: str) -> Optional[dict]:
+        """Verify a secure token using the shared secret_key and JWT."""
+        try:
+            payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
+            exp = payload.get("exp")
+            if exp is None or datetime.fromtimestamp(
+                exp, tz=timezone.utc
+            ) < datetime.now(timezone.utc):
+                return None
+            return payload
+        except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+            return None
+
+    def generate_signing_keypair(self) -> Tuple[str, str, str]:
+        signing_key = nacl.signing.SigningKey.generate()
+        private_key_b64 = base64.b64encode(signing_key.encode()).decode()
+        public_key_b64 = base64.b64encode(
+            signing_key.verify_key.encode()
+        ).decode()
+        # Generate a unique key_id
+        key_id_bytes = nacl.utils.random(16)
+        key_id = f"sign_{base64.urlsafe_b64encode(key_id_bytes).decode()}"
+        return (key_id, private_key_b64, public_key_b64)