diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py | 159 |
1 file changed, 159 insertions, 0 deletions
"""
S3 Cache implementation
WARNING: DO NOT USE THIS IN PRODUCTION - This is not ASYNC

Has 4 methods:
    - set_cache
    - get_cache
    - async_set_cache
    - async_get_cache
"""

import ast
import asyncio
import datetime
import json
from typing import Optional

from litellm._logging import print_verbose, verbose_logger

from .base_cache import BaseCache


class S3Cache(BaseCache):
    """Cache backend that stores JSON-serialized entries as objects in an S3 bucket.

    NOTE(review): the ``async_*`` methods are thin wrappers around blocking
    boto3 calls -- they will block the event loop (see module warning).
    """

    def __init__(
        self,
        s3_bucket_name,
        s3_region_name=None,
        s3_api_version=None,
        s3_use_ssl: Optional[bool] = True,
        s3_verify=None,
        s3_endpoint_url=None,
        s3_aws_access_key_id=None,
        s3_aws_secret_access_key=None,
        s3_aws_session_token=None,
        s3_config=None,
        s3_path=None,
        **kwargs,
    ):
        """Create an S3-backed cache.

        Args:
            s3_bucket_name: Bucket all cache objects are written to / read from.
            s3_path: Optional key prefix ("directory") inside the bucket;
                a single trailing slash is ensured when set.
            Remaining ``s3_*`` arguments and ``**kwargs`` are forwarded to
            ``boto3.client("s3", ...)``.
        """
        import boto3

        self.bucket_name = s3_bucket_name
        # Normalize the prefix to exactly one trailing "/" (empty if unset).
        self.key_prefix = s3_path.rstrip("/") + "/" if s3_path else ""

        # Create an S3 client; a custom endpoint_url supports S3-compatible stores.
        self.s3_client = boto3.client(
            "s3",
            region_name=s3_region_name,
            endpoint_url=s3_endpoint_url,
            api_version=s3_api_version,
            use_ssl=s3_use_ssl,
            verify=s3_verify,
            aws_access_key_id=s3_aws_access_key_id,
            aws_secret_access_key=s3_aws_secret_access_key,
            aws_session_token=s3_aws_session_token,
            config=s3_config,
            **kwargs,
        )

    def set_cache(self, key, value, **kwargs):
        """Serialize ``value`` to JSON and upload it to S3 under ``key``.

        Supported kwargs:
            ttl: lifetime in seconds; mapped to the object's ``Expires`` /
                ``Cache-Control`` headers. Without a ttl the object is marked
                immutable for one year.

        Failures are non-blocking: any exception is logged and swallowed so a
        cache outage never breaks the caller.
        """
        try:
            print_verbose(f"LiteLLM SET Cache - S3. Key={key}. \nValue={value}")
            ttl = kwargs.get("ttl", None)
            # Convert value to JSON before storing in S3
            serialized_value = json.dumps(value)
            key = self.key_prefix + key

            if ttl is not None:
                cache_control = f"immutable, max-age={ttl}, s-maxage={ttl}"
                # BUG FIX: ttl is a number of seconds; adding it directly to a
                # datetime raises TypeError (which the except below silently
                # swallowed, so TTL'd entries were never cached). Wrap it in a
                # timedelta instead.
                expiration_time = datetime.datetime.now() + datetime.timedelta(
                    seconds=ttl
                )

                # Upload the data to S3 with the calculated expiration time
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=serialized_value,
                    Expires=expiration_time,
                    CacheControl=cache_control,
                    ContentType="application/json",
                    ContentLanguage="en",
                    ContentDisposition=f'inline; filename="{key}.json"',
                )
            else:
                cache_control = "immutable, max-age=31536000, s-maxage=31536000"
                # Upload the data to S3 without specifying Expires
                self.s3_client.put_object(
                    Bucket=self.bucket_name,
                    Key=key,
                    Body=serialized_value,
                    CacheControl=cache_control,
                    ContentType="application/json",
                    ContentLanguage="en",
                    ContentDisposition=f'inline; filename="{key}.json"',
                )
        except Exception as e:
            # NON blocking - notify users S3 is throwing an exception
            print_verbose(f"S3 Caching: set_cache() - Got exception from S3: {e}")

    async def async_set_cache(self, key, value, **kwargs):
        """Async facade over :meth:`set_cache` (still blocking -- see module warning)."""
        self.set_cache(key=key, value=value, **kwargs)

    def get_cache(self, key, **kwargs):
        """Fetch and deserialize the cached object for ``key``.

        Returns the parsed dict, or ``None`` when the key does not exist or
        any S3 error occurs (errors are logged, never raised).
        """
        import botocore

        try:
            key = self.key_prefix + key

            print_verbose(f"Get S3 Cache: key: {key}")
            # Download the data from S3
            cached_response = self.s3_client.get_object(
                Bucket=self.bucket_name, Key=key
            )

            if cached_response is not None:
                # Body is a stream of bytes; decode to str before parsing.
                cached_response = (
                    cached_response["Body"].read().decode("utf-8")
                )  # Convert bytes to string
                try:
                    cached_response = json.loads(
                        cached_response
                    )  # Convert string to dictionary
                except Exception:
                    # Fallback for legacy entries stored as a Python repr
                    # rather than JSON.
                    cached_response = ast.literal_eval(cached_response)
            # isinstance (not `type(...) is dict`) so dict subclasses pass too.
            if not isinstance(cached_response, dict):
                cached_response = dict(cached_response)
            verbose_logger.debug(
                f"Got S3 Cache: key: {key}, cached_response {cached_response}. Type Response {type(cached_response)}"
            )

            return cached_response
        except botocore.exceptions.ClientError as e:  # type: ignore
            if e.response["Error"]["Code"] == "NoSuchKey":
                verbose_logger.debug(
                    f"S3 Cache: The specified key '{key}' does not exist in the S3 bucket."
                )
                return None

        except Exception as e:
            # NON blocking - notify users S3 is throwing an exception
            verbose_logger.error(
                f"S3 Caching: get_cache() - Got exception from S3: {e}"
            )

    async def async_get_cache(self, key, **kwargs):
        """Async facade over :meth:`get_cache` (still blocking -- see module warning)."""
        return self.get_cache(key=key, **kwargs)

    def flush_cache(self):
        """No-op: S3 entries expire via their Cache-Control/Expires headers."""
        pass

    async def disconnect(self):
        """No-op: boto3 clients hold no persistent connection to close here."""
        pass

    async def async_set_cache_pipeline(self, cache_list, **kwargs):
        """Store many (key, value) pairs, issuing the writes concurrently."""
        tasks = []
        for val in cache_list:
            tasks.append(self.async_set_cache(val[0], val[1], **kwargs))
        await asyncio.gather(*tasks)