aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py361
1 files changed, 361 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
new file mode 100644
index 00000000..fd89d6c5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/secret_managers/aws_secret_manager_v2.py
@@ -0,0 +1,361 @@
+"""
+This is a file for the AWS Secret Manager Integration
+
+Handles Async Operations for:
+- Read Secret
+- Write Secret
+- Delete Secret
+
+Relevant issue: https://github.com/BerriAI/litellm/issues/1883
+
+Requires:
+* `os.environ["AWS_REGION_NAME"],
+* `pip install boto3>=1.28.57`
+"""
+
+import json
+import os
+from typing import Any, Optional, Union
+
+import httpx
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.llms.bedrock.base_aws_llm import BaseAWSLLM
+from litellm.llms.custom_httpx.http_handler import (
+ _get_httpx_client,
+ get_async_httpx_client,
+)
+from litellm.proxy._types import KeyManagementSystem
+from litellm.types.llms.custom_http import httpxSpecialProvider
+
+from .base_secret_manager import BaseSecretManager
+
+
+class AWSSecretsManagerV2(BaseAWSLLM, BaseSecretManager):
+ def __init__(self, **kwargs):
+ BaseSecretManager.__init__(self, **kwargs)
+ BaseAWSLLM.__init__(self, **kwargs)
+
+ @classmethod
+ def validate_environment(cls):
+ if "AWS_REGION_NAME" not in os.environ:
+ raise ValueError("Missing required environment variable - AWS_REGION_NAME")
+
+ @classmethod
+ def load_aws_secret_manager(cls, use_aws_secret_manager: Optional[bool]):
+ """
+ Initialize AWSSecretsManagerV2 and sets litellm.secret_manager_client = AWSSecretsManagerV2() and litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER
+ """
+ if use_aws_secret_manager is None or use_aws_secret_manager is False:
+ return
+ try:
+
+ cls.validate_environment()
+ litellm.secret_manager_client = cls()
+ litellm._key_management_system = KeyManagementSystem.AWS_SECRET_MANAGER
+
+ except Exception as e:
+ raise e
+
+ async def async_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ primary_secret_name: Optional[str] = None,
+ ) -> Optional[str]:
+ """
+ Async function to read a secret from AWS Secrets Manager
+
+ Returns:
+ str: Secret value
+ Raises:
+ ValueError: If the secret is not found or an HTTP error occurs
+ """
+ if primary_secret_name:
+ return await self.async_read_secret_from_primary_secret(
+ secret_name=secret_name, primary_secret_name=primary_secret_name
+ )
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="GetSecretValue",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()["SecretString"]
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+ except Exception as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s",
+ secret_name,
+ str(e),
+ )
+ return None
+
+ def sync_read_secret(
+ self,
+ secret_name: str,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ primary_secret_name: Optional[str] = None,
+ ) -> Optional[str]:
+ """
+ Sync function to read a secret from AWS Secrets Manager
+
+ Done for backwards compatibility with existing codebase, since get_secret is a sync function
+ """
+ # self._prepare_request uses these env vars, we cannot read them from AWS Secrets Manager. If we do we'd get stuck in an infinite loop
+ if secret_name in [
+ "AWS_ACCESS_KEY_ID",
+ "AWS_SECRET_ACCESS_KEY",
+ "AWS_REGION_NAME",
+ "AWS_REGION",
+ "AWS_BEDROCK_RUNTIME_ENDPOINT",
+ ]:
+ return os.getenv(secret_name)
+
+ if primary_secret_name:
+ return self.sync_read_secret_from_primary_secret(
+ secret_name=secret_name, primary_secret_name=primary_secret_name
+ )
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="GetSecretValue",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ )
+
+ sync_client = _get_httpx_client(
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = sync_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ return response.json()["SecretString"]
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+ except httpx.HTTPStatusError as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s, %s",
+ secret_name,
+ str(e.response.text),
+ str(e.response.status_code),
+ )
+ except Exception as e:
+ verbose_logger.exception(
+ "Error reading secret='%s' from AWS Secrets Manager: %s",
+ secret_name,
+ str(e),
+ )
+ return None
+
+ def _parse_primary_secret(self, primary_secret_json_str: Optional[str]) -> dict:
+ """
+ Parse the primary secret JSON string into a dictionary
+
+ Args:
+ primary_secret_json_str: JSON string containing key-value pairs
+
+ Returns:
+ Dictionary of key-value pairs from the primary secret
+ """
+ return json.loads(primary_secret_json_str or "{}")
+
+ def sync_read_secret_from_primary_secret(
+ self, secret_name: str, primary_secret_name: str
+ ) -> Optional[str]:
+ """
+ Read a secret from the primary secret
+ """
+ primary_secret_json_str = self.sync_read_secret(secret_name=primary_secret_name)
+ primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str)
+ return primary_secret_kv_pairs.get(secret_name)
+
+ async def async_read_secret_from_primary_secret(
+ self, secret_name: str, primary_secret_name: str
+ ) -> Optional[str]:
+ """
+ Read a secret from the primary secret
+ """
+ primary_secret_json_str = await self.async_read_secret(
+ secret_name=primary_secret_name
+ )
+ primary_secret_kv_pairs = self._parse_primary_secret(primary_secret_json_str)
+ return primary_secret_kv_pairs.get(secret_name)
+
+ async def async_write_secret(
+ self,
+ secret_name: str,
+ secret_value: str,
+ description: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to write a secret to AWS Secrets Manager
+
+ Args:
+ secret_name: Name of the secret
+ secret_value: Value to store (can be a JSON string)
+ description: Optional description for the secret
+ optional_params: Additional AWS parameters
+ timeout: Request timeout
+ """
+ import uuid
+
+ # Prepare the request data
+ data = {"Name": secret_name, "SecretString": secret_value}
+ if description:
+ data["Description"] = description
+
+ data["ClientRequestToken"] = str(uuid.uuid4())
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="CreateSecret",
+ secret_name=secret_name,
+ secret_value=secret_value,
+ optional_params=optional_params,
+ request_data=data, # Pass the complete request data
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as err:
+ raise ValueError(f"HTTP error occurred: {err.response.text}")
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+
+ async def async_delete_secret(
+ self,
+ secret_name: str,
+ recovery_window_in_days: Optional[int] = 7,
+ optional_params: Optional[dict] = None,
+ timeout: Optional[Union[float, httpx.Timeout]] = None,
+ ) -> dict:
+ """
+ Async function to delete a secret from AWS Secrets Manager
+
+ Args:
+ secret_name: Name of the secret to delete
+ recovery_window_in_days: Number of days before permanent deletion (default: 7)
+ optional_params: Additional AWS parameters
+ timeout: Request timeout
+
+ Returns:
+ dict: Response from AWS Secrets Manager containing deletion details
+ """
+ # Prepare the request data
+ data = {
+ "SecretId": secret_name,
+ "RecoveryWindowInDays": recovery_window_in_days,
+ }
+
+ endpoint_url, headers, body = self._prepare_request(
+ action="DeleteSecret",
+ secret_name=secret_name,
+ optional_params=optional_params,
+ request_data=data,
+ )
+
+ async_client = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.SecretManager,
+ params={"timeout": timeout},
+ )
+
+ try:
+ response = await async_client.post(
+ url=endpoint_url, headers=headers, data=body.decode("utf-8")
+ )
+ response.raise_for_status()
+ return response.json()
+ except httpx.HTTPStatusError as err:
+ raise ValueError(f"HTTP error occurred: {err.response.text}")
+ except httpx.TimeoutException:
+ raise ValueError("Timeout error occurred")
+
+ def _prepare_request(
+ self,
+ action: str, # "GetSecretValue" or "PutSecretValue"
+ secret_name: str,
+ secret_value: Optional[str] = None,
+ optional_params: Optional[dict] = None,
+ request_data: Optional[dict] = None,
+ ) -> tuple[str, Any, bytes]:
+ """Prepare the AWS Secrets Manager request"""
+ try:
+ from botocore.auth import SigV4Auth
+ from botocore.awsrequest import AWSRequest
+ except ImportError:
+ raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
+ optional_params = optional_params or {}
+ boto3_credentials_info = self._get_boto_credentials_from_optional_params(
+ optional_params
+ )
+
+ # Get endpoint
+ _, endpoint_url = self.get_runtime_endpoint(
+ api_base=None,
+ aws_bedrock_runtime_endpoint=boto3_credentials_info.aws_bedrock_runtime_endpoint,
+ aws_region_name=boto3_credentials_info.aws_region_name,
+ )
+ endpoint_url = endpoint_url.replace("bedrock-runtime", "secretsmanager")
+
+ # Use provided request_data if available, otherwise build default data
+ if request_data:
+ data = request_data
+ else:
+ data = {"SecretId": secret_name}
+ if secret_value and action == "PutSecretValue":
+ data["SecretString"] = secret_value
+
+ body = json.dumps(data).encode("utf-8")
+ headers = {
+ "Content-Type": "application/x-amz-json-1.1",
+ "X-Amz-Target": f"secretsmanager.{action}",
+ }
+
+ # Sign request
+ request = AWSRequest(
+ method="POST", url=endpoint_url, data=body, headers=headers
+ )
+ SigV4Auth(
+ boto3_credentials_info.credentials,
+ "secretsmanager",
+ boto3_credentials_info.aws_region_name,
+ ).add_auth(request)
+ prepped = request.prepare()
+
+ return endpoint_url, prepped.headers, body
+
+
+# if __name__ == "__main__":
+# print("loading aws secret manager v2")
+# aws_secret_manager_v2 = AWSSecretsManagerV2()
+
+# print("writing secret to aws secret manager v2")
+# asyncio.run(aws_secret_manager_v2.async_write_secret(secret_name="test_secret_3", secret_value="test_value_2"))
+# print("reading secret from aws secret manager v2")