about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py159
1 files changed, 159 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py b/.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py
new file mode 100644
index 00000000..301591c6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/caching/s3_cache.py
@@ -0,0 +1,159 @@
+"""
+S3 Cache implementation
+WARNING: DO NOT USE THIS IN PRODUCTION - This is not ASYNC
+
+Has 4 methods:
+    - set_cache
+    - get_cache
+    - async_set_cache
+    - async_get_cache
+"""
+
+import ast
+import asyncio
+import json
+from typing import Optional
+
+from litellm._logging import print_verbose, verbose_logger
+
+from .base_cache import BaseCache
+
+
+class S3Cache(BaseCache):
+    def __init__(
+        self,
+        s3_bucket_name,
+        s3_region_name=None,
+        s3_api_version=None,
+        s3_use_ssl: Optional[bool] = True,
+        s3_verify=None,
+        s3_endpoint_url=None,
+        s3_aws_access_key_id=None,
+        s3_aws_secret_access_key=None,
+        s3_aws_session_token=None,
+        s3_config=None,
+        s3_path=None,
+        **kwargs,
+    ):
+        import boto3
+
+        self.bucket_name = s3_bucket_name
+        self.key_prefix = s3_path.rstrip("/") + "/" if s3_path else ""
+        # Create an S3 client with custom endpoint URL
+
+        self.s3_client = boto3.client(
+            "s3",
+            region_name=s3_region_name,
+            endpoint_url=s3_endpoint_url,
+            api_version=s3_api_version,
+            use_ssl=s3_use_ssl,
+            verify=s3_verify,
+            aws_access_key_id=s3_aws_access_key_id,
+            aws_secret_access_key=s3_aws_secret_access_key,
+            aws_session_token=s3_aws_session_token,
+            config=s3_config,
+            **kwargs,
+        )
+
+    def set_cache(self, key, value, **kwargs):
+        try:
+            print_verbose(f"LiteLLM SET Cache - S3. Key={key}. Value={value}")
+            ttl = kwargs.get("ttl", None)
+            # Convert value to JSON before storing in S3
+            serialized_value = json.dumps(value)
+            key = self.key_prefix + key
+
+            if ttl is not None:
+                cache_control = f"immutable, max-age={ttl}, s-maxage={ttl}"
+                import datetime
+
+                # Calculate expiration time
+                expiration_time = datetime.datetime.now() + ttl
+
+                # Upload the data to S3 with the calculated expiration time
+                self.s3_client.put_object(
+                    Bucket=self.bucket_name,
+                    Key=key,
+                    Body=serialized_value,
+                    Expires=expiration_time,
+                    CacheControl=cache_control,
+                    ContentType="application/json",
+                    ContentLanguage="en",
+                    ContentDisposition=f'inline; filename="{key}.json"',
+                )
+            else:
+                cache_control = "immutable, max-age=31536000, s-maxage=31536000"
+                # Upload the data to S3 without specifying Expires
+                self.s3_client.put_object(
+                    Bucket=self.bucket_name,
+                    Key=key,
+                    Body=serialized_value,
+                    CacheControl=cache_control,
+                    ContentType="application/json",
+                    ContentLanguage="en",
+                    ContentDisposition=f'inline; filename="{key}.json"',
+                )
+        except Exception as e:
+            # NON blocking - notify users S3 is throwing an exception
+            print_verbose(f"S3 Caching: set_cache() - Got exception from S3: {e}")
+
+    async def async_set_cache(self, key, value, **kwargs):
+        self.set_cache(key=key, value=value, **kwargs)
+
+    def get_cache(self, key, **kwargs):
+        import botocore
+
+        try:
+            key = self.key_prefix + key
+
+            print_verbose(f"Get S3 Cache: key: {key}")
+            # Download the data from S3
+            cached_response = self.s3_client.get_object(
+                Bucket=self.bucket_name, Key=key
+            )
+
+            if cached_response is not None:
+                # cached_response is in `b{} convert it to ModelResponse
+                cached_response = (
+                    cached_response["Body"].read().decode("utf-8")
+                )  # Convert bytes to string
+                try:
+                    cached_response = json.loads(
+                        cached_response
+                    )  # Convert string to dictionary
+                except Exception:
+                    cached_response = ast.literal_eval(cached_response)
+            if type(cached_response) is not dict:
+                cached_response = dict(cached_response)
+            verbose_logger.debug(
+                f"Got S3 Cache: key: {key}, cached_response {cached_response}. Type Response {type(cached_response)}"
+            )
+
+            return cached_response
+        except botocore.exceptions.ClientError as e:  # type: ignore
+            if e.response["Error"]["Code"] == "NoSuchKey":
+                verbose_logger.debug(
+                    f"S3 Cache: The specified key '{key}' does not exist in the S3 bucket."
+                )
+                return None
+
+        except Exception as e:
+            # NON blocking - notify users S3 is throwing an exception
+            verbose_logger.error(
+                f"S3 Caching: get_cache() - Got exception from S3: {e}"
+            )
+
+    async def async_get_cache(self, key, **kwargs):
+        return self.get_cache(key=key, **kwargs)
+
+    def flush_cache(self):
+        pass
+
+    async def disconnect(self):
+        pass
+
+    async def async_set_cache_pipeline(self, cache_list, **kwargs):
+        tasks = []
+        for val in cache_list:
+            tasks.append(self.async_set_cache(val[0], val[1], **kwargs))
+        await asyncio.gather(*tasks)