about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py354
1 files changed, 354 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
new file mode 100644
index 00000000..e505484b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
@@ -0,0 +1,354 @@
+import ast
+import base64
+import binascii
+import os
+import traceback
+from typing import Any, Optional, Union
+
+import httpx
+
+import litellm
+from litellm._logging import print_verbose, verbose_logger
+from litellm.caching.caching import DualCache
+from litellm.llms.custom_httpx.http_handler import HTTPHandler
+from litellm.proxy._types import KeyManagementSystem
+
+oidc_cache = DualCache()
+
+
+######### Secret Manager ############################
+# checks if user has passed in a secret manager client
+# if passed in then checks the secret there
+def _is_base64(s):
+    try:
+        return base64.b64encode(base64.b64decode(s)).decode() == s
+    except binascii.Error:
+        return False
+
+
+def str_to_bool(value: Optional[str]) -> Optional[bool]:
+    """
+    Converts a string to a boolean if it's a recognized boolean string.
+    Returns None if the string is not a recognized boolean value.
+
+    :param value: The string to be checked.
+    :return: True or False if the string is a recognized boolean, otherwise None.
+    """
+    if value is None:
+        return None
+
+    true_values = {"true"}
+    false_values = {"false"}
+
+    value_lower = value.strip().lower()
+
+    if value_lower in true_values:
+        return True
+    elif value_lower in false_values:
+        return False
+    else:
+        return None
+
+
+def get_secret_str(
+    secret_name: str,
+    default_value: Optional[Union[str, bool]] = None,
+) -> Optional[str]:
+    """
+    Guarantees response from 'get_secret' is either string or none. Used for fixing linting errors.
+    """
+    value = get_secret(secret_name=secret_name, default_value=default_value)
+    if value is not None and not isinstance(value, str):
+        return None
+
+    return value
+
+
+def get_secret_bool(
+    secret_name: str,
+    default_value: Optional[bool] = None,
+) -> Optional[bool]:
+    """
+    Guarantees response from 'get_secret' is either boolean or none. Used for fixing linting errors.
+
+    Args:
+        secret_name: The name of the secret to get.
+        default_value: The default value to return if the secret is not found.
+
+    Returns:
+        The secret value as a boolean or None if the secret is not found.
+    """
+    _secret_value = get_secret(secret_name, default_value)
+    if _secret_value is None:
+        return None
+    elif isinstance(_secret_value, bool):
+        return _secret_value
+    else:
+        return str_to_bool(_secret_value)
+
+
+def get_secret(  # noqa: PLR0915
+    secret_name: str,
+    default_value: Optional[Union[str, bool]] = None,
+):
+    key_management_system = litellm._key_management_system
+    key_management_settings = litellm._key_management_settings
+    secret = None
+
+    if secret_name.startswith("os.environ/"):
+        secret_name = secret_name.replace("os.environ/", "")
+
+    # Example: oidc/google/https://bedrock-runtime.us-east-1.amazonaws.com/model/stability.stable-diffusion-xl-v1/invoke
+    if secret_name.startswith("oidc/"):
+        secret_name_split = secret_name.replace("oidc/", "")
+        oidc_provider, oidc_aud = secret_name_split.split("/", 1)
+        # TODO: Add caching for HTTP requests
+        if oidc_provider == "google":
+            oidc_token = oidc_cache.get_cache(key=secret_name)
+            if oidc_token is not None:
+                return oidc_token
+
+            oidc_client = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
+            # https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
+            response = oidc_client.get(
+                "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/identity",
+                params={"audience": oidc_aud},
+                headers={"Metadata-Flavor": "Google"},
+            )
+            if response.status_code == 200:
+                oidc_token = response.text
+                oidc_cache.set_cache(key=secret_name, value=oidc_token, ttl=3600 - 60)
+                return oidc_token
+            else:
+                raise ValueError("Google OIDC provider failed")
+        elif oidc_provider == "circleci":
+            # https://circleci.com/docs/openid-connect-tokens/
+            env_secret = os.getenv("CIRCLE_OIDC_TOKEN")
+            if env_secret is None:
+                raise ValueError("CIRCLE_OIDC_TOKEN not found in environment")
+            return env_secret
+        elif oidc_provider == "circleci_v2":
+            # https://circleci.com/docs/openid-connect-tokens/
+            env_secret = os.getenv("CIRCLE_OIDC_TOKEN_V2")
+            if env_secret is None:
+                raise ValueError("CIRCLE_OIDC_TOKEN_V2 not found in environment")
+            return env_secret
+        elif oidc_provider == "github":
+            # https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-cloud-providers#using-custom-actions
+            actions_id_token_request_url = os.getenv("ACTIONS_ID_TOKEN_REQUEST_URL")
+            actions_id_token_request_token = os.getenv("ACTIONS_ID_TOKEN_REQUEST_TOKEN")
+            if (
+                actions_id_token_request_url is None
+                or actions_id_token_request_token is None
+            ):
+                raise ValueError(
+                    "ACTIONS_ID_TOKEN_REQUEST_URL or ACTIONS_ID_TOKEN_REQUEST_TOKEN not found in environment"
+                )
+
+            oidc_token = oidc_cache.get_cache(key=secret_name)
+            if oidc_token is not None:
+                return oidc_token
+
+            oidc_client = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
+            response = oidc_client.get(
+                actions_id_token_request_url,
+                params={"audience": oidc_aud},
+                headers={
+                    "Authorization": f"Bearer {actions_id_token_request_token}",
+                    "Accept": "application/json; api-version=2.0",
+                },
+            )
+            if response.status_code == 200:
+                oidc_token = response.json().get("value", None)
+                oidc_cache.set_cache(key=secret_name, value=oidc_token, ttl=300 - 5)
+                return oidc_token
+            else:
+                raise ValueError("Github OIDC provider failed")
+        elif oidc_provider == "azure":
+            # https://azure.github.io/azure-workload-identity/docs/quick-start.html
+            azure_federated_token_file = os.getenv("AZURE_FEDERATED_TOKEN_FILE")
+            if azure_federated_token_file is None:
+                raise ValueError("AZURE_FEDERATED_TOKEN_FILE not found in environment")
+            with open(azure_federated_token_file, "r") as f:
+                oidc_token = f.read()
+                return oidc_token
+        elif oidc_provider == "file":
+            # Load token from a file
+            with open(oidc_aud, "r") as f:
+                oidc_token = f.read()
+                return oidc_token
+        elif oidc_provider == "env":
+            # Load token directly from an environment variable
+            oidc_token = os.getenv(oidc_aud)
+            if oidc_token is None:
+                raise ValueError(f"Environment variable {oidc_aud} not found")
+            return oidc_token
+        elif oidc_provider == "env_path":
+            # Load token from a file path specified in an environment variable
+            token_file_path = os.getenv(oidc_aud)
+            if token_file_path is None:
+                raise ValueError(f"Environment variable {oidc_aud} not found")
+            with open(token_file_path, "r") as f:
+                oidc_token = f.read()
+                return oidc_token
+        else:
+            raise ValueError("Unsupported OIDC provider")
+
+    try:
+        if (
+            _should_read_secret_from_secret_manager()
+            and litellm.secret_manager_client is not None
+        ):
+            try:
+                client = litellm.secret_manager_client
+                key_manager = "local"
+                if key_management_system is not None:
+                    key_manager = key_management_system.value
+
+                if key_management_settings is not None:
+                    if (
+                        key_management_settings.hosted_keys is not None
+                        and secret_name not in key_management_settings.hosted_keys
+                    ):  # allow user to specify which keys to check in hosted key manager
+                        key_manager = "local"
+
+                if (
+                    key_manager == KeyManagementSystem.AZURE_KEY_VAULT.value
+                    or type(client).__module__ + "." + type(client).__name__
+                    == "azure.keyvault.secrets._client.SecretClient"
+                ):  # support Azure Secret Client - from azure.keyvault.secrets import SecretClient
+                    secret = client.get_secret(secret_name).value
+                elif (
+                    key_manager == KeyManagementSystem.GOOGLE_KMS.value
+                    or client.__class__.__name__ == "KeyManagementServiceClient"
+                ):
+                    encrypted_secret: Any = os.getenv(secret_name)
+                    if encrypted_secret is None:
+                        raise ValueError(
+                            "Google KMS requires the encrypted secret to be in the environment!"
+                        )
+                    b64_flag = _is_base64(encrypted_secret)
+                    if b64_flag is True:  # if passed in as encoded b64 string
+                        encrypted_secret = base64.b64decode(encrypted_secret)
+                        ciphertext = encrypted_secret
+                    else:
+                        raise ValueError(
+                            "Google KMS requires the encrypted secret to be encoded in base64"
+                        )  # fix for this vulnerability https://huntr.com/bounties/ae623c2f-b64b-4245-9ed4-f13a0a5824ce
+                    response = client.decrypt(
+                        request={
+                            "name": litellm._google_kms_resource_name,
+                            "ciphertext": ciphertext,
+                        }
+                    )
+                    secret = response.plaintext.decode(
+                        "utf-8"
+                    )  # assumes the original value was encoded with utf-8
+                elif key_manager == KeyManagementSystem.AWS_KMS.value:
+                    """
+                    Only check the tokens which start with 'aws_kms/'. This prevents latency impact caused by checking all keys.
+                    """
+                    encrypted_value = os.getenv(secret_name, None)
+                    if encrypted_value is None:
+                        raise Exception(
+                            "AWS KMS - Encrypted Value of Key={} is None".format(
+                                secret_name
+                            )
+                        )
+                    # Decode the base64 encoded ciphertext
+                    ciphertext_blob = base64.b64decode(encrypted_value)
+
+                    # Set up the parameters for the decrypt call
+                    params = {"CiphertextBlob": ciphertext_blob}
+                    # Perform the decryption
+                    response = client.decrypt(**params)
+
+                    # Extract and decode the plaintext
+                    plaintext = response["Plaintext"]
+                    secret = plaintext.decode("utf-8")
+                    if isinstance(secret, str):
+                        secret = secret.strip()
+                elif key_manager == KeyManagementSystem.AWS_SECRET_MANAGER.value:
+                    from litellm.secret_managers.aws_secret_manager_v2 import (
+                        AWSSecretsManagerV2,
+                    )
+
+                    if isinstance(client, AWSSecretsManagerV2):
+                        secret = client.sync_read_secret(
+                            secret_name=secret_name,
+                            primary_secret_name=key_management_settings.primary_secret_name,
+                        )
+                        print_verbose(f"get_secret_value_response: {secret}")
+                elif key_manager == KeyManagementSystem.GOOGLE_SECRET_MANAGER.value:
+                    try:
+                        secret = client.get_secret_from_google_secret_manager(
+                            secret_name
+                        )
+                        print_verbose(f"secret from google secret manager:  {secret}")
+                        if secret is None:
+                            raise ValueError(
+                                f"No secret found in Google Secret Manager for {secret_name}"
+                            )
+                    except Exception as e:
+                        print_verbose(f"An error occurred - {str(e)}")
+                        raise e
+                elif key_manager == KeyManagementSystem.HASHICORP_VAULT.value:
+                    try:
+                        secret = client.sync_read_secret(secret_name=secret_name)
+                        if secret is None:
+                            raise ValueError(
+                                f"No secret found in Hashicorp Secret Manager for {secret_name}"
+                            )
+                    except Exception as e:
+                        print_verbose(f"An error occurred - {str(e)}")
+                        raise e
+                elif key_manager == "local":
+                    secret = os.getenv(secret_name)
+                else:  # assume the default is infisicial client
+                    secret = client.get_secret(secret_name).secret_value
+            except Exception as e:  # check if it's in os.environ
+                verbose_logger.error(
+                    f"Defaulting to os.environ value for key={secret_name}. An exception occurred - {str(e)}.\n\n{traceback.format_exc()}"
+                )
+                secret = os.getenv(secret_name)
+            try:
+                if isinstance(secret, str):
+                    secret_value_as_bool = ast.literal_eval(secret)
+                    if isinstance(secret_value_as_bool, bool):
+                        return secret_value_as_bool
+                    else:
+                        return secret
+            except Exception:
+                return secret
+        else:
+            secret = os.environ.get(secret_name)
+            secret_value_as_bool = str_to_bool(secret) if secret is not None else None
+            if secret_value_as_bool is not None and isinstance(
+                secret_value_as_bool, bool
+            ):
+                return secret_value_as_bool
+            else:
+                return secret
+    except Exception as e:
+        if default_value is not None:
+            return default_value
+        else:
+            raise e
+
+
+def _should_read_secret_from_secret_manager() -> bool:
+    """
+    Returns True if the secret manager should be used to read the secret, False otherwise
+
+    - If the secret manager client is not set, return False
+    - If the `_key_management_settings` access mode is "read_only" or "read_and_write", return True
+    - Otherwise, return False
+    """
+    if litellm.secret_manager_client is not None:
+        if litellm._key_management_settings is not None:
+            if (
+                litellm._key_management_settings.access_mode == "read_only"
+                or litellm._key_management_settings.access_mode == "read_and_write"
+            ):
+                return True
+    return False