aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/_redis.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/_redis.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/_redis.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/_redis.py325
1 files changed, 325 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/_redis.py b/.venv/lib/python3.12/site-packages/litellm/_redis.py
new file mode 100644
index 00000000..5b2f85b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/_redis.py
@@ -0,0 +1,325 @@
+# +-----------------------------------------------+
+# | |
+# | Give Feedback / Get Help |
+# | https://github.com/BerriAI/litellm/issues/new |
+# | |
+# +-----------------------------------------------+
+#
+# Thank you users! We ❤️ you! - Krrish & Ishaan
+
+import inspect
+import json
+
+# s/o [@Frank Colson](https://www.linkedin.com/in/frank-colson-422b9b183/) for this redis implementation
+import os
+from typing import List, Optional, Union
+
+import redis # type: ignore
+import redis.asyncio as async_redis # type: ignore
+
+from litellm import get_secret, get_secret_str
+
+from ._logging import verbose_logger
+
+
+def _get_redis_kwargs():
+ arg_spec = inspect.getfullargspec(redis.Redis)
+
+ # Only allow primitive arguments
+ exclude_args = {
+ "self",
+ "connection_pool",
+ "retry",
+ }
+
+ include_args = ["url"]
+
+ available_args = [x for x in arg_spec.args if x not in exclude_args] + include_args
+
+ return available_args
+
+
+def _get_redis_url_kwargs(client=None):
+ if client is None:
+ client = redis.Redis.from_url
+ arg_spec = inspect.getfullargspec(redis.Redis.from_url)
+
+ # Only allow primitive arguments
+ exclude_args = {
+ "self",
+ "connection_pool",
+ "retry",
+ }
+
+ include_args = ["url"]
+
+ available_args = [x for x in arg_spec.args if x not in exclude_args] + include_args
+
+ return available_args
+
+
+def _get_redis_cluster_kwargs(client=None):
+ if client is None:
+ client = redis.Redis.from_url
+ arg_spec = inspect.getfullargspec(redis.RedisCluster)
+
+ # Only allow primitive arguments
+ exclude_args = {"self", "connection_pool", "retry", "host", "port", "startup_nodes"}
+
+ available_args = [x for x in arg_spec.args if x not in exclude_args]
+ available_args.append("password")
+ available_args.append("username")
+ available_args.append("ssl")
+
+ return available_args
+
+
+def _get_redis_env_kwarg_mapping():
+ PREFIX = "REDIS_"
+
+ return {f"{PREFIX}{x.upper()}": x for x in _get_redis_kwargs()}
+
+
+def _redis_kwargs_from_environment():
+ mapping = _get_redis_env_kwarg_mapping()
+
+ return_dict = {}
+ for k, v in mapping.items():
+ value = get_secret(k, default_value=None) # type: ignore
+ if value is not None:
+ return_dict[v] = value
+ return return_dict
+
+
+def get_redis_url_from_environment():
+ if "REDIS_URL" in os.environ:
+ return os.environ["REDIS_URL"]
+
+ if "REDIS_HOST" not in os.environ or "REDIS_PORT" not in os.environ:
+ raise ValueError(
+ "Either 'REDIS_URL' or both 'REDIS_HOST' and 'REDIS_PORT' must be specified for Redis."
+ )
+
+ if "REDIS_PASSWORD" in os.environ:
+ redis_password = f":{os.environ['REDIS_PASSWORD']}@"
+ else:
+ redis_password = ""
+
+ return (
+ f"redis://{redis_password}{os.environ['REDIS_HOST']}:{os.environ['REDIS_PORT']}"
+ )
+
+
+def _get_redis_client_logic(**env_overrides):
+ """
+ Common functionality across sync + async redis client implementations
+ """
+ ### check if "os.environ/<key-name>" passed in
+ for k, v in env_overrides.items():
+ if isinstance(v, str) and v.startswith("os.environ/"):
+ v = v.replace("os.environ/", "")
+ value = get_secret(v) # type: ignore
+ env_overrides[k] = value
+
+ redis_kwargs = {
+ **_redis_kwargs_from_environment(),
+ **env_overrides,
+ }
+
+ _startup_nodes: Optional[Union[str, list]] = redis_kwargs.get("startup_nodes", None) or get_secret( # type: ignore
+ "REDIS_CLUSTER_NODES"
+ )
+
+ if _startup_nodes is not None and isinstance(_startup_nodes, str):
+ redis_kwargs["startup_nodes"] = json.loads(_startup_nodes)
+
+ _sentinel_nodes: Optional[Union[str, list]] = redis_kwargs.get("sentinel_nodes", None) or get_secret( # type: ignore
+ "REDIS_SENTINEL_NODES"
+ )
+
+ if _sentinel_nodes is not None and isinstance(_sentinel_nodes, str):
+ redis_kwargs["sentinel_nodes"] = json.loads(_sentinel_nodes)
+
+ _sentinel_password: Optional[str] = redis_kwargs.get(
+ "sentinel_password", None
+ ) or get_secret_str("REDIS_SENTINEL_PASSWORD")
+
+ if _sentinel_password is not None:
+ redis_kwargs["sentinel_password"] = _sentinel_password
+
+ _service_name: Optional[str] = redis_kwargs.get("service_name", None) or get_secret( # type: ignore
+ "REDIS_SERVICE_NAME"
+ )
+
+ if _service_name is not None:
+ redis_kwargs["service_name"] = _service_name
+
+ if "url" in redis_kwargs and redis_kwargs["url"] is not None:
+ redis_kwargs.pop("host", None)
+ redis_kwargs.pop("port", None)
+ redis_kwargs.pop("db", None)
+ redis_kwargs.pop("password", None)
+ elif "startup_nodes" in redis_kwargs and redis_kwargs["startup_nodes"] is not None:
+ pass
+ elif (
+ "sentinel_nodes" in redis_kwargs and redis_kwargs["sentinel_nodes"] is not None
+ ):
+ pass
+ elif "host" not in redis_kwargs or redis_kwargs["host"] is None:
+ raise ValueError("Either 'host' or 'url' must be specified for redis.")
+
+ # litellm.print_verbose(f"redis_kwargs: {redis_kwargs}")
+ return redis_kwargs
+
+
+def init_redis_cluster(redis_kwargs) -> redis.RedisCluster:
+ _redis_cluster_nodes_in_env: Optional[str] = get_secret("REDIS_CLUSTER_NODES") # type: ignore
+ if _redis_cluster_nodes_in_env is not None:
+ try:
+ redis_kwargs["startup_nodes"] = json.loads(_redis_cluster_nodes_in_env)
+ except json.JSONDecodeError:
+ raise ValueError(
+ "REDIS_CLUSTER_NODES environment variable is not valid JSON. Please ensure it's properly formatted."
+ )
+
+ verbose_logger.debug("init_redis_cluster: startup nodes are being initialized.")
+ from redis.cluster import ClusterNode
+
+ args = _get_redis_cluster_kwargs()
+ cluster_kwargs = {}
+ for arg in redis_kwargs:
+ if arg in args:
+ cluster_kwargs[arg] = redis_kwargs[arg]
+
+ new_startup_nodes: List[ClusterNode] = []
+
+ for item in redis_kwargs["startup_nodes"]:
+ new_startup_nodes.append(ClusterNode(**item))
+
+ redis_kwargs.pop("startup_nodes")
+ return redis.RedisCluster(startup_nodes=new_startup_nodes, **cluster_kwargs) # type: ignore
+
+
+def _init_redis_sentinel(redis_kwargs) -> redis.Redis:
+ sentinel_nodes = redis_kwargs.get("sentinel_nodes")
+ service_name = redis_kwargs.get("service_name")
+
+ if not sentinel_nodes or not service_name:
+ raise ValueError(
+ "Both 'sentinel_nodes' and 'service_name' are required for Redis Sentinel."
+ )
+
+ verbose_logger.debug("init_redis_sentinel: sentinel nodes are being initialized.")
+
+ # Set up the Sentinel client
+ sentinel = redis.Sentinel(sentinel_nodes, socket_timeout=0.1)
+
+ # Return the master instance for the given service
+
+ return sentinel.master_for(service_name)
+
+
+def _init_async_redis_sentinel(redis_kwargs) -> async_redis.Redis:
+ sentinel_nodes = redis_kwargs.get("sentinel_nodes")
+ sentinel_password = redis_kwargs.get("sentinel_password")
+ service_name = redis_kwargs.get("service_name")
+
+ if not sentinel_nodes or not service_name:
+ raise ValueError(
+ "Both 'sentinel_nodes' and 'service_name' are required for Redis Sentinel."
+ )
+
+ verbose_logger.debug("init_redis_sentinel: sentinel nodes are being initialized.")
+
+ # Set up the Sentinel client
+ sentinel = async_redis.Sentinel(
+ sentinel_nodes,
+ socket_timeout=0.1,
+ password=sentinel_password,
+ )
+
+ # Return the master instance for the given service
+
+ return sentinel.master_for(service_name)
+
+
+def get_redis_client(**env_overrides):
+ redis_kwargs = _get_redis_client_logic(**env_overrides)
+ if "url" in redis_kwargs and redis_kwargs["url"] is not None:
+ args = _get_redis_url_kwargs()
+ url_kwargs = {}
+ for arg in redis_kwargs:
+ if arg in args:
+ url_kwargs[arg] = redis_kwargs[arg]
+
+ return redis.Redis.from_url(**url_kwargs)
+
+ if "startup_nodes" in redis_kwargs or get_secret("REDIS_CLUSTER_NODES") is not None: # type: ignore
+ return init_redis_cluster(redis_kwargs)
+
+ # Check for Redis Sentinel
+ if "sentinel_nodes" in redis_kwargs and "service_name" in redis_kwargs:
+ return _init_redis_sentinel(redis_kwargs)
+
+ return redis.Redis(**redis_kwargs)
+
+
+def get_redis_async_client(
+ **env_overrides,
+) -> async_redis.Redis:
+ redis_kwargs = _get_redis_client_logic(**env_overrides)
+ if "url" in redis_kwargs and redis_kwargs["url"] is not None:
+ args = _get_redis_url_kwargs(client=async_redis.Redis.from_url)
+ url_kwargs = {}
+ for arg in redis_kwargs:
+ if arg in args:
+ url_kwargs[arg] = redis_kwargs[arg]
+ else:
+ verbose_logger.debug(
+ "REDIS: ignoring argument: {}. Not an allowed async_redis.Redis.from_url arg.".format(
+ arg
+ )
+ )
+ return async_redis.Redis.from_url(**url_kwargs)
+
+ if "startup_nodes" in redis_kwargs:
+ from redis.cluster import ClusterNode
+
+ args = _get_redis_cluster_kwargs()
+ cluster_kwargs = {}
+ for arg in redis_kwargs:
+ if arg in args:
+ cluster_kwargs[arg] = redis_kwargs[arg]
+
+ new_startup_nodes: List[ClusterNode] = []
+
+ for item in redis_kwargs["startup_nodes"]:
+ new_startup_nodes.append(ClusterNode(**item))
+ redis_kwargs.pop("startup_nodes")
+ return async_redis.RedisCluster(
+ startup_nodes=new_startup_nodes, **cluster_kwargs # type: ignore
+ )
+
+ # Check for Redis Sentinel
+ if "sentinel_nodes" in redis_kwargs and "service_name" in redis_kwargs:
+ return _init_async_redis_sentinel(redis_kwargs)
+
+ return async_redis.Redis(
+ **redis_kwargs,
+ )
+
+
+def get_redis_connection_pool(**env_overrides):
+ redis_kwargs = _get_redis_client_logic(**env_overrides)
+ verbose_logger.debug("get_redis_connection_pool: redis_kwargs", redis_kwargs)
+ if "url" in redis_kwargs and redis_kwargs["url"] is not None:
+ return async_redis.BlockingConnectionPool.from_url(
+ timeout=5, url=redis_kwargs["url"]
+ )
+ connection_class = async_redis.Connection
+ if "ssl" in redis_kwargs:
+ connection_class = async_redis.SSLConnection
+ redis_kwargs.pop("ssl", None)
+ redis_kwargs["connection_class"] = connection_class
+ redis_kwargs.pop("startup_nodes", None)
+ return async_redis.BlockingConnectionPool(timeout=5, **redis_kwargs)