aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/core/providers/crypto
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/providers/crypto
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/crypto')
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py9
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py195
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py181
3 files changed, 385 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py
new file mode 100644
index 00000000..e509f990
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/__init__.py
@@ -0,0 +1,9 @@
+from .bcrypt import BcryptCryptoConfig, BCryptCryptoProvider
+from .nacl import NaClCryptoConfig, NaClCryptoProvider
+
+__all__ = [
+ "BCryptCryptoProvider",
+ "BcryptCryptoConfig",
+ "NaClCryptoConfig",
+ "NaClCryptoProvider",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py
new file mode 100644
index 00000000..9c39977c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/bcrypt.py
@@ -0,0 +1,195 @@
+import base64
+import logging
+import os
+from abc import ABC
+from datetime import datetime, timezone
+from typing import Optional, Tuple
+
+import bcrypt
+import jwt
+import nacl.encoding
+import nacl.exceptions
+import nacl.signing
+import nacl.utils
+
+from core.base import CryptoConfig, CryptoProvider
+
+DEFAULT_BCRYPT_SECRET_KEY = "wNFbczH3QhUVcPALwtWZCPi0lrDlGV3P1DPRVEQCPbM" # Replace or load from env or secrets manager
+
+
+class BcryptCryptoConfig(CryptoConfig):
+ provider: str = "bcrypt"
+ # Number of rounds for bcrypt (increasing this makes hashing slower but more secure)
+ bcrypt_rounds: int = 12
+ secret_key: Optional[str] = None
+ api_key_bytes: int = 32 # Length of raw API keys
+
+ @property
+ def supported_providers(self) -> list[str]:
+ return ["bcrypt"]
+
+ def validate_config(self) -> None:
+ super().validate_config()
+ if self.provider not in self.supported_providers:
+ raise ValueError(f"Unsupported crypto provider: {self.provider}")
+ if self.bcrypt_rounds < 4 or self.bcrypt_rounds > 31:
+ raise ValueError("bcrypt_rounds must be between 4 and 31")
+
+ def verify_password(
+ self, plain_password: str, hashed_password: str
+ ) -> bool:
+ try:
+ # First try to decode as base64 (new format)
+ stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+ except Exception:
+ # If that fails, treat as raw bcrypt hash (old format)
+ stored_hash = hashed_password.encode("utf-8")
+
+ return bcrypt.checkpw(plain_password.encode("utf-8"), stored_hash)
+
+
+class BCryptCryptoProvider(CryptoProvider, ABC):
+ def __init__(self, config: BcryptCryptoConfig):
+ if not isinstance(config, BcryptCryptoConfig):
+ raise ValueError(
+ "BcryptCryptoProvider must be initialized with a BcryptCryptoConfig"
+ )
+ logging.info("Initializing BcryptCryptoProvider")
+ super().__init__(config)
+ self.config: BcryptCryptoConfig = config
+
+ # Load the secret key for JWT
+ # No fallback defaults: fail if not provided
+ self.secret_key = (
+ config.secret_key
+ or os.getenv("R2R_SECRET_KEY")
+ or DEFAULT_BCRYPT_SECRET_KEY
+ )
+ if not self.secret_key:
+ raise ValueError(
+ "No secret key provided for BcryptCryptoProvider."
+ )
+
+ def get_password_hash(self, password: str) -> str:
+ # Bcrypt expects bytes
+ password_bytes = password.encode("utf-8")
+ hashed = bcrypt.hashpw(
+ password_bytes, bcrypt.gensalt(rounds=self.config.bcrypt_rounds)
+ )
+ return base64.b64encode(hashed).decode("utf-8")
+
+ def verify_password(
+ self, plain_password: str, hashed_password: str
+ ) -> bool:
+ try:
+ # First try to decode as base64 (new format)
+ stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+ if not stored_hash.startswith(b"$2b$"): # Valid bcrypt hash prefix
+ stored_hash = hashed_password.encode("utf-8")
+ except Exception:
+ # Otherwise raw bcrypt hash (old format)
+ stored_hash = hashed_password.encode("utf-8")
+
+ try:
+ return bcrypt.checkpw(plain_password.encode("utf-8"), stored_hash)
+ except ValueError as e:
+ if "Invalid salt" in str(e):
+ # If it's an invalid salt, the hash format is wrong - try the other format
+ try:
+ stored_hash = (
+ hashed_password
+ if isinstance(hashed_password, bytes)
+ else hashed_password.encode("utf-8")
+ )
+ return bcrypt.checkpw(
+ plain_password.encode("utf-8"), stored_hash
+ )
+ except ValueError:
+ return False
+ raise
+
+ def generate_verification_code(self, length: int = 32) -> str:
+ random_bytes = nacl.utils.random(length)
+ return base64.urlsafe_b64encode(random_bytes)[:length].decode("utf-8")
+
+ def generate_signing_keypair(self) -> Tuple[str, str, str]:
+ signing_key = nacl.signing.SigningKey.generate()
+ verify_key = signing_key.verify_key
+
+ # Generate unique key_id
+ key_entropy = nacl.utils.random(16)
+ key_id = f"sk_{base64.urlsafe_b64encode(key_entropy).decode()}"
+
+ private_key = base64.b64encode(bytes(signing_key)).decode()
+ public_key = base64.b64encode(bytes(verify_key)).decode()
+ return key_id, private_key, public_key
+
+ def sign_request(self, private_key: str, data: str) -> str:
+ try:
+ key_bytes = base64.b64decode(private_key)
+ signing_key = nacl.signing.SigningKey(key_bytes)
+ signature = signing_key.sign(data.encode())
+ return base64.b64encode(signature.signature).decode()
+ except Exception as e:
+ raise ValueError(
+ f"Invalid private key or signing error: {str(e)}"
+ ) from e
+
+ def verify_request_signature(
+ self, public_key: str, signature: str, data: str
+ ) -> bool:
+ try:
+ key_bytes = base64.b64decode(public_key)
+ verify_key = nacl.signing.VerifyKey(key_bytes)
+ signature_bytes = base64.b64decode(signature)
+ verify_key.verify(data.encode(), signature_bytes)
+ return True
+ except (nacl.exceptions.BadSignatureError, ValueError):
+ return False
+
+ def generate_api_key(self) -> Tuple[str, str]:
+ # Similar approach as with NaCl provider:
+ key_id_bytes = nacl.utils.random(16)
+ key_id = f"key_{base64.urlsafe_b64encode(key_id_bytes).decode()}"
+
+ # Generate raw API key
+ raw_api_key = base64.urlsafe_b64encode(
+ nacl.utils.random(self.config.api_key_bytes)
+ ).decode()
+ return key_id, raw_api_key
+
+ def hash_api_key(self, raw_api_key: str) -> str:
+ # Hash with bcrypt
+ hashed = bcrypt.hashpw(
+ raw_api_key.encode("utf-8"),
+ bcrypt.gensalt(rounds=self.config.bcrypt_rounds),
+ )
+ return base64.b64encode(hashed).decode("utf-8")
+
+ def verify_api_key(self, raw_api_key: str, hashed_key: str) -> bool:
+ stored_hash = base64.b64decode(hashed_key.encode("utf-8"))
+ return bcrypt.checkpw(raw_api_key.encode("utf-8"), stored_hash)
+
+ def generate_secure_token(self, data: dict, expiry: datetime) -> str:
+ now = datetime.now(timezone.utc)
+ to_encode = {
+ **data,
+ "exp": expiry.timestamp(),
+ "iat": now.timestamp(),
+ "nbf": now.timestamp(),
+ "jti": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+ "nonce": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+ }
+ return jwt.encode(to_encode, self.secret_key, algorithm="HS256")
+
+ def verify_secure_token(self, token: str) -> Optional[dict]:
+ try:
+ payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
+ exp = payload.get("exp")
+ if exp is None or datetime.fromtimestamp(
+ exp, tz=timezone.utc
+ ) < datetime.now(timezone.utc):
+ return None
+ return payload
+ except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+ return None
diff --git a/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py b/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py
new file mode 100644
index 00000000..63232565
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/crypto/nacl.py
@@ -0,0 +1,181 @@
+import base64
+import logging
+import os
+import string
+from datetime import datetime, timezone
+from typing import Optional, Tuple
+
+import jwt
+import nacl.encoding
+import nacl.exceptions
+import nacl.pwhash
+import nacl.signing
+from nacl.exceptions import BadSignatureError
+from nacl.pwhash import argon2i
+
+from core.base import CryptoConfig, CryptoProvider
+
+DEFAULT_NACL_SECRET_KEY = "wNFbczH3QhUVcPALwtWZCPi0lrDlGV3P1DPRVEQCPbM" # Replace or load from env or secrets manager
+
+
+def encode_bytes_readable(random_bytes: bytes, chars: str) -> str:
+ """Convert random bytes to a readable string using the given character
+ set."""
+ # Each byte gives us 8 bits of randomness
+ # We use modulo to map each byte to our character set
+ result = []
+ for byte in random_bytes:
+ # Use modulo to map the byte (0-255) to our character set length
+ idx = byte % len(chars)
+ result.append(chars[idx])
+ return "".join(result)
+
+
+class NaClCryptoConfig(CryptoConfig):
+ provider: str = "nacl"
+ # Interactive parameters for password ops (fast)
+ ops_limit: int = argon2i.OPSLIMIT_MIN
+ mem_limit: int = argon2i.MEMLIMIT_MIN
+ # Sensitive parameters for API key generation (slow but more secure)
+ api_ops_limit: int = argon2i.OPSLIMIT_INTERACTIVE
+ api_mem_limit: int = argon2i.MEMLIMIT_INTERACTIVE
+ api_key_bytes: int = 32
+ secret_key: Optional[str] = None
+
+
+class NaClCryptoProvider(CryptoProvider):
+ def __init__(self, config: NaClCryptoConfig):
+ if not isinstance(config, NaClCryptoConfig):
+ raise ValueError(
+ "NaClCryptoProvider must be initialized with a NaClCryptoConfig"
+ )
+ super().__init__(config)
+ self.config: NaClCryptoConfig = config
+ logging.info("Initializing NaClCryptoProvider")
+
+ # Securely load the secret key for JWT
+ # Priority: config.secret_key > environment variable > default
+ self.secret_key = (
+ config.secret_key
+ or os.getenv("R2R_SECRET_KEY")
+ or DEFAULT_NACL_SECRET_KEY
+ )
+
+ def get_password_hash(self, password: str) -> str:
+ password_bytes = password.encode("utf-8")
+ hashed = nacl.pwhash.argon2i.str(
+ password_bytes,
+ opslimit=self.config.ops_limit,
+ memlimit=self.config.mem_limit,
+ )
+ return base64.b64encode(hashed).decode("utf-8")
+
+ def verify_password(
+ self, plain_password: str, hashed_password: str
+ ) -> bool:
+ try:
+ stored_hash = base64.b64decode(hashed_password.encode("utf-8"))
+ nacl.pwhash.verify(stored_hash, plain_password.encode("utf-8"))
+ return True
+ except nacl.exceptions.InvalidkeyError:
+ return False
+
+ def generate_verification_code(self, length: int = 32) -> str:
+ random_bytes = nacl.utils.random(length)
+ return base64.urlsafe_b64encode(random_bytes)[:length].decode("utf-8")
+
+ def generate_api_key(self) -> Tuple[str, str]:
+ # Define our character set (excluding ambiguous characters)
+ chars = string.ascii_letters.replace("l", "").replace("I", "").replace(
+ "O", ""
+ ) + string.digits.replace("0", "").replace("1", "")
+
+ # Generate a unique key_id
+ key_id_bytes = nacl.utils.random(16) # 16 random bytes
+ key_id = f"pk_{encode_bytes_readable(key_id_bytes, chars)}"
+
+ # Generate a high-entropy API key
+ raw_api_key = f"sk_{encode_bytes_readable(nacl.utils.random(self.config.api_key_bytes), chars)}"
+
+ # The caller will store the hashed version in the database
+ return key_id, raw_api_key
+
+ def hash_api_key(self, raw_api_key: str) -> str:
+ hashed = nacl.pwhash.argon2i.str(
+ raw_api_key.encode("utf-8"),
+ opslimit=self.config.api_ops_limit,
+ memlimit=self.config.api_mem_limit,
+ )
+ return base64.b64encode(hashed).decode("utf-8")
+
+ def verify_api_key(self, raw_api_key: str, hashed_key: str) -> bool:
+ try:
+ stored_hash = base64.b64decode(hashed_key.encode("utf-8"))
+ nacl.pwhash.verify(stored_hash, raw_api_key.encode("utf-8"))
+ return True
+ except nacl.exceptions.InvalidkeyError:
+ return False
+
+ def sign_request(self, private_key: str, data: str) -> str:
+ try:
+ key_bytes = base64.b64decode(private_key)
+ signing_key = nacl.signing.SigningKey(key_bytes)
+ signature = signing_key.sign(data.encode())
+ return base64.b64encode(signature.signature).decode()
+ except Exception as e:
+ raise ValueError(
+ f"Invalid private key or signing error: {str(e)}"
+ ) from e
+
+ def verify_request_signature(
+ self, public_key: str, signature: str, data: str
+ ) -> bool:
+ try:
+ key_bytes = base64.b64decode(public_key)
+ verify_key = nacl.signing.VerifyKey(key_bytes)
+ signature_bytes = base64.b64decode(signature)
+ verify_key.verify(data.encode(), signature_bytes)
+ return True
+ except (BadSignatureError, ValueError):
+ return False
+
+ def generate_secure_token(self, data: dict, expiry: datetime) -> str:
+ """Generate a secure token using JWT with HS256.
+
+ The secret_key is used for symmetrical signing.
+ """
+ now = datetime.now(timezone.utc)
+ to_encode = {
+ **data,
+ "exp": expiry.timestamp(),
+ "iat": now.timestamp(),
+ "nbf": now.timestamp(),
+ "jti": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+ "nonce": base64.urlsafe_b64encode(nacl.utils.random(16)).decode(),
+ }
+
+ return jwt.encode(to_encode, self.secret_key, algorithm="HS256")
+
+ def verify_secure_token(self, token: str) -> Optional[dict]:
+ """Verify a secure token using the shared secret_key and JWT."""
+ try:
+ payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
+ exp = payload.get("exp")
+ if exp is None or datetime.fromtimestamp(
+ exp, tz=timezone.utc
+ ) < datetime.now(timezone.utc):
+ return None
+ return payload
+ except (jwt.ExpiredSignatureError, jwt.InvalidTokenError):
+ return None
+
+ def generate_signing_keypair(self) -> Tuple[str, str, str]:
+ signing_key = nacl.signing.SigningKey.generate()
+ private_key_b64 = base64.b64encode(signing_key.encode()).decode()
+ public_key_b64 = base64.b64encode(
+ signing_key.verify_key.encode()
+ ).decode()
+ # Generate a unique key_id
+ key_id_bytes = nacl.utils.random(16)
+ key_id = f"sign_{base64.urlsafe_b64encode(key_id_bytes).decode()}"
+ return (key_id, private_key_b64, public_key_b64)