author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download  gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py  1047
1 file changed, 1047 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py b/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
new file mode 100644
index 00000000..0571ac9f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/caching/redis_cache.py
@@ -0,0 +1,1047 @@
+"""
+Redis Cache implementation
+
+Has 4 primary methods:
+ - set_cache
+ - get_cache
+ - async_set_cache
+ - async_get_cache
+"""
+
+import ast
+import asyncio
+import inspect
+import json
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, List, Optional, Tuple, Union
+
+import litellm
+from litellm._logging import print_verbose, verbose_logger
+from litellm.litellm_core_utils.core_helpers import _get_parent_otel_span_from_kwargs
+from litellm.types.caching import RedisPipelineIncrementOperation
+from litellm.types.services import ServiceTypes
+
+from .base_cache import BaseCache
+
+if TYPE_CHECKING:
+ from opentelemetry.trace import Span as _Span
+ from redis.asyncio import Redis, RedisCluster
+ from redis.asyncio.client import Pipeline
+ from redis.asyncio.cluster import ClusterPipeline
+
+ pipeline = Pipeline
+ cluster_pipeline = ClusterPipeline
+ async_redis_client = Redis
+ async_redis_cluster_client = RedisCluster
+ Span = _Span
+else:
+ pipeline = Any
+ cluster_pipeline = Any
+ async_redis_client = Any
+ async_redis_cluster_client = Any
+ Span = Any
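+    # at runtime these aliases are just Any, so importing this module does not
+    # require redis or opentelemetry; type-checkers resolve the real classes
+    # through the TYPE_CHECKING branch above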
+
+
+class RedisCache(BaseCache):
+    # if users don't provide one, use the default litellm cache
+
+ def __init__(
+ self,
+ host=None,
+ port=None,
+ password=None,
+ redis_flush_size: Optional[int] = 100,
+ namespace: Optional[str] = None,
+ startup_nodes: Optional[List] = None, # for redis-cluster
+ socket_timeout: Optional[float] = 5.0, # default 5 second timeout
+ **kwargs,
+ ):
+ from litellm._service_logger import ServiceLogging
+
+ from .._redis import get_redis_client, get_redis_connection_pool
+
+ redis_kwargs = {}
+ if host is not None:
+ redis_kwargs["host"] = host
+ if port is not None:
+ redis_kwargs["port"] = port
+ if password is not None:
+ redis_kwargs["password"] = password
+ if startup_nodes is not None:
+ redis_kwargs["startup_nodes"] = startup_nodes
+ if socket_timeout is not None:
+ redis_kwargs["socket_timeout"] = socket_timeout
+
+ ### HEALTH MONITORING OBJECT ###
+ if kwargs.get("service_logger_obj", None) is not None and isinstance(
+ kwargs["service_logger_obj"], ServiceLogging
+ ):
+ self.service_logger_obj = kwargs.pop("service_logger_obj")
+ else:
+ self.service_logger_obj = ServiceLogging()
+
+ redis_kwargs.update(kwargs)
+ self.redis_client = get_redis_client(**redis_kwargs)
+ self.redis_async_client: Optional[async_redis_client] = None
+ self.redis_kwargs = redis_kwargs
+ self.async_redis_conn_pool = get_redis_connection_pool(**redis_kwargs)
+
+ # redis namespaces
+ self.namespace = namespace
+ # for high traffic, we store the redis results in memory and then batch write to redis
+ self.redis_batch_writing_buffer: list = []
+ if redis_flush_size is None:
+ self.redis_flush_size: int = 100
+ else:
+ self.redis_flush_size = redis_flush_size
+ self.redis_version = "Unknown"
+ try:
+ if not inspect.iscoroutinefunction(self.redis_client):
+ self.redis_version = self.redis_client.info()["redis_version"] # type: ignore
+ except Exception:
+ pass
+
+ ### ASYNC HEALTH PING ###
+ try:
+ _ = asyncio.get_running_loop().create_task(self.ping())
+ except Exception as e:
+ if "no running event loop" in str(e):
+ verbose_logger.debug(
+ "Ignoring async redis ping. No running event loop."
+ )
+ else:
+ verbose_logger.error(
+ "Error connecting to Async Redis client - {}".format(str(e)),
+ extra={"error": str(e)},
+ )
+
+ ### SYNC HEALTH PING ###
+ try:
+ if hasattr(self.redis_client, "ping"):
+ self.redis_client.ping() # type: ignore
+ except Exception as e:
+ verbose_logger.error(
+ "Error connecting to Sync Redis client", extra={"error": str(e)}
+ )
+
+ if litellm.default_redis_ttl is not None:
+ super().__init__(default_ttl=int(litellm.default_redis_ttl))
+ else:
+ super().__init__() # defaults to 60s
+
+ def init_async_client(
+ self,
+ ) -> Union[async_redis_client, async_redis_cluster_client]:
+ from .._redis import get_redis_async_client
+
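+        # lazily create the async client on first use, binding it to the
+        # connection pool built in __init__; later calls reuse the same client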
+ if self.redis_async_client is None:
+ self.redis_async_client = get_redis_async_client(
+ connection_pool=self.async_redis_conn_pool, **self.redis_kwargs
+ )
+ return self.redis_async_client
+
+ def check_and_fix_namespace(self, key: str) -> str:
+ """
+ Make sure each key starts with the given namespace
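+
+        e.g. with namespace "litellm": "foo" -> "litellm:foo"; keys already
+        prefixed with the namespace are returned unchanged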
+ """
+ if self.namespace is not None and not key.startswith(self.namespace):
+ key = self.namespace + ":" + key
+
+ return key
+
+ def set_cache(self, key, value, **kwargs):
+ ttl = self.get_ttl(**kwargs)
+ print_verbose(
+ f"Set Redis Cache: key: {key}\nValue {value}\nttl={ttl}, redis_version={self.redis_version}"
+ )
+ key = self.check_and_fix_namespace(key=key)
+ try:
+ start_time = time.time()
+ self.redis_client.set(name=key, value=str(value), ex=ttl)
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="set_cache",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ except Exception as e:
+ # NON blocking - notify users Redis is throwing an exception
+ print_verbose(
+ f"litellm.caching.caching: set() - Got exception from REDIS : {str(e)}"
+ )
+
+ def increment_cache(
+ self, key, value: int, ttl: Optional[float] = None, **kwargs
+ ) -> int:
+ _redis_client = self.redis_client
+ start_time = time.time()
+ set_ttl = self.get_ttl(ttl=ttl)
+ try:
+ result: int = _redis_client.incr(name=key, amount=value) # type: ignore
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="increment_cache",
+ start_time=start_time,
+ end_time=end_time,
+ )
+
+ if set_ttl is not None:
+ # check if key already has ttl, if not -> set ttl
+ start_time = time.time()
+ current_ttl = _redis_client.ttl(key)
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="increment_cache_ttl",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ if current_ttl == -1:
+ # Key has no expiration
+ start_time = time.time()
+ _redis_client.expire(key, set_ttl) # type: ignore
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="increment_cache_expire",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ return result
+ except Exception as e:
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ verbose_logger.error(
+ "LiteLLM Redis Caching: increment_cache() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+ raise e
+
+ async def async_scan_iter(self, pattern: str, count: int = 100) -> list:
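+        """
+        Collect up to `count` keys matching `pattern*` using non-blocking SCAN.
+
+        e.g. (illustrative) await cache.async_scan_iter("litellm:", count=100)
+        """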
+ from redis.asyncio import Redis
+
+ start_time = time.time()
+ try:
+ keys = []
+ _redis_client: Redis = self.init_async_client() # type: ignore
+
+ async for key in _redis_client.scan_iter(match=pattern + "*", count=count):
+ keys.append(key)
+ if len(keys) >= count:
+ break
+
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_scan_iter",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ ) # DO NOT SLOW DOWN CALL B/C OF THIS
+ return keys
+ except Exception as e:
+ # NON blocking - notify users Redis is throwing an exception
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_scan_iter",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ )
+ raise e
+
+ async def async_set_cache(self, key, value, **kwargs):
+ from redis.asyncio import Redis
+
+ start_time = time.time()
+ try:
+ _redis_client: Redis = self.init_async_client() # type: ignore
+ except Exception as e:
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ call_type="async_set_cache",
+ )
+ )
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+ raise e
+
+ key = self.check_and_fix_namespace(key=key)
+ ttl = self.get_ttl(**kwargs)
+ print_verbose(f"Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}")
+
+ try:
+ if not hasattr(_redis_client, "set"):
+ raise Exception("Redis client cannot set cache. Attribute not found.")
+ await _redis_client.set(name=key, value=json.dumps(value), ex=ttl)
+ print_verbose(
+ f"Successfully Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}"
+ )
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_set_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ event_metadata={"key": key},
+ )
+ )
+ except Exception as e:
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_set_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ event_metadata={"key": key},
+ )
+ )
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+
+ async def _pipeline_helper(
+ self,
+ pipe: Union[pipeline, cluster_pipeline],
+ cache_list: List[Tuple[Any, Any]],
+ ttl: Optional[float],
+ ) -> List:
+ """
+ Helper function for executing a pipeline of set operations on Redis
+ """
+ ttl = self.get_ttl(ttl=ttl)
+ # Iterate through each key-value pair in the cache_list and set them in the pipeline.
+ for cache_key, cache_value in cache_list:
+ cache_key = self.check_and_fix_namespace(key=cache_key)
+ print_verbose(
+ f"Set ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {cache_value}\nttl={ttl}"
+ )
+ json_cache_value = json.dumps(cache_value)
+ # Set the value with a TTL if it's provided.
+ _td: Optional[timedelta] = None
+ if ttl is not None:
+ _td = timedelta(seconds=ttl)
+ pipe.set( # type: ignore
+ name=cache_key,
+ value=json_cache_value,
+ ex=_td,
+ )
+ # Execute the pipeline and return the results.
+ results = await pipe.execute()
+ return results
+
+ async def async_set_cache_pipeline(
+ self, cache_list: List[Tuple[Any, Any]], ttl: Optional[float] = None, **kwargs
+ ):
+ """
+ Use Redis Pipelines for bulk write operations
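+
+        e.g. (illustrative)
+            await cache.async_set_cache_pipeline(
+                [("k1", {"a": 1}), ("k2", {"b": 2})], ttl=60
+            )
+        sends both SETs over a single pipelined round trip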
+ """
+ # don't waste a network request if there's nothing to set
+ if len(cache_list) == 0:
+ return
+
+ _redis_client = self.init_async_client()
+ start_time = time.time()
+
+ print_verbose(
+ f"Set Async Redis Cache: key list: {cache_list}\nttl={ttl}, redis_version={self.redis_version}"
+ )
+ try:
+ async with _redis_client.pipeline(transaction=False) as pipe:
+ results = await self._pipeline_helper(pipe, cache_list, ttl)
+
+ print_verbose(f"pipeline results: {results}")
+ # Optionally, you could process 'results' to make sure that all set operations were successful.
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_set_cache_pipeline",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+ return None
+ except Exception as e:
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_set_cache_pipeline",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+
+            verbose_logger.error(
+                "LiteLLM Redis Caching: async set_cache_pipeline() - Got exception from REDIS %s, Writing value=%s",
+                str(e),
+                cache_list,
+            )
+
+ async def _set_cache_sadd_helper(
+ self,
+ redis_client: async_redis_client,
+ key: str,
+ value: List,
+ ttl: Optional[float],
+ ) -> None:
+ """Helper function for async_set_cache_sadd. Separated for testing."""
+ ttl = self.get_ttl(ttl=ttl)
+        await redis_client.sadd(key, *value)  # type: ignore
+        if ttl is not None:
+            _td = timedelta(seconds=ttl)
+            await redis_client.expire(key, _td)
+
+ async def async_set_cache_sadd(
+ self, key, value: List, ttl: Optional[float], **kwargs
+ ):
+ from redis.asyncio import Redis
+
+ start_time = time.time()
+ try:
+ _redis_client: Redis = self.init_async_client() # type: ignore
+ except Exception as e:
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ call_type="async_set_cache_sadd",
+ )
+ )
+ # NON blocking - notify users Redis is throwing an exception
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async set() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+ raise e
+
+ key = self.check_and_fix_namespace(key=key)
+ print_verbose(f"Set ASYNC Redis Cache: key: {key}\nValue {value}\nttl={ttl}")
+ try:
+ await self._set_cache_sadd_helper(
+ redis_client=_redis_client, key=key, value=value, ttl=ttl
+ )
+ print_verbose(
+ f"Successfully Set ASYNC Redis Cache SADD: key: {key}\nValue {value}\nttl={ttl}"
+ )
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_set_cache_sadd",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+ except Exception as e:
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_set_cache_sadd",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+ # NON blocking - notify users Redis is throwing an exception
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async set_cache_sadd() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+
+ async def batch_cache_write(self, key, value, **kwargs):
+ print_verbose(
+ f"in batch cache writing for redis buffer size={len(self.redis_batch_writing_buffer)}",
+ )
+ key = self.check_and_fix_namespace(key=key)
+ self.redis_batch_writing_buffer.append((key, value))
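+        # illustrative behavior: with the default redis_flush_size=100, the
+        # first 99 calls only append to the in-memory buffer; the 100th call
+        # trips the check below and flushes the whole buffer in one pipeline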
+ if len(self.redis_batch_writing_buffer) >= self.redis_flush_size:
+ await self.flush_cache_buffer() # logging done in here
+
+ async def async_increment(
+ self,
+ key,
+ value: float,
+ ttl: Optional[int] = None,
+ parent_otel_span: Optional[Span] = None,
+ ) -> float:
+ from redis.asyncio import Redis
+
+ _redis_client: Redis = self.init_async_client() # type: ignore
+ start_time = time.time()
+ _used_ttl = self.get_ttl(ttl=ttl)
+ key = self.check_and_fix_namespace(key=key)
+ try:
+ result = await _redis_client.incrbyfloat(name=key, amount=value)
+ if _used_ttl is not None:
+ # check if key already has ttl, if not -> set ttl
+ current_ttl = await _redis_client.ttl(key)
+ if current_ttl == -1:
+ # Key has no expiration
+ await _redis_client.expire(key, _used_ttl)
+
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_increment",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+ )
+ return result
+ except Exception as e:
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_increment",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+ )
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async async_increment() - Got exception from REDIS %s, Writing value=%s",
+ str(e),
+ value,
+ )
+ raise e
+
+ async def flush_cache_buffer(self):
+ print_verbose(
+ f"flushing to redis....reached size of buffer {len(self.redis_batch_writing_buffer)}"
+ )
+ await self.async_set_cache_pipeline(self.redis_batch_writing_buffer)
+ self.redis_batch_writing_buffer = []
+
+ def _get_cache_logic(self, cached_response: Any):
+ """
+ Common 'get_cache_logic' across sync + async redis client implementations
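+
+        e.g. b'{"a": 1}' -> {"a": 1} via json.loads; python-literal payloads
+        such as b"{'a': 1}" fall back to ast.literal_eval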
+ """
+ if cached_response is None:
+ return cached_response
+        # cached_response is bytes, e.g. b'{"key": "value"}' - convert it back to a python object
+ cached_response = cached_response.decode("utf-8") # Convert bytes to string
+ try:
+ cached_response = json.loads(
+ cached_response
+ ) # Convert string to dictionary
+ except Exception:
+ cached_response = ast.literal_eval(cached_response)
+ return cached_response
+
+ def get_cache(self, key, parent_otel_span: Optional[Span] = None, **kwargs):
+ try:
+ key = self.check_and_fix_namespace(key=key)
+ print_verbose(f"Get Redis Cache: key: {key}")
+ start_time = time.time()
+ cached_response = self.redis_client.get(key)
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+ print_verbose(
+ f"Got Redis Cache: key: {key}, cached_response {cached_response}"
+ )
+ return self._get_cache_logic(cached_response=cached_response)
+ except Exception as e:
+ # NON blocking - notify users Redis is throwing an exception
+            verbose_logger.error(
+                "litellm.caching.caching: get() - Got exception from REDIS: %s", str(e)
+            )
+
+ def _run_redis_mget_operation(self, keys: List[str]) -> List[Any]:
+ """
+ Wrapper to call `mget` on the redis client
+
+ We use a wrapper so RedisCluster can override this method
+ """
+ return self.redis_client.mget(keys=keys) # type: ignore
+
+ async def _async_run_redis_mget_operation(self, keys: List[str]) -> List[Any]:
+ """
+ Wrapper to call `mget` on the redis client
+
+ We use a wrapper so RedisCluster can override this method
+ """
+ async_redis_client = self.init_async_client()
+ return await async_redis_client.mget(keys=keys) # type: ignore
+
+ def batch_get_cache(
+ self,
+ key_list: Union[List[str], List[Optional[str]]],
+ parent_otel_span: Optional[Span] = None,
+ ) -> dict:
+ """
+ Use Redis for bulk read operations
+
+ Args:
+ key_list: List of keys to get from Redis
+ parent_otel_span: Optional parent OpenTelemetry span
+
+ Returns:
+ dict: A dictionary mapping keys to their cached values
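+
+            e.g. (illustrative) batch_get_cache(["k1", "k2"]) -> {"k1": {"a": 1}, "k2": None},
+            where a None value means the key was missing from Redis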
+ """
+ key_value_dict = {}
+ _key_list = [key for key in key_list if key is not None]
+
+ try:
+ _keys = []
+ for cache_key in _key_list:
+ cache_key = self.check_and_fix_namespace(key=cache_key or "")
+ _keys.append(cache_key)
+ start_time = time.time()
+ results: List = self._run_redis_mget_operation(keys=_keys)
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="batch_get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+
+ # Associate the results back with their keys.
+ # 'results' is a list of values corresponding to the order of keys in '_key_list'.
+ key_value_dict = dict(zip(_key_list, results))
+
+ decoded_results = {}
+ for k, v in key_value_dict.items():
+ if isinstance(k, bytes):
+ k = k.decode("utf-8")
+ v = self._get_cache_logic(v)
+ decoded_results[k] = v
+
+ return decoded_results
+ except Exception as e:
+ verbose_logger.error(f"Error occurred in batch get cache - {str(e)}")
+ return key_value_dict
+
+ async def async_get_cache(
+ self, key, parent_otel_span: Optional[Span] = None, **kwargs
+ ):
+ from redis.asyncio import Redis
+
+ _redis_client: Redis = self.init_async_client() # type: ignore
+ key = self.check_and_fix_namespace(key=key)
+ start_time = time.time()
+
+ try:
+ print_verbose(f"Get Async Redis Cache: key: {key}")
+ cached_response = await _redis_client.get(key)
+ print_verbose(
+ f"Got Async Redis Cache: key: {key}, cached_response {cached_response}"
+ )
+ response = self._get_cache_logic(cached_response=cached_response)
+
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ event_metadata={"key": key},
+ )
+ )
+ return response
+ except Exception as e:
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ event_metadata={"key": key},
+ )
+ )
+ print_verbose(
+ f"litellm.caching.caching: async get() - Got exception from REDIS: {str(e)}"
+ )
+
+ async def async_batch_get_cache(
+ self,
+ key_list: Union[List[str], List[Optional[str]]],
+ parent_otel_span: Optional[Span] = None,
+ ) -> dict:
+ """
+ Use Redis for bulk read operations
+
+ Args:
+ key_list: List of keys to get from Redis
+ parent_otel_span: Optional parent OpenTelemetry span
+
+ Returns:
+ dict: A dictionary mapping keys to their cached values
+
+ `.mget` does not support None keys. This will filter out None keys.
+ """
+ key_value_dict = {}
+ start_time = time.time()
+ _key_list = [key for key in key_list if key is not None]
+ try:
+ _keys = []
+ for cache_key in _key_list:
+ cache_key = self.check_and_fix_namespace(key=cache_key)
+ _keys.append(cache_key)
+ results = await self._async_run_redis_mget_operation(keys=_keys)
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_batch_get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+ )
+
+ # Associate the results back with their keys.
+            # 'results' is a list of values corresponding to the order of keys in '_key_list'.
+ key_value_dict = dict(zip(_key_list, results))
+
+ decoded_results = {}
+ for k, v in key_value_dict.items():
+ if isinstance(k, bytes):
+ k = k.decode("utf-8")
+ v = self._get_cache_logic(v)
+ decoded_results[k] = v
+
+ return decoded_results
+ except Exception as e:
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_batch_get_cache",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=parent_otel_span,
+ )
+ )
+ verbose_logger.error(f"Error occurred in async batch get cache - {str(e)}")
+ return key_value_dict
+
+ def sync_ping(self) -> bool:
+ """
+ Tests if the sync redis client is correctly setup.
+ """
+ print_verbose("Pinging Sync Redis Cache")
+ start_time = time.time()
+ try:
+ response: bool = self.redis_client.ping() # type: ignore
+ print_verbose(f"Redis Cache PING: {response}")
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="sync_ping",
+ start_time=start_time,
+ end_time=end_time,
+ )
+ return response
+ except Exception as e:
+ # NON blocking - notify users Redis is throwing an exception
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ self.service_logger_obj.service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="sync_ping",
+ )
+ verbose_logger.error(
+ f"LiteLLM Redis Cache PING: - Got exception from REDIS : {str(e)}"
+ )
+ raise e
+
+ async def ping(self) -> bool:
+ # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `ping`
+ _redis_client: Any = self.init_async_client()
+ start_time = time.time()
+ print_verbose("Pinging Async Redis Cache")
+ try:
+ response = await _redis_client.ping()
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_ping",
+ )
+ )
+ return response
+ except Exception as e:
+ # NON blocking - notify users Redis is throwing an exception
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_ping",
+ )
+ )
+ verbose_logger.error(
+ f"LiteLLM Redis Cache PING: - Got exception from REDIS : {str(e)}"
+ )
+ raise e
+
+ async def delete_cache_keys(self, keys):
+ # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `delete`
+ _redis_client: Any = self.init_async_client()
+ # keys is a list, unpack it so it gets passed as individual elements to delete
+ await _redis_client.delete(*keys)
+
+ def client_list(self) -> List:
+ client_list: List = self.redis_client.client_list() # type: ignore
+ return client_list
+
+ def info(self):
+ info = self.redis_client.info()
+ return info
+
+ def flush_cache(self):
+ self.redis_client.flushall()
+
+ def flushall(self):
+ self.redis_client.flushall()
+
+ async def disconnect(self):
+ await self.async_redis_conn_pool.disconnect(inuse_connections=True)
+
+ async def async_delete_cache(self, key: str):
+ # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `delete`
+ _redis_client: Any = self.init_async_client()
+ # keys is str
+ await _redis_client.delete(key)
+
+ def delete_cache(self, key):
+ self.redis_client.delete(key)
+
+ async def _pipeline_increment_helper(
+ self,
+ pipe: pipeline,
+ increment_list: List[RedisPipelineIncrementOperation],
+ ) -> Optional[List[float]]:
+ """Helper function for pipeline increment operations"""
+ # Iterate through each increment operation and add commands to pipeline
+ for increment_op in increment_list:
+ cache_key = self.check_and_fix_namespace(key=increment_op["key"])
+ print_verbose(
+ f"Increment ASYNC Redis Cache PIPELINE: key: {cache_key}\nValue {increment_op['increment_value']}\nttl={increment_op['ttl']}"
+ )
+ pipe.incrbyfloat(cache_key, increment_op["increment_value"])
+ if increment_op["ttl"] is not None:
+ _td = timedelta(seconds=increment_op["ttl"])
+ pipe.expire(cache_key, _td)
+ # Execute the pipeline and return results
+ results = await pipe.execute()
+ print_verbose(f"Increment ASYNC Redis Cache PIPELINE: results: {results}")
+ return results
+
+ async def async_increment_pipeline(
+ self, increment_list: List[RedisPipelineIncrementOperation], **kwargs
+ ) -> Optional[List[float]]:
+ """
+ Use Redis Pipelines for bulk increment operations
+ Args:
+ increment_list: List of RedisPipelineIncrementOperation dicts containing:
+ - key: str
+ - increment_value: float
+            - ttl: Optional[int]
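+
+            e.g. (illustrative) {"key": "daily_spend", "increment_value": 1.5, "ttl": 86400}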
+ """
+ # don't waste a network request if there's nothing to increment
+ if len(increment_list) == 0:
+ return None
+
+ from redis.asyncio import Redis
+
+ _redis_client: Redis = self.init_async_client() # type: ignore
+ start_time = time.time()
+
+ print_verbose(
+ f"Increment Async Redis Cache Pipeline: increment list: {increment_list}"
+ )
+
+ try:
+ async with _redis_client.pipeline(transaction=False) as pipe:
+ results = await self._pipeline_increment_helper(pipe, increment_list)
+
+ print_verbose(f"pipeline increment results: {results}")
+
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_success_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ call_type="async_increment_pipeline",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+ return results
+ except Exception as e:
+ ## LOGGING ##
+ end_time = time.time()
+ _duration = end_time - start_time
+ asyncio.create_task(
+ self.service_logger_obj.async_service_failure_hook(
+ service=ServiceTypes.REDIS,
+ duration=_duration,
+ error=e,
+ call_type="async_increment_pipeline",
+ start_time=start_time,
+ end_time=end_time,
+ parent_otel_span=_get_parent_otel_span_from_kwargs(kwargs),
+ )
+ )
+ verbose_logger.error(
+ "LiteLLM Redis Caching: async increment_pipeline() - Got exception from REDIS %s",
+ str(e),
+ )
+ raise e
+
+ async def async_get_ttl(self, key: str) -> Optional[int]:
+ """
+ Get the remaining TTL of a key in Redis
+
+ Args:
+ key (str): The key to get TTL for
+
+ Returns:
+            Optional[int]: The remaining TTL in seconds, or None if the key doesn't exist or has no expiry
+
+ Redis ref: https://redis.io/docs/latest/commands/ttl/
+ """
+ try:
+ # typed as Any, redis python lib has incomplete type stubs for RedisCluster and does not include `ttl`
+ _redis_client: Any = self.init_async_client()
+ ttl = await _redis_client.ttl(key)
+ if ttl <= -1: # -1 means the key does not exist, -2 key does not exist
+ return None
+ return ttl
+ except Exception as e:
+ verbose_logger.debug(f"Redis TTL Error: {e}")
+ return None