author     S. Solomon Darnell   2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell   2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/scheduler.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/scheduler.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/scheduler.py  137
1 file changed, 137 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/scheduler.py b/.venv/lib/python3.12/site-packages/litellm/scheduler.py
new file mode 100644
index 00000000..23346e98
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/scheduler.py
@@ -0,0 +1,137 @@
+import enum
+import heapq
+from typing import Optional
+
+from pydantic import BaseModel
+
+from litellm import print_verbose
+from litellm.caching.caching import DualCache, RedisCache
+
+
+class SchedulerCacheKeys(enum.Enum):
+    queue = "scheduler:queue"
+    default_in_memory_ttl = 5  # cache queue in-memory for 5s when redis cache is available
+
+
+class DefaultPriorities(enum.Enum):
+    High = 0
+    Medium = 128
+    Low = 255
+
+
+class FlowItem(BaseModel):
+    priority: int  # Priority between 0 and 255
+    request_id: str
+    model_name: str
+
+
+class Scheduler:
+    cache: DualCache
+
+    def __init__(
+        self,
+        polling_interval: Optional[float] = None,
+        redis_cache: Optional[RedisCache] = None,
+    ):
+        """
+        polling_interval: float or None - frequency of polling the queue. Default is 30ms (0.03s).
+        """
+        self.queue: list = []
+        default_in_memory_ttl: Optional[float] = None
+        if redis_cache is not None:
+            # if a redis cache is available, poll that frequently instead of relying on in-memory state
+            default_in_memory_ttl = SchedulerCacheKeys.default_in_memory_ttl.value
+        self.cache = DualCache(
+            redis_cache=redis_cache, default_in_memory_ttl=default_in_memory_ttl
+        )
+        self.polling_interval = polling_interval or 0.03  # default to 30ms
+
+    async def add_request(self, request: FlowItem):
+        # We use the priority directly, as lower values indicate higher priority
+        # get the queue
+        queue = await self.get_queue(model_name=request.model_name)
+        # update the queue
+        heapq.heappush(queue, (request.priority, request.request_id))
+
+        # save the queue
+        await self.save_queue(queue=queue, model_name=request.model_name)
+
+    async def poll(self, id: str, model_name: str, health_deployments: list) -> bool:
+        """
+        Return whether the request can be processed.
+
+        Returns:
+        - True:
+            * if healthy deployments are available
+            * OR if the request is at the top of the queue
+        - False:
+            * if no healthy deployments are available
+            * AND the request is not at the top of the queue
+        """
+        queue = await self.get_queue(model_name=model_name)
+        if not queue:
+            raise Exception(
+                "Incorrectly setup. Queue is invalid. Queue={}".format(queue)
+            )
+
+        # ------------
+        # Setup values
+        # ------------
+
+        print_verbose(f"len(health_deployments): {len(health_deployments)}")
+        if len(health_deployments) == 0:
+            print_verbose(f"queue: {queue}, seeking id={id}")
+            # Check if the id is at the top of the heap
+            if queue[0][1] == id:
+                # Remove the item from the queue
+                heapq.heappop(queue)
+                print_verbose(f"Popped id: {id}")
+                return True
+            else:
+                return False
+
+        return True
+
+    async def peek(self, id: str, model_name: str, health_deployments: list) -> bool:
+        """Return whether the id is at the top of the queue, without popping it from the heap."""
+        queue = await self.get_queue(model_name=model_name)
+        if not queue:
+            raise Exception(
+                "Incorrectly setup. Queue is invalid. Queue={}".format(queue)
+            )
+
+        # Check if the id is at the top of the heap
+        if queue[0][1] == id:
+            return True
+
+        return False
+
+    def get_queue_status(self):
+        """Get the status of items in the queue"""
+        return self.queue
+
+    async def get_queue(self, model_name: str) -> list:
+        """
+        Return the queue for a specific model group
+        """
+        if self.cache is not None:
+            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
+            response = await self.cache.async_get_cache(key=_cache_key)
+            if response is None or not isinstance(response, list):
+                return []
+            elif isinstance(response, list):
+                return response
+        return self.queue
+
+    async def save_queue(self, queue: list, model_name: str) -> None:
+        """
+        Save the updated queue for the model group
+        """
+        if self.cache is not None:
+            _cache_key = "{}:{}".format(SchedulerCacheKeys.queue.value, model_name)
+            await self.cache.async_set_cache(key=_cache_key, value=queue)
+        return None