Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py  298
1 file changed, 298 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py b/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py
new file mode 100644
index 00000000..15a9bc1b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -0,0 +1,298 @@
+# What is this?
+## Allocates dynamic tpm/rpm quota for a project based on current traffic
+## Tracks num active projects per minute
+
+import asyncio
+import os
+from typing import List, Literal, Optional, Tuple, Union
+
+from fastapi import HTTPException
+
+import litellm
+from litellm import ModelResponse, Router
+from litellm._logging import verbose_proxy_logger
+from litellm.caching.caching import DualCache
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.proxy._types import UserAPIKeyAuth
+from litellm.types.router import ModelGroupInfo
+from litellm.utils import get_utc_datetime
+
+
+class DynamicRateLimiterCache:
+    """
+    Thin wrapper on DualCache for this file.
+
+    Track number of active projects calling a model.
+    """
+
+    def __init__(self, cache: DualCache) -> None:
+        self.cache = cache
+        self.ttl = 60  # 1 min ttl
+
+    async def async_get_cache(self, model: str) -> Optional[int]:
+        dt = get_utc_datetime()
+        current_minute = dt.strftime("%H-%M")
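+        # keys are scoped to the current minute, e.g. "14-05:gpt-4o" (illustrative value)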
+        key_name = "{}:{}".format(current_minute, model)
+        _response = await self.cache.async_get_cache(key=key_name)
+        response: Optional[int] = None
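+        # the cached value is a set of project/team ids; its size is the active-project count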
+        if _response is not None:
+            response = len(_response)
+        return response
+
+    async def async_set_cache_sadd(self, model: str, value: List):
+        """
+        Add value to set.
+
+        Parameters:
+        - model: str, the name of the model group
+        - value: str, the team id
+
+        Returns:
+        - None
+
+        Raises:
+        - Exception, if unable to connect to cache client (if redis caching enabled)
+        """
+        try:
+            dt = get_utc_datetime()
+            current_minute = dt.strftime("%H-%M")
+
+            key_name = "{}:{}".format(current_minute, model)
+            await self.cache.async_set_cache_sadd(
+                key=key_name, value=value, ttl=self.ttl
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_set_cache_sadd(): Exception occured - {}".format(
+                    str(e)
+                )
+            )
+            raise e
+
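+# Illustrative usage of DynamicRateLimiterCache (a comment-only sketch; "gpt-4o"
+# and the team id below are assumed placeholder values, not used by this module):
+#
+#   cache = DynamicRateLimiterCache(cache=DualCache())
+#   await cache.async_set_cache_sadd(model="gpt-4o", value=["team-123"])
+#   active_projects = await cache.async_get_cache(model="gpt-4o")  # -> 1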
+
+class _PROXY_DynamicRateLimitHandler(CustomLogger):
+
+    # Class variables or attributes
+    def __init__(self, internal_usage_cache: DualCache):
+        self.internal_usage_cache = DynamicRateLimiterCache(cache=internal_usage_cache)
+
+    def update_variables(self, llm_router: Router):
+        self.llm_router = llm_router
+
+    async def check_available_usage(
+        self, model: str, priority: Optional[str] = None
+    ) -> Tuple[
+        Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
+    ]:
+        """
+        For a given model, get its available tpm
+
+        Params:
+        - model: str, the name of the model in the router model_list
+        - priority: Optional[str], the priority for the request.
+
+        Returns
+        - Tuple[available_tpm, available_tpm, model_tpm, model_rpm, active_projects]
+            - available_tpm: int or null - always 0 or positive.
+            - available_tpm: int or null - always 0 or positive.
+            - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
+            - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
+            - active_projects: int or null
+        """
+        try:
+            weight: float = 1
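+            # Reservation weights come from litellm.priority_reservation, e.g.
+            # {"prod": 0.75, "dev": 0.25} (illustrative split): a "prod" key would
+            # be allotted 75% of the model group's remaining tpm/rpm.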
+            if (
+                litellm.priority_reservation is None
+                or priority not in litellm.priority_reservation
+            ):
+                verbose_proxy_logger.error(
+                    "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
+                        priority, litellm.priority_reservation
+                    )
+                )
+            elif priority is not None and litellm.priority_reservation is not None:
+                if os.getenv("LITELLM_LICENSE", None) is None:
+                    verbose_proxy_logger.error(
+                        "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
+                    )
+                else:
+                    weight = litellm.priority_reservation[priority]
+
+            active_projects = await self.internal_usage_cache.async_get_cache(
+                model=model
+            )
+            current_model_tpm, current_model_rpm = (
+                await self.llm_router.get_model_group_usage(model_group=model)
+            )
+            model_group_info: Optional[ModelGroupInfo] = (
+                self.llm_router.get_model_group_info(model_group=model)
+            )
+            total_model_tpm: Optional[int] = None
+            total_model_rpm: Optional[int] = None
+            if model_group_info is not None:
+                if model_group_info.tpm is not None:
+                    total_model_tpm = model_group_info.tpm
+                if model_group_info.rpm is not None:
+                    total_model_rpm = model_group_info.rpm
+
+            remaining_model_tpm: Optional[int] = None
+            if total_model_tpm is not None and current_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm - current_model_tpm
+            elif total_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm
+
+            remaining_model_rpm: Optional[int] = None
+            if total_model_rpm is not None and current_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm - current_model_rpm
+            elif total_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm
+
+            available_tpm: Optional[int] = None
+
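+            # e.g. (illustrative figures) remaining_model_tpm=60000, weight=0.5,
+            # active_projects=4 -> available_tpm = int(60000 * 0.5 / 4) = 7500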
+            if remaining_model_tpm is not None:
+                if active_projects is not None:
+                    available_tpm = int(remaining_model_tpm * weight / active_projects)
+                else:
+                    available_tpm = int(remaining_model_tpm * weight)
+
+            if available_tpm is not None and available_tpm < 0:
+                available_tpm = 0
+
+            available_rpm: Optional[int] = None
+
+            if remaining_model_rpm is not None:
+                if active_projects is not None:
+                    available_rpm = int(remaining_model_rpm * weight / active_projects)
+                else:
+                    available_rpm = int(remaining_model_rpm * weight)
+
+            if available_rpm is not None and available_rpm < 0:
+                available_rpm = 0
+            return (
+                available_tpm,
+                available_rpm,
+                remaining_model_tpm,
+                remaining_model_rpm,
+                active_projects,
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            return None, None, None, None, None
+
+    async def async_pre_call_hook(
+        self,
+        user_api_key_dict: UserAPIKeyAuth,
+        cache: DualCache,
+        data: dict,
+        call_type: Literal[
+            "completion",
+            "text_completion",
+            "embeddings",
+            "image_generation",
+            "moderation",
+            "audio_transcription",
+            "pass_through_endpoint",
+            "rerank",
+        ],
+    ) -> Optional[
+        Union[Exception, str, dict]
+    ]:  # raise an exception if invalid; return a str for the user to receive if rejected; or return a modified dict to pass into litellm
+        """
+        - For a model group
+        - Check if tpm/rpm available
+        - Raise RateLimitError if no tpm/rpm available
+        """
+        if "model" in data:
+            key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                "priority", None
+            )
+            available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                await self.check_available_usage(
+                    model=data["model"], priority=key_priority
+                )
+            )
+            ### CHECK TPM ###
+            if available_tpm is not None and available_tpm == 0:
+                raise HTTPException(
+                    status_code=429,
+                    detail={
+                        "error": "Key={} over available TPM={}. Model TPM={}, Active keys={}".format(
+                            user_api_key_dict.api_key,
+                            available_tpm,
+                            model_tpm,
+                            active_projects,
+                        )
+                    },
+                )
+            ### CHECK RPM ###
+            elif available_rpm is not None and available_rpm == 0:
+                raise HTTPException(
+                    status_code=429,
+                    detail={
+                        "error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format(
+                            user_api_key_dict.api_key,
+                            available_rpm,
+                            model_rpm,
+                            active_projects,
+                        )
+                    },
+                )
+            elif available_rpm is not None or available_tpm is not None:
+                ## UPDATE CACHE WITH ACTIVE PROJECT
+                asyncio.create_task(
+                    self.internal_usage_cache.async_set_cache_sadd(  # this is a set
+                        model=data["model"],  # type: ignore
+                        value=[user_api_key_dict.token or "default_key"],
+                    )
+                )
+        return None
+
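+    # On rejection, the client receives an HTTP 429 whose detail resembles
+    # (illustrative values only):
+    #   {"error": "Key=sk-... over available TPM=0. Model TPM=0, Active keys=4"}
+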
+    async def async_post_call_success_hook(
+        self, data: dict, user_api_key_dict: UserAPIKeyAuth, response
+    ):
+        try:
+            if isinstance(response, ModelResponse):
+                model_info = self.llm_router.get_model_info(
+                    id=response._hidden_params["model_id"]
+                )
+                assert (
+                    model_info is not None
+                ), "Model info for model with id={} is None".format(
+                    response._hidden_params["model_id"]
+                )
+                key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                    "priority", None
+                )
+                available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                    await self.check_available_usage(
+                        model=model_info["model_name"], priority=key_priority
+                    )
+                )
+                response._hidden_params["additional_headers"] = (
+                    {  # Add additional response headers - easier debugging
+                        "x-litellm-model_group": model_info["model_name"],
+                        "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
+                        "x-ratelimit-remaining-litellm-project-requests": available_rpm,
+                        "x-ratelimit-remaining-model-tokens": model_tpm,
+                        "x-ratelimit-remaining-model-requests": model_rpm,
+                        "x-ratelimit-current-active-projects": active_projects,
+                    }
+                )
+
+                return response
+            return await super().async_post_call_success_hook(
+                data=data,
+                user_api_key_dict=user_api_key_dict,
+                response=response,
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_post_call_success_hook(): Exception occured - {}".format(
+                    str(e)
+                )
+            )
+            return response
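+
+
+# Illustrative wiring sketch (comments only; not executed by this module). The
+# model group name and reservation split are assumed placeholder values:
+#
+#   litellm.priority_reservation = {"prod": 0.75, "dev": 0.25}
+#   router = Router(model_list=[...])  # Router / DualCache are imported above
+#   handler = _PROXY_DynamicRateLimitHandler(internal_usage_cache=DualCache())
+#   handler.update_variables(llm_router=router)
+#
+#   # in an async context:
+#   tpm, rpm, model_tpm, model_rpm, active = await handler.check_available_usage(
+#       model="my-model-group", priority="prod"
+#   )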