diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py new file mode 100644 index 00000000..3cfdf82c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py @@ -0,0 +1,59 @@ +""" +Custom Logger that handles batching logic + +Use this if you want your logs to be stored in memory and flushed periodically. +""" + +import asyncio +import time +from typing import List, Optional + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_logger import CustomLogger + + +class CustomBatchLogger(CustomLogger): + + def __init__( + self, + flush_lock: Optional[asyncio.Lock] = None, + batch_size: Optional[int] = None, + flush_interval: Optional[int] = None, + **kwargs, + ) -> None: + """ + Args: + flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching + """ + self.log_queue: List = [] + self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS + self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE + self.last_flush_time = time.time() + self.flush_lock = flush_lock + + super().__init__(**kwargs) + + async def periodic_flush(self): + while True: + await asyncio.sleep(self.flush_interval) + verbose_logger.debug( + f"CustomLogger periodic flush after {self.flush_interval} seconds" + ) + await self.flush_queue() + + async def flush_queue(self): + if self.flush_lock is None: + return + + async with self.flush_lock: + if self.log_queue: + verbose_logger.debug( + "CustomLogger: Flushing batch of %s events", len(self.log_queue) + ) + await self.async_send_batch() + self.log_queue.clear() + self.last_flush_time = time.time() + + async def async_send_batch(self, *args, **kwargs): + pass |