Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py | 298 |
1 file changed, 298 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py b/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py
new file mode 100644
index 00000000..15a9bc1b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/hooks/dynamic_rate_limiter.py
@@ -0,0 +1,298 @@
+# What is this?
+## Allocates dynamic tpm/rpm quota for a project based on current traffic
+## Tracks num active projects per minute
+
+import asyncio
+import os
+from typing import List, Literal, Optional, Tuple, Union
+
+from fastapi import HTTPException
+
+import litellm
+from litellm import ModelResponse, Router
+from litellm._logging import verbose_proxy_logger
+from litellm.caching.caching import DualCache
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.proxy._types import UserAPIKeyAuth
+from litellm.types.router import ModelGroupInfo
+from litellm.utils import get_utc_datetime
+
+
+class DynamicRateLimiterCache:
+    """
+    Thin wrapper on DualCache for this file.
+
+    Tracks the number of active projects calling a model.
+    """
+
+    def __init__(self, cache: DualCache) -> None:
+        self.cache = cache
+        self.ttl = 60  # 1 min ttl
+
+    async def async_get_cache(self, model: str) -> Optional[int]:
+        dt = get_utc_datetime()
+        current_minute = dt.strftime("%H-%M")
+        key_name = "{}:{}".format(current_minute, model)
+        _response = await self.cache.async_get_cache(key=key_name)
+        response: Optional[int] = None
+        if _response is not None:
+            response = len(_response)
+        return response
+
+    async def async_set_cache_sadd(self, model: str, value: List):
+        """
+        Add value to set.
+
+        Parameters:
+        - model: str, the name of the model group
+        - value: List, the project ids (key tokens) to add to the set
+
+        Returns:
+        - None
+
+        Raises:
+        - Exception, if unable to connect to cache client (if redis caching enabled)
+        """
+        try:
+            dt = get_utc_datetime()
+            current_minute = dt.strftime("%H-%M")
+
+            key_name = "{}:{}".format(current_minute, model)
+            await self.cache.async_set_cache_sadd(
+                key=key_name, value=value, ttl=self.ttl
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_set_cache_sadd(): Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            raise e
+
+
+class _PROXY_DynamicRateLimitHandler(CustomLogger):
+
+    # Class variables or attributes
+    def __init__(self, internal_usage_cache: DualCache):
+        self.internal_usage_cache = DynamicRateLimiterCache(cache=internal_usage_cache)
+
+    def update_variables(self, llm_router: Router):
+        self.llm_router = llm_router
+
+    async def check_available_usage(
+        self, model: str, priority: Optional[str] = None
+    ) -> Tuple[
+        Optional[int], Optional[int], Optional[int], Optional[int], Optional[int]
+    ]:
+        """
+        For a given model, get its available tpm/rpm.
+
+        Params:
+        - model: str, the name of the model in the router model_list
+        - priority: Optional[str], the priority for the request.
+
+        Returns
+        - Tuple[available_tpm, available_rpm, remaining_model_tpm, remaining_model_rpm, active_projects]
+          - available_tpm: int or null - always 0 or positive.
+          - available_rpm: int or null - always 0 or positive.
+          - remaining_model_tpm: int or null. If available tpm is int, then this will be too.
+          - remaining_model_rpm: int or null. If available rpm is int, then this will be too.
+          - active_projects: int or null
+        """
+        try:
+            weight: float = 1
+            if (
+                litellm.priority_reservation is None
+                or priority not in litellm.priority_reservation
+            ):
+                verbose_proxy_logger.error(
+                    "Priority Reservation not set. priority={}, but litellm.priority_reservation is {}.".format(
+                        priority, litellm.priority_reservation
+                    )
+                )
+            elif priority is not None and litellm.priority_reservation is not None:
+                if os.getenv("LITELLM_LICENSE", None) is None:
+                    verbose_proxy_logger.error(
+                        "PREMIUM FEATURE: Reserving tpm/rpm by priority is a premium feature. Please add a 'LITELLM_LICENSE' to your .env to enable this.\nGet a license: https://docs.litellm.ai/docs/proxy/enterprise."
+                    )
+                else:
+                    weight = litellm.priority_reservation[priority]
+
+            active_projects = await self.internal_usage_cache.async_get_cache(
+                model=model
+            )
+            current_model_tpm, current_model_rpm = (
+                await self.llm_router.get_model_group_usage(model_group=model)
+            )
+            model_group_info: Optional[ModelGroupInfo] = (
+                self.llm_router.get_model_group_info(model_group=model)
+            )
+            total_model_tpm: Optional[int] = None
+            total_model_rpm: Optional[int] = None
+            if model_group_info is not None:
+                if model_group_info.tpm is not None:
+                    total_model_tpm = model_group_info.tpm
+                if model_group_info.rpm is not None:
+                    total_model_rpm = model_group_info.rpm
+
+            remaining_model_tpm: Optional[int] = None
+            if total_model_tpm is not None and current_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm - current_model_tpm
+            elif total_model_tpm is not None:
+                remaining_model_tpm = total_model_tpm
+
+            remaining_model_rpm: Optional[int] = None
+            if total_model_rpm is not None and current_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm - current_model_rpm
+            elif total_model_rpm is not None:
+                remaining_model_rpm = total_model_rpm
+
+            available_tpm: Optional[int] = None
+
+            if remaining_model_tpm is not None:
+                if active_projects is not None:
+                    available_tpm = int(remaining_model_tpm * weight / active_projects)
+                else:
+                    available_tpm = int(remaining_model_tpm * weight)
+
+            if available_tpm is not None and available_tpm < 0:
+                available_tpm = 0
+
+            available_rpm: Optional[int] = None
+
+            if remaining_model_rpm is not None:
+                if active_projects is not None:
+                    available_rpm = int(remaining_model_rpm * weight / active_projects)
+                else:
+                    available_rpm = int(remaining_model_rpm * weight)
+
+            if available_rpm is not None and available_rpm < 0:
+                available_rpm = 0
+            return (
+                available_tpm,
+                available_rpm,
+                remaining_model_tpm,
+                remaining_model_rpm,
+                active_projects,
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::check_available_usage: Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            return None, None, None, None, None
+
+    async def async_pre_call_hook(
+        self,
+        user_api_key_dict: UserAPIKeyAuth,
+        cache: DualCache,
+        data: dict,
+        call_type: Literal[
+            "completion",
+            "text_completion",
+            "embeddings",
+            "image_generation",
+            "moderation",
+            "audio_transcription",
+            "pass_through_endpoint",
+            "rerank",
+        ],
+    ) -> Optional[
+        Union[Exception, str, dict]
+    ]:  # raise exception if invalid, return a str for the user to receive - if rejected, or return a modified dictionary for passing into litellm
+        """
+        - For a model group
+        - Check if tpm/rpm available
+        - Raise RateLimitError if no tpm/rpm available
+        """
+        if "model" in data:
+            key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                "priority", None
+            )
+            available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                await self.check_available_usage(
+                    model=data["model"], priority=key_priority
+                )
+            )
+            ### CHECK TPM ###
+            if available_tpm is not None and available_tpm == 0:
+                raise HTTPException(
+                    status_code=429,
+                    detail={
+                        "error": "Key={} over available TPM={}. Model TPM={}, Active keys={}".format(
+                            user_api_key_dict.api_key,
+                            available_tpm,
+                            model_tpm,
+                            active_projects,
+                        )
+                    },
+                )
+            ### CHECK RPM ###
+            elif available_rpm is not None and available_rpm == 0:
+                raise HTTPException(
+                    status_code=429,
+                    detail={
+                        "error": "Key={} over available RPM={}. Model RPM={}, Active keys={}".format(
+                            user_api_key_dict.api_key,
+                            available_rpm,
+                            model_rpm,
+                            active_projects,
+                        )
+                    },
+                )
+            elif available_rpm is not None or available_tpm is not None:
+                ## UPDATE CACHE WITH ACTIVE PROJECT
+                asyncio.create_task(
+                    self.internal_usage_cache.async_set_cache_sadd(  # this is a set
+                        model=data["model"],  # type: ignore
+                        value=[user_api_key_dict.token or "default_key"],
+                    )
+                )
+        return None
+
+    async def async_post_call_success_hook(
+        self, data: dict, user_api_key_dict: UserAPIKeyAuth, response
+    ):
+        try:
+            if isinstance(response, ModelResponse):
+                model_info = self.llm_router.get_model_info(
+                    id=response._hidden_params["model_id"]
+                )
+                assert (
+                    model_info is not None
+                ), "Model info for model with id={} is None".format(
+                    response._hidden_params["model_id"]
+                )
+                key_priority: Optional[str] = user_api_key_dict.metadata.get(
+                    "priority", None
+                )
+                available_tpm, available_rpm, model_tpm, model_rpm, active_projects = (
+                    await self.check_available_usage(
+                        model=model_info["model_name"], priority=key_priority
+                    )
+                )
+                response._hidden_params["additional_headers"] = (
+                    {  # Add additional response headers - easier debugging
+                        "x-litellm-model_group": model_info["model_name"],
+                        "x-ratelimit-remaining-litellm-project-tokens": available_tpm,
+                        "x-ratelimit-remaining-litellm-project-requests": available_rpm,
+                        "x-ratelimit-remaining-model-tokens": model_tpm,
+                        "x-ratelimit-remaining-model-requests": model_rpm,
+                        "x-ratelimit-current-active-projects": active_projects,
+                    }
+                )
+
+                return response
+            return await super().async_post_call_success_hook(
+                data=data,
+                user_api_key_dict=user_api_key_dict,
+                response=response,
+            )
+        except Exception as e:
+            verbose_proxy_logger.exception(
+                "litellm.proxy.hooks.dynamic_rate_limiter.py::async_post_call_success_hook(): Exception occurred - {}".format(
+                    str(e)
+                )
+            )
+            return response
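The allocation logic in check_available_usage reduces to one formula: each active project on a model group gets remaining_model_tpm * weight / active_projects tokens per minute (and the analogous share of rpm), where weight defaults to 1 and is only overridden by litellm.priority_reservation on licensed deployments. A minimal standalone sketch of that split, under those assumptions; allocate_share, the sample numbers, and the "prod" priority key are illustrative and not part of litellm:

from typing import Optional


def allocate_share(
    remaining: Optional[int], active_projects: Optional[int], weight: float = 1.0
) -> Optional[int]:
    """Per-project share of the remaining model-group quota.

    Mirrors the arithmetic in check_available_usage: None means "no limit
    configured"; the cache returns None (not 0) when no project has been
    seen in the current minute, in which case one project gets the full
    reserved share.
    """
    if remaining is None:
        return None
    if active_projects is not None:
        share = int(remaining * weight / active_projects)
    else:
        share = int(remaining * weight)
    return max(share, 0)  # negative remainders clamp to 0


# Example: the model group allows 100_000 TPM, 40_000 were used this minute,
# 3 projects are active, and this key's priority is reserved 50% of traffic
# (weight = litellm.priority_reservation["prod"] == 0.5, hypothetical config).
print(allocate_share(100_000 - 40_000, active_projects=3, weight=0.5))  # -> 10000

In async_pre_call_hook the same number drives admission: a share of exactly 0 raises an HTTP 429, while any positive share lets the request through and registers the key as an active project for the current minute via async_set_cache_sadd.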