author:    S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer: S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit:    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree:      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
parent:    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download:  gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz

two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py')

-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py  1047

1 file changed, 1047 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py b/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
new file mode 100644
index 00000000..0571ac9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
@@ -0,0 +1,1047 @@
+"""
+Redis Cache implementation
+
+Has 4 primary methods:
+    - set_cache
+    - get_cache
+    - async_set_cache
+    - async_get_cache
+"""
+
+import ast
+import asyncio
+import inspect
+import json
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
+
+import litellm
+from litellm._logging import print_verbose, verbose_logger
+from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
+from litellm.types.caching import RedisPipelineIncrementOperation
+from litellm.types.services import ServiceTypes
+
+from .base_cache import BaseCache
+
+if TYPE_CHECKING:
+    from opentelemetry.trace import Span as _Span
+    from redis.asyncio import Redis, RedisCluster
+    from redis.asyncio.client import Pipeline
+    from redis.asyncio.cluster import ClusterPipeline
+
+    pipeline = Pipeline
+    cluster_pipeline = ClusterPipeline
+    async_redis_client = Redis
+    async_redis_cluster_client = RedisCluster
+    Span = _Span
+else:
+    pipeline = Any
+    cluster_pipeline = Any
+    async_redis_client = Any
+    async_redis_cluster_client = Any
+    Span = Any
+
+
+class RedisCache(BaseCache):
+    # if users don't provide one, use the default litellm cache
+
+    def __init__(
+        self,
+        host=None,
+        port=None,
+        password=None,
+        redis_flush_size: Optional[int] = 100,
+        namespace: Optional[str] = None,
+        startup_nodes: Optional[List] = None,  # for redis-cluster
+        socket_timeout: Optional[float] = 5.0,  # default 5 second timeout
+        **kwargs,
+    ):
+
+        from litellm._service_logger import ServiceLogging
+
+        from .._redis import get_redis_client, get_redis_connection_pool
+
+        redis_kwargs = {}
+        if host is not None:
+            redis_kwargs["host"] = host
+        if port is not None:
+            redis_kwargs["port"] = port
+        if password is not None:
+            redis_kwargs["password"] = password
+        if startup_nodes is not None:
+            redis_kwargs["startup_nodes"] = startup_nodes
+        if socket_timeout is not None:
+            redis_kwargs["socket_timeout"] = socket_timeout
+
+        ### HEALTH MONITORING OBJECT ###
+        if kwargs.get("service_logger_obj", None) is not None and isinstance(
+            kwargs["service_logger_obj"], ServiceLogging
+        ):
+            self.service_logger_obj = kwargs.pop("service_logger_obj")
+        else:
+            self.service_logger_obj = ServiceLogging()
+
+        redis_kwargs.update(kwargs)
+        self.redis_client = get_redis_client(**redis_kwargs)
+        self.redis_async_client: Optional[async_redis_client] = None
+        self.redis_kwargs = redis_kwargs
+        self.async_redis_conn_pool = get_redis_connection_pool(**redis_kwargs)
+
+        # redis namespaces
+        self.namespace = namespace
+        # for high traffic, we store the redis results in memory and then batch write to redis
+        self.redis_batch_writing_buffer: list = []
+        if redis_flush_size is None:
+            self.redis_flush_size: int = 100
+        else:
+            self.redis_flush_size = redis_flush_size
+        self.redis_version = "Unknown"
+        try:
+            if not inspect.iscoroutinefunction(self.redis_client):
+                self.redis_version = self.redis_client.info()["redis_version"]  # type: ignore
+        except Exception:
+            pass
+
+        ### ASYNC HEALTH PING ###
+        try:
+            _ = asyncio.get_running_loop().create_task(self.ping())
+        except Exception as e:
+            if "no running event loop" in str(e):
+                verbose_logger.debug(
+                    "Ignoring async redis ping. No running event loop."
+                )
+            else:
+                verbose_logger.error(
+                    "Error connecting to Async Redis client - {}".format(str(e)),
+                    extra={"error": str(e)},
+                )
+
+        ### SYNC HEALTH PING ###
+        try:
+            if hasattr(self.redis_client, "ping"):
+                self.redis_client.ping()  # type: ignore
+        except Exception as e:
+            verbose_logger.error(
+                "Error connecting to Sync Redis client", extra={"error": str(e)}
+            )
+
+        if litellm.default_redis_ttl is not None:
+            super().__init__(default_ttl=int(litellm.default_redis_ttl))
+        else:
+            super().__init__()  # defaults to 60s
+
+    def init_async_client(
+        self,
+    ) -> Union[async_redis_client, async_redis_cluster_client]:
+        from .._redis import get_redis_async_client
+
+        if self.redis_async_client is None:
+            self.redis_async_client = get_redis_async_client(
+                connection_pool=self.async_redis_conn_pool, **self.redis_kwargs
+            )
+        return self.redis_async_client
+
+    def check_and_fix_namespace(self, key: str) -> str:
+        """
+        Make sure each key starts with the given namespace
+        """
+        if self.namespace is not None and not key.startswith(self.namespace):
+            key = self.namespace + ":" + key
+
+        return key
+
+    def set_cache(self, key, value, **kwargs):
+        ttl = self.get_ttl(**kwargs)
+        print_verbose(
+            f"Set Redis Cache: key: {key}\nValue {value}\nttl={ttl}, redis_version={self.redis_version}"
+        )
+        key = self.check_and_fix_namespace(key=key)
+        try:
+            start_time = time.time()
+            self.redis_client.set(name=key, value=str(value), ex=ttl)
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_success_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                call_type="set_cache",
+                start_time=start_time,
+                end_time=end_time,
+            )
+        except Exception as e:
+            # NON blocking - notify users Redis is throwing an exception
+            print_verbose(
+                f"litellm.caching.caching: set() - Got exception from REDIS : {str(e)}"
+            )
+
+    def increment_cache(
+        self, key, value: int, ttl: Optional[float] = None, **kwargs
+    ) -> int:
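+        # Illustrative: increment_cache("rl:user-1", 1, ttl=60) INCRs the key and
+        # sets a 60s expiry only if the key has none (key name is hypothetical)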
+        _redis_client = self.redis_client
+        start_time = time.time()
+        set_ttl = self.get_ttl(ttl=ttl)
+        try:
+            start_time = time.time()
+            result: int = _redis_client.incr(name=key, amount=value)  # type: ignore
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_success_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                call_type="increment_cache",
+                start_time=start_time,
+                end_time=end_time,
+            )
+
+            if set_ttl is not None:
+                # check if key already has ttl, if not -> set ttl
+                start_time = time.time()
+                current_ttl = _redis_client.ttl(key)
+                end_time = time.time()
+                _duration = end_time - start_time
+                self.service_logger_obj.service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="increment_cache_ttl",
+                    start_time=start_time,
+                    end_time=end_time,
+                )
+                if current_ttl == -1:
+                    # Key has no expiration
+                    start_time = time.time()
+                    _redis_client.expire(key, set_ttl)  # type: ignore
+                    end_time = time.time()
+                    _duration = end_time - start_time
+                    self.service_logger_obj.service_success_hook(
+                        service=ServiceTypes.REDIS,
+                        duration=_duration,
+                        call_type="increment_cache_expire",
+                        start_time=start_time,
+                        end_time=end_time,
+                    )
+            return result
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            verbose_logger.error(
+                "LiteLLM Redis Caching: increment_cache() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+            raise e
+
+    async def async_scan_iter(self, pattern: str, count: int = 100) -> list:
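+        # Illustrative: `await cache.async_scan_iter("router:")` collects up to
+        # `count` keys matching "router:*" (pattern is hypothetical)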
+        from redis.asyncio import Redis
+
+        start_time = time.time()
+        try:
+            keys = []
+            _redis_client: Redis = self.init_async_client()  # type: ignore
+
+            async for key in _redis_client.scan_iter(match=pattern + "*", count=count):
+                keys.append(key)
+                if len(keys) >= count:
+                    break
+
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_scan_iter",
+                    start_time=start_time,
+                    end_time=end_time,
+                )
+            )  # DO NOT SLOW DOWN CALL B/C OF THIS
+            return keys
+        except Exception as e:
+            # NON blocking - notify users Redis is throwing an exception
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_scan_iter",
+                    start_time=start_time,
+                    end_time=end_time,
+                )
+            )
+            raise e
+
+    async def async_set_cache(self, key, value, **kwargs):
+        from redis.asyncio import Redis
+
+        start_time = time.time()
+        try:
+            _redis_client: Redis = self.init_async_client()  # type: ignore
+        except Exception as e:
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                    call_type="async_set_cache",
+                )
+            )
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+            raise e
+
+        key = self.check_and_fix_namespace(key=key)
+        ttl = self.get_ttl(**kwargs)
+        print_verbose(f"Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}")
+
+        try:
+            if not hasattr(_redis_client, "set"):
+                raise Exception("Redis client cannot set cache. Attribute not found.")
+            await _redis_client.set(name=key, value=json.dumps(value), ex=ttl)
+            print_verbose(
+                f"Successfully Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}"
+            )
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_set_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                    event_metadata={"key": key},
+                )
+            )
+        except Exception as e:
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_set_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                    event_metadata={"key": key},
+                )
+            )
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+
+    async def _pipeline_helper(
+        self,
+        pipe: Union[pipeline, cluster_pipeline],
+        cache_list: List[Tuple[Any, Any]],
+        ttl: Optional[float],
+    ) -> List:
+        """
+        Helper function for executing a pipeline of set operations on Redis
+        """
+        ttl = self.get_ttl(ttl=ttl)
+        # Iterate through each key-value pair in the cache_list and set them in the pipeline.
+        for cache_key, cache_value in cache_list:
+            cache_key = self.check_and_fix_namespace(key=cache_key)
+            print_verbose(
+                f"Set ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {cache_value}\nttl={ttl}"
+            )
+            json_cache_value = json.dumps(cache_value)
+            # Set the value with a TTL if it's provided.
+            _td: Optional[timedelta] = None
+            if ttl is not None:
+                _td = timedelta(seconds=ttl)
+            pipe.set(  # type: ignore
+                name=cache_key,
+                value=json_cache_value,
+                ex=_td,
+            )
+        # Execute the pipeline and return the results.
+        results = await pipe.execute()
+        return results
+
+    async def async_set_cache_pipeline(
+        self, cache_list: List[Tuple[Any, Any]], ttl: Optional[float] = None, **kwargs
+    ):
+        """
+        Use Redis Pipelines for bulk write operations
+        """
+        # don't waste a network request if there's nothing to set
+        if len(cache_list) == 0:
+            return
+
+        _redis_client = self.init_async_client()
+        start_time = time.time()
+
+        print_verbose(
+            f"Set Async Redis Cache: key list: {cache_list}\nttl={ttl}, redis_version={self.redis_version}"
+        )
+        cache_value: Any = None
+        try:
+            async with _redis_client.pipeline(transaction=False) as pipe:
+                results = await self._pipeline_helper(pipe, cache_list, ttl)
+
+            print_verbose(f"pipeline results: {results}")
+            # Optionally, you could process 'results' to make sure that all set operations were successful.
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_set_cache_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            return None
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_set_cache_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set_cache_pipeline() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                cache_value,
+            )
+
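+    # Illustrative call (keys/values are hypothetical):
+    #     await cache.async_set_cache_pipeline(
+    #         cache_list=[("key-1", {"a": 1}), ("key-2", [1, 2])], ttl=120
+    #     )
+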
+    async def _set_cache_sadd_helper(
+        self,
+        redis_client: async_redis_client,
+        key: str,
+        value: List,
+        ttl: Optional[float],
+    ) -> None:
+        """Helper function for async_set_cache_sadd. Separated for testing."""
+        ttl = self.get_ttl(ttl=ttl)
+        try:
+            await redis_client.sadd(key, *value)  # type: ignore
+            if ttl is not None:
+                _td = timedelta(seconds=ttl)
+                await redis_client.expire(key, _td)
+        except Exception:
+            raise
+
+    async def async_set_cache_sadd(
+        self, key, value: List, ttl: Optional[float], **kwargs
+    ):
+        from redis.asyncio import Redis
+
+        start_time = time.time()
+        try:
+            _redis_client: Redis = self.init_async_client()  # type: ignore
+        except Exception as e:
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                    call_type="async_set_cache_sadd",
+                )
+            )
+            # NON blocking - notify users Redis is throwing an exception
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+            raise e
+
+        key = self.check_and_fix_namespace(key=key)
+        print_verbose(f"Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}")
+        try:
+            await self._set_cache_sadd_helper(
+                redis_client=_redis_client, key=key, value=value, ttl=ttl
+            )
+            print_verbose(
+                f"Successfully Set ASYNC Redis Cache SADD: key: {key}\nValue {value}\nttl={ttl}"
+            )
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_set_cache_sadd",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+        except Exception as e:
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_set_cache_sadd",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            # NON blocking - notify users Redis is throwing an exception
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set_cache_sadd() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+
+    async def batch_cache_write(self, key, value, **kwargs):
+        print_verbose(
+            f"in batch cache writing for redis buffer size={len(self.redis_batch_writing_buffer)}",
+        )
+        key = self.check_and_fix_namespace(key=key)
+        self.redis_batch_writing_buffer.append((key, value))
+        if len(self.redis_batch_writing_buffer) >= self.redis_flush_size:
+            await self.flush_cache_buffer()  # logging done in here
+
+    async def async_increment(
+        self,
+        key,
+        value: float,
+        ttl: Optional[int] = None,
+        parent_otel_span: Optional[Span] = None,
+    ) -> float:
+        from redis.asyncio import Redis
+
+        _redis_client: Redis = self.init_async_client()  # type: ignore
+        start_time = time.time()
+        _used_ttl = self.get_ttl(ttl=ttl)
+        key = self.check_and_fix_namespace(key=key)
+        try:
+            result = await _redis_client.incrbyfloat(name=key, amount=value)
+            if _used_ttl is not None:
+                # check if key already has ttl, if not -> set ttl
+                current_ttl = await _redis_client.ttl(key)
+                if current_ttl == -1:
+                    # Key has no expiration
+                    await _redis_client.expire(key, _used_ttl)
+
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_increment",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                )
+            )
+            return result
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_increment",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                )
+            )
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async async_increment() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                value,
+            )
+            raise e
+
+    async def flush_cache_buffer(self):
+        print_verbose(
+            f"flushing to redis....reached size of buffer {len(self.redis_batch_writing_buffer)}"
+        )
+        await self.async_set_cache_pipeline(self.redis_batch_writing_buffer)
+        self.redis_batch_writing_buffer = []
+
+    def _get_cache_logic(self, cached_response: Any):
+        """
+        Common 'get_cache_logic' across sync + async redis client implementations
+        """
+        if cached_response is None:
+            return cached_response
+        # cached_response is bytes (e.g. b'{...}'); decode and parse it
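+        # Illustrative: the sync set_cache() stores str(value), so b"{'a': 1}" needs
+        # the ast.literal_eval fallback; async paths store JSON, e.g. b'{"a": 1}'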
+        cached_response = cached_response.decode("utf-8")  # Convert bytes to string
+        try:
+            cached_response = json.loads(
+                cached_response
+            )  # Convert string to dictionary
+        except Exception:
+            cached_response = ast.literal_eval(cached_response)
+        return cached_response
+
+    def get_cache(self, key, parent_otel_span: Optional[Span] = None, **kwargs):
+        try:
+            key = self.check_and_fix_namespace(key=key)
+            print_verbose(f"Get Redis Cache: key: {key}")
+            start_time = time.time()
+            cached_response = self.redis_client.get(key)
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_success_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                call_type="get_cache",
+                start_time=start_time,
+                end_time=end_time,
+                parent_otel_span=parent_otel_span,
+            )
+            print_verbose(
+                f"Got Redis Cache: key: {key}, cached_response {cached_response}"
+            )
+            return self._get_cache_logic(cached_response=cached_response)
+        except Exception as e:
+            # NON blocking - notify users Redis is throwing an exception
+            verbose_logger.error(
+                "litellm.caching.caching: get() - Got exception from REDIS: %s", str(e)
+            )
+
+    def _run_redis_mget_operation(self, keys: List[str]) -> List[Any]:
+        """
+        Wrapper to call `mget` on the redis client
+
+        We use a wrapper so RedisCluster can override this method
+        """
+        return self.redis_client.mget(keys=keys)  # type: ignore
+
+    async def _async_run_redis_mget_operation(self, keys: List[str]) -> List[Any]:
+        """
+        Wrapper to call `mget` on the redis client
+
+        We use a wrapper so RedisCluster can override this method
+        """
+        async_redis_client = self.init_async_client()
+        return await async_redis_client.mget(keys=keys)  # type: ignore
+
+    def batch_get_cache(
+        self,
+        key_list: Union[List[str], List[Optional[str]]],
+        parent_otel_span: Optional[Span] = None,
+    ) -> dict:
+        """
+        Use Redis for bulk read operations
+
+        Args:
+            key_list: List of keys to get from Redis
+            parent_otel_span: Optional parent OpenTelemetry span
+
+        Returns:
+            dict: A dictionary mapping keys to their cached values
+        """
+        key_value_dict = {}
+        _key_list = [key for key in key_list if key is not None]
+
+        try:
+            _keys = []
+            for cache_key in _key_list:
+                cache_key = self.check_and_fix_namespace(key=cache_key or "")
+                _keys.append(cache_key)
+            start_time = time.time()
+            results: List = self._run_redis_mget_operation(keys=_keys)
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_success_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                call_type="batch_get_cache",
+                start_time=start_time,
+                end_time=end_time,
+                parent_otel_span=parent_otel_span,
+            )
+
+            # Associate the results back with their keys.
+            # 'results' is a list of values corresponding to the order of keys in '_key_list'.
+            key_value_dict = dict(zip(_key_list, results))
+
+            decoded_results = {}
+            for k, v in key_value_dict.items():
+                if isinstance(k, bytes):
+                    k = k.decode("utf-8")
+                v = self._get_cache_logic(v)
+                decoded_results[k] = v
+
+            return decoded_results
+        except Exception as e:
+            verbose_logger.error(f"Error occurred in batch get cache - {str(e)}")
+            return key_value_dict
+
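+    # Illustrative: batch_get_cache(["k1", "k2", None]) -> {"k1": ..., "k2": ...};
+    # None keys are dropped before the MGET (key names are hypothetical).
+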
+    async def async_get_cache(
+        self, key, parent_otel_span: Optional[Span] = None, **kwargs
+    ):
+        from redis.asyncio import Redis
+
+        _redis_client: Redis = self.init_async_client()  # type: ignore
+        key = self.check_and_fix_namespace(key=key)
+        start_time = time.time()
+
+        try:
+            print_verbose(f"Get Async Redis Cache: key: {key}")
+            cached_response = await _redis_client.get(key)
+            print_verbose(
+                f"Got Async Redis Cache: key: {key}, cached_response {cached_response}"
+            )
+            response = self._get_cache_logic(cached_response=cached_response)
+
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_get_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                    event_metadata={"key": key},
+                )
+            )
+            return response
+        except Exception as e:
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_get_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                    event_metadata={"key": key},
+                )
+            )
+            print_verbose(
+                f"litellm.caching.caching: async get() - Got exception from REDIS: {str(e)}"
+            )
+
+    async def async_batch_get_cache(
+        self,
+        key_list: Union[List[str], List[Optional[str]]],
+        parent_otel_span: Optional[Span] = None,
+    ) -> dict:
+        """
+        Use Redis for bulk read operations
+
+        Args:
+            key_list: List of keys to get from Redis
+            parent_otel_span: Optional parent OpenTelemetry span
+
+        Returns:
+            dict: A dictionary mapping keys to their cached values
+
+        `.mget` does not support None keys. This will filter out None keys.
+        """
+        # mget goes through _async_run_redis_mget_operation, since the redis python
+        # lib has incomplete type stubs for RedisCluster
+        key_value_dict = {}
+        start_time = time.time()
+        _key_list = [key for key in key_list if key is not None]
+        try:
+            _keys = []
+            for cache_key in _key_list:
+                cache_key = self.check_and_fix_namespace(key=cache_key)
+                _keys.append(cache_key)
+            results = await self._async_run_redis_mget_operation(keys=_keys)
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_batch_get_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                )
+            )
+
+            # Associate the results back with their keys.
+            # 'results' is a list of values corresponding to the order of keys in '_key_list'.
+            key_value_dict = dict(zip(_key_list, results))
+
+            decoded_results = {}
+            for k, v in key_value_dict.items():
+                if isinstance(k, bytes):
+                    k = k.decode("utf-8")
+                v = self._get_cache_logic(v)
+                decoded_results[k] = v
+
+            return decoded_results
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_batch_get_cache",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=parent_otel_span,
+                )
+            )
+            verbose_logger.error(f"Error occurred in async batch get cache - {str(e)}")
+            return key_value_dict
+
+    def sync_ping(self) -> bool:
+        """
+        Tests if the sync redis client is correctly setup.
+        """
+        print_verbose("Pinging Sync Redis Cache")
+        start_time = time.time()
+        try:
+            response: bool = self.redis_client.ping()  # type: ignore
+            print_verbose(f"Redis Cache PING: {response}")
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_success_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                call_type="sync_ping",
+                start_time=start_time,
+                end_time=end_time,
+            )
+            return response
+        except Exception as e:
+            # NON blocking - notify users Redis is throwing an exception
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            self.service_logger_obj.service_failure_hook(
+                service=ServiceTypes.REDIS,
+                duration=_duration,
+                error=e,
+                call_type="sync_ping",
+            )
+            verbose_logger.error(
+                f"LiteLLM Redis Cache PING: - Got exception from REDIS : {str(e)}"
+            )
+            raise e
+
+    async def ping(self) -> bool:
+        # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `ping`
+        _redis_client: Any = self.init_async_client()
+        start_time = time.time()
+        print_verbose("Pinging Async Redis Cache")
+        try:
+            response = await _redis_client.ping()
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_ping",
+                )
+            )
+            return response
+        except Exception as e:
+            # NON blocking - notify users Redis is throwing an exception
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_ping",
+                )
+            )
+            verbose_logger.error(
+                f"LiteLLM Redis Cache PING: - Got exception from REDIS : {str(e)}"
+            )
+            raise e
+
+    async def delete_cache_keys(self, keys):
+        # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `delete`
+        _redis_client: Any = self.init_async_client()
+        # keys is a list, unpack it so it gets passed as individual elements to delete
+        await _redis_client.delete(*keys)
+
+    def client_list(self) -> List:
+        client_list: List = self.redis_client.client_list()  # type: ignore
+        return client_list
+
+    def info(self):
+        info = self.redis_client.info()
+        return info
+
+    def flush_cache(self):
+        self.redis_client.flushall()
+
+    def flushall(self):
+        self.redis_client.flushall()
+
+    async def disconnect(self):
+        await self.async_redis_conn_pool.disconnect(inuse_connections=True)
+
+    async def async_delete_cache(self, key: str):
+        # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `delete`
+        _redis_client: Any = self.init_async_client()
+        # key is a single string
+        await _redis_client.delete(key)
+
+    def delete_cache(self, key):
+        self.redis_client.delete(key)
+
+    async def _pipeline_increment_helper(
+        self,
+        pipe: pipeline,
+        increment_list: List[RedisPipelineIncrementOperation],
+    ) -> Optional[List[float]]:
+        """Helper function for pipeline increment operations"""
+        # Iterate through each increment operation and add commands to pipeline
+        for increment_op in increment_list:
+            cache_key = self.check_and_fix_namespace(key=increment_op["key"])
+            print_verbose(
+                f"Increment ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {increment_op['increment_value']}\nttl={increment_op['ttl']}"
+            )
+            pipe.incrbyfloat(cache_key, increment_op["increment_value"])
+            if increment_op["ttl"] is not None:
+                _td = timedelta(seconds=increment_op["ttl"])
+                pipe.expire(cache_key, _td)
+        # Execute the pipeline and return results
+        results = await pipe.execute()
+        print_verbose(f"Increment ASYNC Redis Cache PIPELINE: results: {results}")
+        return results
+
+    async def async_increment_pipeline(
+        self, increment_list: List[RedisPipelineIncrementOperation], **kwargs
+    ) -> Optional[List[float]]:
+        """
+        Use Redis Pipelines for bulk increment operations
+        Args:
+            increment_list: List of RedisPipelineIncrementOperation dicts containing:
+                - key: str
+                - increment_value: float
+                - ttl: Optional[int]
+        """
+        # don't waste a network request if there's nothing to increment
+        if len(increment_list) == 0:
+            return None
+
+        from redis.asyncio import Redis
+
+        _redis_client: Redis = self.init_async_client()  # type: ignore
+        start_time = time.time()
+
+        print_verbose(
+            f"Increment Async Redis Cache Pipeline: increment list: {increment_list}"
+        )
+
+        try:
+            async with _redis_client.pipeline(transaction=False) as pipe:
+                results = await self._pipeline_increment_helper(pipe, increment_list)
+
+            print_verbose(f"pipeline increment results: {results}")
+
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_success_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    call_type="async_increment_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            return results
+        except Exception as e:
+            ## LOGGING ##
+            end_time = time.time()
+            _duration = end_time - start_time
+            asyncio.create_task(
+                self.service_logger_obj.async_service_failure_hook(
+                    service=ServiceTypes.REDIS,
+                    duration=_duration,
+                    error=e,
+                    call_type="async_increment_pipeline",
+                    start_time=start_time,
+                    end_time=end_time,
+                    parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+                )
+            )
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s",
+                str(e),
+            )
+            raise e
+
+    async def async_get_ttl(self, key: str) -> Optional[int]:
+        """
+        Get the remaining TTL of a key in Redis
+
+        Args:
+            key (str): The key to get TTL for
+
+        Returns:
+            Optional[int]: The remaining TTL in seconds, or None if the key doesn't exist or has no expiry
+
+        Redis ref: https://redis.io/docs/latest/commands/ttl/
+        """
+        try:
+            # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `ttl`
+            _redis_client: Any = self.init_async_client()
+            ttl = await _redis_client.ttl(key)
+            if ttl <= -1:  # -2: key does not exist; -1: key exists but has no expiry
+                return None
+            return ttl
+        except Exception as e:
+            verbose_logger.debug(f"Redis TTL Error: {e}")
+            return None
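+    # Illustrative: `await cache.async_get_ttl("my-key")` -> seconds remaining,
+    # or None if the key is missing or has no expiry (key name is hypothetical)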