aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/secret_managers
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/secret_managers
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/secret_managers')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/Readme.md3
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager.py143
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py361
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/base_secret_manager.py176
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/get_azure_ad_token_provider.py38
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/google_kms.py43
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/google_secret_manager.py116
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/hashicorp_secret_manager.py318
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py354
9 files changed, 1552 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/Readme.md b/.venv/lib/python3.12/site-packages/litellm/secret_managers/Readme.md
new file mode 100644
index 00000000..9b226890
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/Readme.md
@@ -0,0 +1,3 @@
+## Supported Secret Managers to read credentials from
+
+Example read OPENAI_API_KEY, AZURE_API_KEY from a secret manager \ No newline at end of file
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager.py
new file mode 100644
index 00000000..fbe951e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager.py
@@ -0,0 +1,143 @@
+"""
+This is a file for the AWS Secret Manager Integration
+
+Relevant issue: https://github.com/BerriAI/litellm/issues/1883
+
+Requires:
+* `os.environ["AWS_REGION_NAME"],
+* `pip install boto3>=1.28.57`
+"""
+
+import ast
+import base64
+import os
+import re
+from typing import Any, Dict, Optional
+
+import litellm
+from litellm.proxy._types import KeyManagementSystem
+
+
+def validate_environment():
+ if "AWS_REGION_NAME" not in os.environ:
+ raise ValueError("Missing required environment variable - AWS_REGION_NAME")
+
+
+def load_aws_kms(use_aws_kms: Optional[bool]):
+ if use_aws_kms is None or use_aws_kms is False:
+ return
+ try:
+ import boto3
+
+ validate_environment()
+
+ # Create a Secrets Manager client
+ kms_client = boto3.client("kms", region_name=os.getenv("AWS_REGION_NAME"))
+
+ litellm.secret_manager_client = kms_client
+ litellm._key_management_system = KeyManagementSystem.AWS_KMS
+
+ except Exception as e:
+ raise e
+
+
+class AWSKeyManagementService_V2:
+ """
+ V2 Clean Class for decrypting keys from AWS KeyManagementService
+ """
+
+ def __init__(self) -> None:
+ self.validate_environment()
+ self.kms_client = self.load_aws_kms(use_aws_kms=True)
+
+ def validate_environment(
+ self,
+ ):
+ if "AWS_REGION_NAME" not in os.environ:
+ raise ValueError("Missing required environment variable - AWS_REGION_NAME")
+
+ ## CHECK IF LICENSE IN ENV ## - premium feature
+ is_litellm_license_in_env: bool = False
+
+ if os.getenv("LITELLM_LICENSE", None) is not None:
+ is_litellm_license_in_env = True
+ elif os.getenv("LITELLM_SECRET_AWS_KMS_LITELLM_LICENSE", None) is not None:
+ is_litellm_license_in_env = True
+ if is_litellm_license_in_env is False:
+ raise ValueError(
+ "AWSKeyManagementService V2 is an Enterprise Feature. Please add a valid LITELLM_LICENSE to your envionment."
+ )
+
+ def load_aws_kms(self, use_aws_kms: Optional[bool]):
+ if use_aws_kms is None or use_aws_kms is False:
+ return
+ try:
+ import boto3
+
+ validate_environment()
+
+ # Create a Secrets Manager client
+ kms_client = boto3.client("kms", region_name=os.getenv("AWS_REGION_NAME"))
+
+ return kms_client
+ except Exception as e:
+ raise e
+
+ def decrypt_value(self, secret_name: str) -> Any:
+ if self.kms_client is None:
+ raise ValueError("kms_client is None")
+ encrypted_value = os.getenv(secret_name, None)
+ if encrypted_value is None:
+ raise Exception(
+ "AWS KMS - Encrypted Value of Key={} is None".format(secret_name)
+ )
+ if isinstance(encrypted_value, str) and encrypted_value.startswith("aws_kms/"):
+ encrypted_value = encrypted_value.replace("aws_kms/", "")
+
+ # Decode the base64 encoded ciphertext
+ ciphertext_blob = base64.b64decode(encrypted_value)
+
+ # Set up the parameters for the decrypt call
+ params = {"CiphertextBlob": ciphertext_blob}
+ # Perform the decryption
+ response = self.kms_client.decrypt(**params)
+
+ # Extract and decode the plaintext
+ plaintext = response["Plaintext"]
+ secret = plaintext.decode("utf-8")
+ if isinstance(secret, str):
+ secret = secret.strip()
+ try:
+ secret_value_as_bool = ast.literal_eval(secret)
+ if isinstance(secret_value_as_bool, bool):
+ return secret_value_as_bool
+ except Exception:
+ pass
+
+ return secret
+
+
+"""
+- look for all values in the env with `aws_kms/<hashed_key>`
+- decrypt keys
+- rewrite env var with decrypted key (). Note: this environment variable will only be available to the current process and any child processes spawned from it. Once the Python script ends, the environment variable will not persist.
+"""
+
+
+def decrypt_env_var() -> Dict[str, Any]:
+ # setup client class
+ aws_kms = AWSKeyManagementService_V2()
+ # iterate through env - for `aws_kms/`
+ new_values = {}
+ for k, v in os.environ.items():
+ if (
+ k is not None
+ and isinstance(k, str)
+ and k.lower().startswith("litellm_secret_aws_kms")
+ ) or (v is not None and isinstance(v, str) and v.startswith("aws_kms/")):
+ decrypted_value = aws_kms.decrypt_value(secret_name=k)
+ # reset env var
+ k = re.sub("litellm_secret_aws_kms_", "", k, flags=re.IGNORECASE)
+ new_values[k] = decrypted_value
+
+ return new_values
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
new file mode 100644
index 00000000..fd89d6c5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
@@ -0,0 +1,361 @@
+"""
+This is a file for the AWS Secret Manager Integration
+
+Handles Async Operations for:
+- Read Secret
+- Write Secret
+- Delete Secret
+
+Relevant issue: https://github.com/BerriAI/litellm/issues/1883
+
+Requires:
+* `os.environ["AWS_REGION_NAME"],
+* `pip install boto3>=1.28.57`
+"""
+
+import json
+import os
+from typing import Any, Optional, Union
+
+import httpx
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM
+from litellm.llms.custom_httpx.http_handler import (
+ _get_httpx_client,
+ get_async_httpx_client,
+)
+from litellm.proxy._types import KeyManagementSystem
+from litellm.types.llms.custom_http import httpxSpecialProvider
+
+from .base_secret_manager import BaseSecretManager
+
+
+class AWSSecretsManagerV2(BaseAWSLLM, BaseSecretManager):
+ def __init__(self, **kwargs):
+ BaseSecretManager.__init__(self, **kwargs)
+ BaseAWSLLM.__init__(self, **kwargs)
+
+ @classmethod
+ def validate_environment(cls):
+ if "AWS_REGION_NAME" not in os.environ:
+ raise ValueError("Missing required environment variable - AWS_REGION_NAME")
+
+ @classmethod
+ def load_aws_secret_manager(cls, use_aws_secret_manager: Optional[bool]):
+ """
+ Initialize AWSSecretsManagerV2 and sets litellm.secret_manager_client = AWSSecretsManagerV2() and litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER
+ """
+ if use_aws_secret_manager is None or use_aws_secret_manager is False:
+ return
+ try:
+
+ cls.validate_environment()
+ litellm.secret_manager_client = cls()
+ litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER
+
+ except Exception as e:
+ raise e
+
+ async def async_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ primary_secret_name: Optional[str] = None,
+ ) -> Optional[str]:
+ """
+ Async function to read a secret from AWS Secrets Manager
+
+ Returns:
+ str: Secret value
+ Raises:
+ ValueError: If the secret is not found or an HTTP error occurs
+ """
+ if primary_secret_name:
+ return await self.async_read_secret_from_primary_secret(
+ secret_name=secret_name, primary_secret_name=primary_secret_name
+ )
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="GetSecretValue",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()["SecretString"]
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+ except Exception as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s",
+ secret_name,
+ str(e),
+ )
+ return None
+
+ def sync_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ primary_secret_name: Optional[str] = None,
+ ) -> Optional[str]:
+ """
+ Sync function to read a secret from AWS Secrets Manager
+
+ Done for backwards compatibility with existing codebase, since get_secret is a sync function
+ """
+ # self._prepare_request uses these env vars, we cannot read them from AWS Secrets Manager. If we do we'd get stuck in an infinite loop
+ if secret_name in [
+ "AWS_ACCESS_KEY_ID",
+ "AWS_SECRET_ACCESS_KEY",
+ "AWS_REGION_NAME",
+ "AWS_REGION",
+ "AWS_BEDROCK_RUNTIME_ENDPOINT",
+ ]:
+ return os.getenv(secret_name)
+
+ if primary_secret_name:
+ return self.sync_read_secret_from_primary_secret(
+ secret_name=secret_name, primary_secret_name=primary_secret_name
+ )
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="GetSecretValue",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ )
+
+ sync_client = _get_httpx_client(
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = sync_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ return response.json()["SecretString"]
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+ except httpx.HTTPStatusError as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s, %s",
+ secret_name,
+ str(e.response.text),
+ str(e.response.status_code),
+ )
+ except Exception as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s",
+ secret_name,
+ str(e),
+ )
+ return None
+
+ def _parse_primary_secret(self, primary_secret_json_str: Optional[str]) -> dict:
+ """
+ Parse the primary secret JSON string into a dictionary
+
+ Args:
+ primary_secret_json_str: JSON string containing key-value pairs
+
+ Returns:
+ Dictionary of key-value pairs from the primary secret
+ """
+ return json.loads(primary_secret_json_str or "{}")
+
+ def sync_read_secret_from_primary_secret(
+ self, secret_name: str, primary_secret_name: str
+ ) -> Optional[str]:
+ """
+ Read a secret from the primary secret
+ """
+ primary_secret_json_str = self.sync_read_secret(secret_name=primary_secret_name)
+ primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str)
+ return primary_secret_kv_pairs.get(secret_name)
+
+ async def async_read_secret_from_primary_secret(
+ self, secret_name: str, primary_secret_name: str
+ ) -> Optional[str]:
+ """
+ Read a secret from the primary secret
+ """
+ primary_secret_json_str = await self.async_read_secret(
+ secret_name=primary_secret_name
+ )
+ primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str)
+ return primary_secret_kv_pairs.get(secret_name)
+
+ async def async_write_secret(
+ self,
+ secret_name: str,
+ secret_value: str,
+ description: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to write a secret to AWS Secrets Manager
+
+ Args:
+ secret_name: Name of the secret
+ secret_value: Value to store (can be a JSON string)
+ description: Optional description for the secret
+ optional_params: Additional AWS parameters
+ timeout: Request timeout
+ """
+ import uuid
+
+ # Prepare the request data
+ data = {"Name": secret_name, "SecretString": secret_value}
+ if description:
+ data["Description"] = description
+
+ data["ClientRequestToken"] = str(uuid.uuid4())
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="CreateSecret",
+ secret_name=secret_name,
+ secret_value=secret_value,
+ optional_params=optional_params,
+ request_data=data, # Pass the complete request data
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as err:
+ raise ValueError(f"HTTP error occurred: {err.response.text}")
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+
+ async def async_delete_secret(
+ self,
+ secret_name: str,
+ recovery_window_in_days: Optional[int] = 7,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to delete a secret from AWS Secrets Manager
+
+ Args:
+ secret_name: Name of the secret to delete
+ recovery_window_in_days: Number of days before permanent deletion (default: 7)
+ optional_params: Additional AWS parameters
+ timeout: Request timeout
+
+ Returns:
+ dict: Response from AWS Secrets Manager containing deletion details
+ """
+ # Prepare the request data
+ data = {
+ "SecretId": secret_name,
+ "RecoveryWindowInDays": recovery_window_in_days,
+ }
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="DeleteSecret",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ request_data=data,
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as err:
+ raise ValueError(f"HTTP error occurred: {err.response.text}")
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+
+ def _prepare_request(
+ self,
+ action: str, # "GetSecretValue" or "PutSecretValue"
+ secret_name: str,
+ secret_value: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ request_data: Optional[dict] = None,
+ ) -> tuple[str, Any, bytes]:
+ """Prepare the AWS Secrets Manager request"""
+ try:
+ from botocore.auth import SigV4Auth
+ from botocore.awsrequest import AWSRequest
+ except ImportError:
+ raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
+ optional_params = optional_params or {}
+ boto3_credentials_info = self._get_boto_credentials_from_optional_params(
+ optional_params
+ )
+
+ # Get endpoint
+ _, endpoint_url = self.get_runtime_endpoint(
+ api_base=None,
+ aws_bedrock_runtime_endpoint=boto3_credentials_info.aws_bedrock_runtime_endpoint,
+ aws_region_name=boto3_credentials_info.aws_region_name,
+ )
+ endpoint_url = endpoint_url.replace("bedrock-runtime", "secretsmanager")
+
+ # Use provided request_data if available, otherwise build default data
+ if request_data:
+ data = request_data
+ else:
+ data = {"SecretId": secret_name}
+ if secret_value and action == "PutSecretValue":
+ data["SecretString"] = secret_value
+
+ body = json.dumps(data).encode("utf-8")
+ headers = {
+ "Content-Type": "application/x-amz-json-1.1",
+ "X-Amz-Target": f"secretsmanager.{action}",
+ }
+
+ # Sign request
+ request = AWSRequest(
+ method="POST", url=endpoint_url, data=body, headers=headers
+ )
+ SigV4Auth(
+ boto3_credentials_info.credentials,
+ "secretsmanager",
+ boto3_credentials_info.aws_region_name,
+ ).add_auth(request)
+ prepped = request.prepare()
+
+ return endpoint_url, prepped.headers, body
+
+
+# if __name__ == "__main__":
+# print("loading aws secret manager v2")
+# aws_secret_manager_v2 = AWSSecretsManagerV2()
+
+# print("writing secret to aws secret manager v2")
+# asyncio.run(aws_secret_manager_v2.async_write_secret(secret_name="test_secret_3", secret_value="test_value_2"))
+# print("reading secret from aws secret manager v2")
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/base_secret_manager.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/base_secret_manager.py
new file mode 100644
index 00000000..0b5bf74d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/base_secret_manager.py
@@ -0,0 +1,176 @@
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Optional, Union
+
+import httpx
+
+from litellm import verbose_logger
+
+
+class BaseSecretManager(ABC):
+ """
+ Abstract base class for secret management implementations.
+ """
+
+ @abstractmethod
+ async def async_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Optional[str]:
+ """
+ Asynchronously read a secret from the secret manager.
+
+ Args:
+ secret_name (str): Name/path of the secret to read
+ optional_params (Optional[dict]): Additional parameters specific to the secret manager
+ timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
+
+ Returns:
+ Optional[str]: The secret value if found, None otherwise
+ """
+ pass
+
+ @abstractmethod
+ def sync_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Optional[str]:
+ """
+ Synchronously read a secret from the secret manager.
+
+ Args:
+ secret_name (str): Name/path of the secret to read
+ optional_params (Optional[dict]): Additional parameters specific to the secret manager
+ timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
+
+ Returns:
+ Optional[str]: The secret value if found, None otherwise
+ """
+ pass
+
+ @abstractmethod
+ async def async_write_secret(
+ self,
+ secret_name: str,
+ secret_value: str,
+ description: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Dict[str, Any]:
+ """
+ Asynchronously write a secret to the secret manager.
+
+ Args:
+ secret_name (str): Name/path of the secret to write
+ secret_value (str): Value to store
+ description (Optional[str]): Description of the secret. Some secret managers allow storing a description with the secret.
+ optional_params (Optional[dict]): Additional parameters specific to the secret manager
+ timeout (Optional[Union[float, httpx.Timeout]]): Request timeout
+ Returns:
+ Dict[str, Any]: Response from the secret manager containing write operation details
+ """
+ pass
+
+ @abstractmethod
+ async def async_delete_secret(
+ self,
+ secret_name: str,
+ recovery_window_in_days: Optional[int] = 7,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to delete a secret from the secret manager
+
+ Args:
+ secret_name: Name of the secret to delete
+ recovery_window_in_days: Number of days before permanent deletion (default: 7)
+ optional_params: Additional parameters specific to the secret manager
+ timeout: Request timeout
+
+ Returns:
+ dict: Response from the secret manager containing deletion details
+ """
+ pass
+
+ async def async_rotate_secret(
+ self,
+ current_secret_name: str,
+ new_secret_name: str,
+ new_secret_value: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to rotate a secret by creating a new one and deleting the old one.
+ This allows for both value and name changes during rotation.
+
+ Args:
+ current_secret_name: Current name of the secret
+ new_secret_name: New name for the secret
+ new_secret_value: New value for the secret
+ optional_params: Additional AWS parameters
+ timeout: Request timeout
+
+ Returns:
+ dict: Response containing the new secret details
+
+ Raises:
+ ValueError: If the secret doesn't exist or if there's an HTTP error
+ """
+ try:
+ # First verify the old secret exists
+ old_secret = await self.async_read_secret(
+ secret_name=current_secret_name,
+ optional_params=optional_params,
+ timeout=timeout,
+ )
+
+ if old_secret is None:
+ raise ValueError(f"Current secret {current_secret_name} not found")
+
+ # Create new secret with new name and value
+ create_response = await self.async_write_secret(
+ secret_name=new_secret_name,
+ secret_value=new_secret_value,
+ description=f"Rotated from {current_secret_name}",
+ optional_params=optional_params,
+ timeout=timeout,
+ )
+
+ # Verify new secret was created successfully
+ new_secret = await self.async_read_secret(
+ secret_name=new_secret_name,
+ optional_params=optional_params,
+ timeout=timeout,
+ )
+
+ if new_secret is None:
+ raise ValueError(f"Failed to verify new secret {new_secret_name}")
+
+ # If everything is successful, delete the old secret
+ await self.async_delete_secret(
+ secret_name=current_secret_name,
+ recovery_window_in_days=7, # Keep for recovery if needed
+ optional_params=optional_params,
+ timeout=timeout,
+ )
+
+ return create_response
+
+ except httpx.HTTPStatusError as err:
+ verbose_logger.exception(
+ "Error rotating secret in AWS Secrets Manager: %s",
+ str(err.response.text),
+ )
+ raise ValueError(f"HTTP error occurred: {err.response.text}")
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+ except Exception as e:
+ verbose_logger.exception(
+ "Error rotating secret in AWS Secrets Manager: %s", str(e)
+ )
+ raise
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/get_azure_ad_token_provider.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/get_azure_ad_token_provider.py
new file mode 100644
index 00000000..5403675b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/get_azure_ad_token_provider.py
@@ -0,0 +1,38 @@
+import os
+from typing import Callable
+
+
+def get_azure_ad_token_provider() -> Callable[[], str]:
+ """
+ Get Azure AD token provider based on Service Principal with Secret workflow.
+
+ Based on: https://github.com/openai/openai-python/blob/main/examples/azure_ad.py
+ See Also:
+ https://learn.microsoft.com/en-us/python/api/overview/azure/identity-readme?view=azure-python#service-principal-with-secret;
+ https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.clientsecretcredential?view=azure-python.
+
+ Returns:
+ Callable that returns a temporary authentication token.
+ """
+ import azure.identity as identity
+ from azure.identity import get_bearer_token_provider
+
+ azure_scope = os.environ.get(
+ "AZURE_SCOPE", "https://cognitiveservices.azure.com/.default"
+ )
+ cred = os.environ.get("AZURE_CREDENTIAL", "ClientSecretCredential")
+
+ cred_cls = getattr(identity, cred)
+ # ClientSecretCredential, DefaultAzureCredential, AzureCliCredential
+ if cred == "ClientSecretCredential":
+ credential = cred_cls(
+ client_id=os.environ["AZURE_CLIENT_ID"],
+ client_secret=os.environ["AZURE_CLIENT_SECRET"],
+ tenant_id=os.environ["AZURE_TENANT_ID"],
+ )
+ elif cred == "ManagedIdentityCredential":
+ credential = cred_cls(client_id=os.environ["AZURE_CLIENT_ID"])
+ else:
+ credential = cred_cls()
+
+ return get_bearer_token_provider(credential, azure_scope)
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_kms.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_kms.py
new file mode 100644
index 00000000..18e25abe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_kms.py
@@ -0,0 +1,43 @@
+"""
+This is a file for the Google KMS integration
+
+Relevant issue: https://github.com/BerriAI/litellm/issues/1235
+
+Requires:
+* `os.environ["GOOGLE_APPLICATION_CREDENTIALS"], os.environ["GOOGLE_KMS_RESOURCE_NAME"]`
+* `pip install google-cloud-kms`
+"""
+
+import os
+from typing import Optional
+
+import litellm
+from litellm.proxy._types import KeyManagementSystem
+
+
+def validate_environment():
+ if "GOOGLE_APPLICATION_CREDENTIALS" not in os.environ:
+ raise ValueError(
+ "Missing required environment variable - GOOGLE_APPLICATION_CREDENTIALS"
+ )
+ if "GOOGLE_KMS_RESOURCE_NAME" not in os.environ:
+ raise ValueError(
+ "Missing required environment variable - GOOGLE_KMS_RESOURCE_NAME"
+ )
+
+
+def load_google_kms(use_google_kms: Optional[bool]):
+ if use_google_kms is None or use_google_kms is False:
+ return
+ try:
+ from google.cloud import kms_v1 # type: ignore
+
+ validate_environment()
+
+ # Create the KMS client
+ client = kms_v1.KeyManagementServiceClient()
+ litellm.secret_manager_client = client
+ litellm._key_management_system = KeyManagementSystem.GOOGLE_KMS
+ litellm._google_kms_resource_name = os.getenv("GOOGLE_KMS_RESOURCE_NAME")
+ except Exception as e:
+ raise e
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_secret_manager.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_secret_manager.py
new file mode 100644
index 00000000..f21963c3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/google_secret_manager.py
@@ -0,0 +1,116 @@
+import base64
+import os
+from typing import Optional
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.caching.caching import InMemoryCache
+from litellm.integrations.gcs_bucket.gcs_bucket_base import GCSBucketBase
+from litellm.llms.custom_httpx.http_handler import _get_httpx_client
+from litellm.proxy._types import CommonProxyErrors, KeyManagementSystem
+
+
+class GoogleSecretManager(GCSBucketBase):
+ def __init__(
+ self,
+ refresh_interval: Optional[int] = 86400,
+ always_read_secret_manager: Optional[bool] = False,
+ ) -> None:
+ """
+ Args:
+ refresh_interval (int, optional): The refresh interval in seconds. Defaults to 86400. (24 hours)
+ always_read_secret_manager (bool, optional): Whether to always read from the secret manager. Defaults to False. Since we do want to cache values
+ """
+ from litellm.proxy.proxy_server import premium_user
+
+ if premium_user is not True:
+ raise ValueError(
+ f"Google Secret Manager requires an Enterprise License {CommonProxyErrors.not_premium_user.value}"
+ )
+ super().__init__()
+ self.PROJECT_ID = os.environ.get("GOOGLE_SECRET_MANAGER_PROJECT_ID", None)
+ if self.PROJECT_ID is None:
+ raise ValueError(
+ "Google Secret Manager requires a project ID, please set 'GOOGLE_SECRET_MANAGER_PROJECT_ID' in your .env"
+ )
+ self.sync_httpx_client = _get_httpx_client()
+ litellm.secret_manager_client = self
+ litellm._key_management_system = KeyManagementSystem.GOOGLE_SECRET_MANAGER
+ _refresh_interval = os.environ.get(
+ "GOOGLE_SECRET_MANAGER_REFRESH_INTERVAL", refresh_interval
+ )
+ _refresh_interval = (
+ int(_refresh_interval) if _refresh_interval else refresh_interval
+ )
+ self.cache = InMemoryCache(
+ default_ttl=_refresh_interval
+ ) # store in memory for 1 day
+
+ _always_read_secret_manager = os.environ.get(
+ "GOOGLE_SECRET_MANAGER_ALWAYS_READ_SECRET_MANAGER",
+ )
+ if (
+ _always_read_secret_manager
+ and _always_read_secret_manager.lower() == "true"
+ ):
+ self.always_read_secret_manager = True
+ else:
+ # by default this should be False, we want to use in memory caching for this. It's a bad idea to fetch from secret manager for all requests
+ self.always_read_secret_manager = always_read_secret_manager or False
+
+ def get_secret_from_google_secret_manager(self, secret_name: str) -> Optional[str]:
+ """
+ Retrieve a secret from Google Secret Manager or cache.
+
+ Args:
+ secret_name (str): The name of the secret.
+
+ Returns:
+ str: The secret value if successful, None otherwise.
+ """
+ if self.always_read_secret_manager is not True:
+ cached_secret = self.cache.get_cache(secret_name)
+ if cached_secret is not None:
+ return cached_secret
+ if secret_name in self.cache.cache_dict:
+ return cached_secret
+
+ _secret_name = (
+ f"projects/{self.PROJECT_ID}/secrets/{secret_name}/versions/latest"
+ )
+ headers = self.sync_construct_request_headers()
+ url = f"https://secretmanager.googleapis.com/v1/{_secret_name}:access"
+
+ # Send the GET request to retrieve the secret
+ response = self.sync_httpx_client.get(url=url, headers=headers)
+
+ if response.status_code != 200:
+ verbose_logger.error(
+ "Google Secret Manager retrieval error: %s", str(response.text)
+ )
+ self.cache.set_cache(
+ secret_name, None
+ ) # Cache that the secret was not found
+ raise ValueError(
+ f"secret {secret_name} not found in Google Secret Manager. Error: {response.text}"
+ )
+
+ verbose_logger.debug(
+ "Google Secret Manager retrieval response status code: %s",
+ response.status_code,
+ )
+
+ # Parse the JSON response and return the secret value
+ secret_data = response.json()
+ _base64_encoded_value = secret_data.get("payload", {}).get("data")
+
+ # decode the base64 encoded value
+ if _base64_encoded_value is not None:
+ _decoded_value = base64.b64decode(_base64_encoded_value).decode("utf-8")
+ self.cache.set_cache(
+ secret_name, _decoded_value
+ ) # Cache the retrieved secret
+ return _decoded_value
+
+ self.cache.set_cache(secret_name, None) # Cache that the secret was not found
+ raise ValueError(f"secret {secret_name} not found in Google Secret Manager")
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/hashicorp_secret_manager.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/hashicorp_secret_manager.py
new file mode 100644
index 00000000..a3d129f8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/hashicorp_secret_manager.py
@@ -0,0 +1,318 @@
+import os
+from typing import Any, Dict, Optional, Union
+
+import httpx
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.caching import InMemoryCache
+from litellm.llms.custom_httpx.http_handler import (
+ _get_httpx_client,
+ get_async_httpx_client,
+ httpxSpecialProvider,
+)
+from litellm.proxy._types import KeyManagementSystem
+
+from .base_secret_manager import BaseSecretManager
+
+
+class HashicorpSecretManager(BaseSecretManager):
+ def __init__(self):
+ from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
+
+ # Vault-specific config
+ self.vault_addr = os.getenv("HCP_VAULT_ADDR", "http://127.0.0.1:8200")
+ self.vault_token = os.getenv("HCP_VAULT_TOKEN", "")
+ # If your KV engine is mounted somewhere other than "secret", adjust here:
+ self.vault_namespace = os.getenv("HCP_VAULT_NAMESPACE", None)
+
+ # Optional config for TLS cert auth
+ self.tls_cert_path = os.getenv("HCP_VAULT_CLIENT_CERT", "")
+ self.tls_key_path = os.getenv("HCP_VAULT_CLIENT_KEY", "")
+ self.vault_cert_role = os.getenv("HCP_VAULT_CERT_ROLE", None)
+
+ # Validate environment
+ if not self.vault_token:
+ raise ValueError(
+ "Missing Vault token. Please set VAULT_TOKEN in your environment."
+ )
+
+ litellm.secret_manager_client = self
+ litellm._key_management_system = KeyManagementSystem.HASHICORP_VAULT
+ _refresh_interval = os.environ.get("HCP_VAULT_REFRESH_INTERVAL", 86400)
+ _refresh_interval = int(_refresh_interval) if _refresh_interval else 86400
+ self.cache = InMemoryCache(
+ default_ttl=_refresh_interval
+ ) # store in memory for 1 day
+
+ if premium_user is not True:
+ raise ValueError(
+ f"Hashicorp secret manager is only available for premium users. {CommonProxyErrors.not_premium_user.value}"
+ )
+
+ def _auth_via_tls_cert(self) -> str:
+ """
+ Ref: https://developer.hashicorp.com/vault/api-docs/auth/cert
+
+ Request:
+ ```
+ curl \
+ --request POST \
+ --cacert vault-ca.pem \
+ --cert cert.pem \
+ --key key.pem \
+ --header "X-Vault-Namespace: mynamespace/" \
+ --data '{"name": "my-cert-role"}' \
+ https://127.0.0.1:8200/v1/auth/cert/login
+ ```
+
+ Response:
+ ```
+ {
+ "auth": {
+ "client_token": "cf95f87d-f95b-47ff-b1f5-ba7bff850425",
+ "policies": ["web", "stage"],
+ "lease_duration": 3600,
+ "renewable": true
+ }
+ }
+ ```
+ """
+ verbose_logger.debug("Using TLS cert auth for Hashicorp Vault")
+ # Vault endpoint for cert-based login, e.g. '/v1/auth/cert/login'
+ login_url = f"{self.vault_addr}/v1/auth/cert/login"
+
+ # Include your Vault namespace in the header if you're using namespaces.
+ # E.g. self.vault_namespace = 'mynamespace/'
+ # If you only have root namespace, you can omit this header entirely.
+ headers = {}
+ if hasattr(self, "vault_namespace") and self.vault_namespace:
+ headers["X-Vault-Namespace"] = self.vault_namespace
+ try:
+ # We use the client cert and key for mutual TLS
+ resp = httpx.post(
+ login_url,
+ cert=(self.tls_cert_path, self.tls_key_path),
+ headers=headers,
+ json=self._get_tls_cert_auth_body(),
+ )
+ resp.raise_for_status()
+ token = resp.json()["auth"]["client_token"]
+ _lease_duration = resp.json()["auth"]["lease_duration"]
+ verbose_logger.info("Successfully obtained Vault token via TLS cert auth.")
+ self.cache.set_cache(
+ key="hcp_vault_token", value=token, ttl=_lease_duration
+ )
+ return token
+ except Exception as e:
+ raise RuntimeError(f"Could not authenticate to Vault via TLS cert: {e}")
+
+ def _get_tls_cert_auth_body(self) -> dict:
+ return {"name": self.vault_cert_role}
+
+ def get_url(self, secret_name: str) -> str:
+ _url = f"{self.vault_addr}/v1/"
+ if self.vault_namespace:
+ _url += f"{self.vault_namespace}/"
+ _url += f"secret/data/{secret_name}"
+ return _url
+
+ def _get_request_headers(self) -> dict:
+ if self.tls_cert_path and self.tls_key_path:
+ return {"X-Vault-Token": self._auth_via_tls_cert()}
+ return {"X-Vault-Token": self.vault_token}
+
+ async def async_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Optional[str]:
+ """
+ Reads a secret from Vault KV v2 using an async HTTPX client.
+ secret_name is just the path inside the KV mount (e.g., 'myapp/config').
+ Returns the entire data dict from data.data, or None on failure.
+ """
+ if self.cache.get_cache(secret_name) is not None:
+ return self.cache.get_cache(secret_name)
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ )
+ try:
+ # For KV v2: /v1/<mount>/data/<path>
+ # Example: http://127.0.0.1:8200/v1/secret/data/myapp/config
+ _url = self.get_url(secret_name)
+ url = _url
+
+ response = await async_client.get(url, headers=self._get_request_headers())
+ response.raise_for_status()
+
+ # For KV v2, the secret is in response.json()["data"]["data"]
+ json_resp = response.json()
+ _value = self._get_secret_value_from_json_response(json_resp)
+ self.cache.set_cache(secret_name, _value)
+ return _value
+
+ except Exception as e:
+ verbose_logger.exception(f"Error reading secret from Hashicorp Vault: {e}")
+ return None
+
+ def sync_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Optional[str]:
+ """
+ Reads a secret from Vault KV v2 using a sync HTTPX client.
+ secret_name is just the path inside the KV mount (e.g., 'myapp/config').
+ Returns the entire data dict from data.data, or None on failure.
+ """
+ if self.cache.get_cache(secret_name) is not None:
+ return self.cache.get_cache(secret_name)
+ sync_client = _get_httpx_client()
+ try:
+ # For KV v2: /v1/<mount>/data/<path>
+ url = self.get_url(secret_name)
+
+ response = sync_client.get(url, headers=self._get_request_headers())
+ response.raise_for_status()
+
+ # For KV v2, the secret is in response.json()["data"]["data"]
+ json_resp = response.json()
+ _value = self._get_secret_value_from_json_response(json_resp)
+ self.cache.set_cache(secret_name, _value)
+ return _value
+
+ except Exception as e:
+ verbose_logger.exception(f"Error reading secret from Hashicorp Vault: {e}")
+ return None
+
+ async def async_write_secret(
+ self,
+ secret_name: str,
+ secret_value: str,
+ description: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> Dict[str, Any]:
+ """
+ Writes a secret to Vault KV v2 using an async HTTPX client.
+
+ Args:
+ secret_name: Path inside the KV mount (e.g., 'myapp/config')
+ secret_value: Value to store
+ description: Optional description for the secret
+ optional_params: Additional parameters to include in the secret data
+ timeout: Request timeout
+
+ Returns:
+ dict: Response containing status and details of the operation
+ """
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ url = self.get_url(secret_name)
+
+ # Prepare the secret data
+ data = {"data": {"key": secret_value}}
+
+ if description:
+ data["data"]["description"] = description
+
+ response = await async_client.post(
+ url=url, headers=self._get_request_headers(), json=data
+ )
+ response.raise_for_status()
+ return response.json()
+ except Exception as e:
+ verbose_logger.exception(f"Error writing secret to Hashicorp Vault: {e}")
+ return {"status": "error", "message": str(e)}
+
+ async def async_rotate_secret(
+ self,
+ current_secret_name: str,
+ new_secret_name: str,
+ new_secret_value: str,
+ optional_params: Dict | None = None,
+ timeout: float | httpx.Timeout | None = None,
+ ) -> Dict:
+ raise NotImplementedError("Hashicorp does not support secret rotation")
+
+ async def async_delete_secret(
+ self,
+ secret_name: str,
+ recovery_window_in_days: Optional[int] = 7,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to delete a secret from Hashicorp Vault.
+ In KV v2, this marks the latest version of the secret as deleted.
+
+ Args:
+ secret_name: Name of the secret to delete
+ recovery_window_in_days: Not used for Vault (Vault handles this internally)
+ optional_params: Additional parameters specific to the secret manager
+ timeout: Request timeout
+
+ Returns:
+ dict: Response containing status and details of the operation
+ """
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ # For KV v2 delete: /v1/<mount>/data/<path>
+ url = self.get_url(secret_name)
+
+ response = await async_client.delete(
+ url=url, headers=self._get_request_headers()
+ )
+ response.raise_for_status()
+
+ # Clear the cache for this secret
+ self.cache.delete_cache(secret_name)
+
+ return {
+ "status": "success",
+ "message": f"Secret {secret_name} deleted successfully",
+ }
+ except Exception as e:
+ verbose_logger.exception(f"Error deleting secret from Hashicorp Vault: {e}")
+ return {"status": "error", "message": str(e)}
+
+ def _get_secret_value_from_json_response(
+ self, json_resp: Optional[dict]
+ ) -> Optional[str]:
+ """
+ Get the secret value from the JSON response
+
+ Json response from hashicorp vault is of the form:
+
+ {
+ "request_id":"036ba77c-018b-31dd-047b-323bcd0cd332",
+ "lease_id":"",
+ "renewable":false,
+ "lease_duration":0,
+ "data":
+ {"data":
+ {"key":"Vault Is The Way"},
+ "metadata":{"created_time":"2025-01-01T22:13:50.93942388Z","custom_metadata":null,"deletion_time":"","destroyed":false,"version":1}
+ },
+ "wrap_info":null,
+ "warnings":null,
+ "auth":null,
+ "mount_type":"kv"
+ }
+
+ Note: LiteLLM assumes that all secrets are stored as under the key "key"
+ """
+ if json_resp is None:
+ return None
+ return json_resp.get("data", {}).get("data", {}).get("key", None)
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
new file mode 100644
index 00000000..e505484b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/main.py
@@ -0,0 +1,354 @@
+import ast
+import base64
+import binascii
+import os
+import traceback
+from typing import Any, Optional, Union
+
+import httpx
+
+import litellm
+from litellm._logging import print_verbose, verbose_logger
+from litellm.caching.caching import DualCache
+from litellm.llms.custom_httpx.http_handler import HTTPHandler
+from litellm.proxy._types import KeyManagementSystem
+
+oidc_cache = DualCache()
+
+
+######### Secret Manager ############################
+# checks if user has passed in a secret manager client
+# if passed in then checks the secret there
+def _is_base64(s):
+ try:
+ return base64.b64encode(base64.b64decode(s)).decode() == s
+ except binascii.Error:
+ return False
+
+
+def str_to_bool(value: Optional[str]) -> Optional[bool]:
+ """
+ Converts a string to a boolean if it's a recognized boolean string.
+ Returns None if the string is not a recognized boolean value.
+
+ :param value: The string to be checked.
+ :return: True or False if the string is a recognized boolean, otherwise None.
+ """
+ if value is None:
+ return None
+
+ true_values = {"true"}
+ false_values = {"false"}
+
+ value_lower = value.strip().lower()
+
+ if value_lower in true_values:
+ return True
+ elif value_lower in false_values:
+ return False
+ else:
+ return None
+
+
+def get_secret_str(
+ secret_name: str,
+ default_value: Optional[Union[str, bool]] = None,
+) -> Optional[str]:
+ """
+ Guarantees response from 'get_secret' is either string or none. Used for fixing linting errors.
+ """
+ value = get_secret(secret_name=secret_name, default_value=default_value)
+ if value is not None and not isinstance(value, str):
+ return None
+
+ return value
+
+
+def get_secret_bool(
+ secret_name: str,
+ default_value: Optional[bool] = None,
+) -> Optional[bool]:
+ """
+ Guarantees response from 'get_secret' is either boolean or none. Used for fixing linting errors.
+
+ Args:
+ secret_name: The name of the secret to get.
+ default_value: The default value to return if the secret is not found.
+
+ Returns:
+ The secret value as a boolean or None if the secret is not found.
+ """
+ _secret_value = get_secret(secret_name, default_value)
+ if _secret_value is None:
+ return None
+ elif isinstance(_secret_value, bool):
+ return _secret_value
+ else:
+ return str_to_bool(_secret_value)
+
+
+def get_secret( # noqa: PLR0915
+ secret_name: str,
+ default_value: Optional[Union[str, bool]] = None,
+):
+ key_management_system = litellm._key_management_system
+ key_management_settings = litellm._key_management_settings
+ secret = None
+
+ if secret_name.startswith("os.environ/"):
+ secret_name = secret_name.replace("os.environ/", "")
+
+ # Example: oidc/google/https://bedrock-runtime.us-east-1.amazonaws.com/model/stability.stable-diffusion-xl-v1/invoke
+ if secret_name.startswith("oidc/"):
+ secret_name_split = secret_name.replace("oidc/", "")
+ oidc_provider, oidc_aud = secret_name_split.split("/", 1)
+ # TODO: Add caching for HTTP requests
+ if oidc_provider == "google":
+ oidc_token = oidc_cache.get_cache(key=secret_name)
+ if oidc_token is not None:
+ return oidc_token
+
+ oidc_client = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
+ # https://cloud.google.com/compute/docs/instances/verifying-instance-identity#request_signature
+ response = oidc_client.get(
+ "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/identity",
+ params={"audience": oidc_aud},
+ headers={"Metadata-Flavor": "Google"},
+ )
+ if response.status_code == 200:
+ oidc_token = response.text
+ oidc_cache.set_cache(key=secret_name, value=oidc_token, ttl=3600 - 60)
+ return oidc_token
+ else:
+ raise ValueError("Google OIDC provider failed")
+ elif oidc_provider == "circleci":
+ # https://circleci.com/docs/openid-connect-tokens/
+ env_secret = os.getenv("CIRCLE_OIDC_TOKEN")
+ if env_secret is None:
+ raise ValueError("CIRCLE_OIDC_TOKEN not found in environment")
+ return env_secret
+ elif oidc_provider == "circleci_v2":
+ # https://circleci.com/docs/openid-connect-tokens/
+ env_secret = os.getenv("CIRCLE_OIDC_TOKEN_V2")
+ if env_secret is None:
+ raise ValueError("CIRCLE_OIDC_TOKEN_V2 not found in environment")
+ return env_secret
+ elif oidc_provider == "github":
+ # https://docs.github.com/en/actions/deployment/security-hardening-your-deployments/configuring-openid-connect-in-cloud-providers#using-custom-actions
+ actions_id_token_request_url = os.getenv("ACTIONS_ID_TOKEN_REQUEST_URL")
+ actions_id_token_request_token = os.getenv("ACTIONS_ID_TOKEN_REQUEST_TOKEN")
+ if (
+ actions_id_token_request_url is None
+ or actions_id_token_request_token is None
+ ):
+ raise ValueError(
+ "ACTIONS_ID_TOKEN_REQUEST_URL or ACTIONS_ID_TOKEN_REQUEST_TOKEN not found in environment"
+ )
+
+ oidc_token = oidc_cache.get_cache(key=secret_name)
+ if oidc_token is not None:
+ return oidc_token
+
+ oidc_client = HTTPHandler(timeout=httpx.Timeout(timeout=600.0, connect=5.0))
+ response = oidc_client.get(
+ actions_id_token_request_url,
+ params={"audience": oidc_aud},
+ headers={
+ "Authorization": f"Bearer {actions_id_token_request_token}",
+ "Accept": "application/json; api-version=2.0",
+ },
+ )
+ if response.status_code == 200:
+ oidc_token = response.json().get("value", None)
+ oidc_cache.set_cache(key=secret_name, value=oidc_token, ttl=300 - 5)
+ return oidc_token
+ else:
+ raise ValueError("Github OIDC provider failed")
+ elif oidc_provider == "azure":
+ # https://azure.github.io/azure-workload-identity/docs/quick-start.html
+ azure_federated_token_file = os.getenv("AZURE_FEDERATED_TOKEN_FILE")
+ if azure_federated_token_file is None:
+ raise ValueError("AZURE_FEDERATED_TOKEN_FILE not found in environment")
+ with open(azure_federated_token_file, "r") as f:
+ oidc_token = f.read()
+ return oidc_token
+ elif oidc_provider == "file":
+ # Load token from a file
+ with open(oidc_aud, "r") as f:
+ oidc_token = f.read()
+ return oidc_token
+ elif oidc_provider == "env":
+ # Load token directly from an environment variable
+ oidc_token = os.getenv(oidc_aud)
+ if oidc_token is None:
+ raise ValueError(f"Environment variable {oidc_aud} not found")
+ return oidc_token
+ elif oidc_provider == "env_path":
+ # Load token from a file path specified in an environment variable
+ token_file_path = os.getenv(oidc_aud)
+ if token_file_path is None:
+ raise ValueError(f"Environment variable {oidc_aud} not found")
+ with open(token_file_path, "r") as f:
+ oidc_token = f.read()
+ return oidc_token
+ else:
+ raise ValueError("Unsupported OIDC provider")
+
+ try:
+ if (
+ _should_read_secret_from_secret_manager()
+ and litellm.secret_manager_client is not None
+ ):
+ try:
+ client = litellm.secret_manager_client
+ key_manager = "local"
+ if key_management_system is not None:
+ key_manager = key_management_system.value
+
+ if key_management_settings is not None:
+ if (
+ key_management_settings.hosted_keys is not None
+ and secret_name not in key_management_settings.hosted_keys
+ ): # allow user to specify which keys to check in hosted key manager
+ key_manager = "local"
+
+ if (
+ key_manager == KeyManagementSystem.AZURE_KEY_VAULT.value
+ or type(client).__module__ + "." + type(client).__name__
+ == "azure.keyvault.secrets._client.SecretClient"
+ ): # support Azure Secret Client - from azure.keyvault.secrets import SecretClient
+ secret = client.get_secret(secret_name).value
+ elif (
+ key_manager == KeyManagementSystem.GOOGLE_KMS.value
+ or client.__class__.__name__ == "KeyManagementServiceClient"
+ ):
+ encrypted_secret: Any = os.getenv(secret_name)
+ if encrypted_secret is None:
+ raise ValueError(
+ "Google KMS requires the encrypted secret to be in the environment!"
+ )
+ b64_flag = _is_base64(encrypted_secret)
+ if b64_flag is True: # if passed in as encoded b64 string
+ encrypted_secret = base64.b64decode(encrypted_secret)
+ ciphertext = encrypted_secret
+ else:
+ raise ValueError(
+ "Google KMS requires the encrypted secret to be encoded in base64"
+ ) # fix for this vulnerability https://huntr.com/bounties/ae623c2f-b64b-4245-9ed4-f13a0a5824ce
+ response = client.decrypt(
+ request={
+ "name": litellm._google_kms_resource_name,
+ "ciphertext": ciphertext,
+ }
+ )
+ secret = response.plaintext.decode(
+ "utf-8"
+ ) # assumes the original value was encoded with utf-8
+ elif key_manager == KeyManagementSystem.AWS_KMS.value:
+ """
+ Only check the tokens which start with 'aws_kms/'. This prevents latency impact caused by checking all keys.
+ """
+ encrypted_value = os.getenv(secret_name, None)
+ if encrypted_value is None:
+ raise Exception(
+ "AWS KMS - Encrypted Value of Key={} is None".format(
+ secret_name
+ )
+ )
+ # Decode the base64 encoded ciphertext
+ ciphertext_blob = base64.b64decode(encrypted_value)
+
+ # Set up the parameters for the decrypt call
+ params = {"CiphertextBlob": ciphertext_blob}
+ # Perform the decryption
+ response = client.decrypt(**params)
+
+ # Extract and decode the plaintext
+ plaintext = response["Plaintext"]
+ secret = plaintext.decode("utf-8")
+ if isinstance(secret, str):
+ secret = secret.strip()
+ elif key_manager == KeyManagementSystem.AWS_SECRET_MANAGER.value:
+ from litellm.secret_managers.aws_secret_manager_v2 import (
+ AWSSecretsManagerV2,
+ )
+
+ if isinstance(client, AWSSecretsManagerV2):
+ secret = client.sync_read_secret(
+ secret_name=secret_name,
+ primary_secret_name=key_management_settings.primary_secret_name,
+ )
+ print_verbose(f"get_secret_value_response: {secret}")
+ elif key_manager == KeyManagementSystem.GOOGLE_SECRET_MANAGER.value:
+ try:
+ secret = client.get_secret_from_google_secret_manager(
+ secret_name
+ )
+ print_verbose(f"secret from google secret manager: {secret}")
+ if secret is None:
+ raise ValueError(
+ f"No secret found in Google Secret Manager for {secret_name}"
+ )
+ except Exception as e:
+ print_verbose(f"An error occurred - {str(e)}")
+ raise e
+ elif key_manager == KeyManagementSystem.HASHICORP_VAULT.value:
+ try:
+ secret = client.sync_read_secret(secret_name=secret_name)
+ if secret is None:
+ raise ValueError(
+ f"No secret found in Hashicorp Secret Manager for {secret_name}"
+ )
+ except Exception as e:
+ print_verbose(f"An error occurred - {str(e)}")
+ raise e
+ elif key_manager == "local":
+ secret = os.getenv(secret_name)
+ else: # assume the default is infisicial client
+ secret = client.get_secret(secret_name).secret_value
+ except Exception as e: # check if it's in os.environ
+ verbose_logger.error(
+ f"Defaulting to os.environ value for key={secret_name}. An exception occurred - {str(e)}.\n\n{traceback.format_exc()}"
+ )
+ secret = os.getenv(secret_name)
+ try:
+ if isinstance(secret, str):
+ secret_value_as_bool = ast.literal_eval(secret)
+ if isinstance(secret_value_as_bool, bool):
+ return secret_value_as_bool
+ else:
+ return secret
+ except Exception:
+ return secret
+ else:
+ secret = os.environ.get(secret_name)
+ secret_value_as_bool = str_to_bool(secret) if secret is not None else None
+ if secret_value_as_bool is not None and isinstance(
+ secret_value_as_bool, bool
+ ):
+ return secret_value_as_bool
+ else:
+ return secret
+ except Exception as e:
+ if default_value is not None:
+ return default_value
+ else:
+ raise e
+
+
+def _should_read_secret_from_secret_manager() -> bool:
+ """
+ Returns True if the secret manager should be used to read the secret, False otherwise
+
+ - If the secret manager client is not set, return False
+ - If the `_key_management_settings` access mode is "read_only" or "read_and_write", return True
+ - Otherwise, return False
+ """
+ if litellm.secret_manager_client is not None:
+ if litellm._key_management_settings is not None:
+ if (
+ litellm._key_management_settings.access_mode == "read_only"
+ or litellm._key_management_settings.access_mode == "read_and_write"
+ ):
+ return True
+ return False