aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py
new file mode 100644
index 00000000..3cfdf82c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_batch_logger.py
@@ -0,0 +1,59 @@
+"""
+Custom Logger that handles batching logic
+
+Use this if you want your logs to be stored in memory and flushed periodically.
+"""
+
+import asyncio
+import time
+from typing import List, Optional
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_logger import CustomLogger
+
+
+class CustomBatchLogger(CustomLogger):
+
+ def __init__(
+ self,
+ flush_lock: Optional[asyncio.Lock] = None,
+ batch_size: Optional[int] = None,
+ flush_interval: Optional[int] = None,
+ **kwargs,
+ ) -> None:
+ """
+ Args:
+ flush_lock (Optional[asyncio.Lock], optional): Lock to use when flushing the queue. Defaults to None. Only used for custom loggers that do batching
+ """
+ self.log_queue: List = []
+ self.flush_interval = flush_interval or litellm.DEFAULT_FLUSH_INTERVAL_SECONDS
+ self.batch_size: int = batch_size or litellm.DEFAULT_BATCH_SIZE
+ self.last_flush_time = time.time()
+ self.flush_lock = flush_lock
+
+ super().__init__(**kwargs)
+
+ async def periodic_flush(self):
+ while True:
+ await asyncio.sleep(self.flush_interval)
+ verbose_logger.debug(
+ f"CustomLogger periodic flush after {self.flush_interval} seconds"
+ )
+ await self.flush_queue()
+
+ async def flush_queue(self):
+ if self.flush_lock is None:
+ return
+
+ async with self.flush_lock:
+ if self.log_queue:
+ verbose_logger.debug(
+ "CustomLogger: Flushing batch of %s events", len(self.log_queue)
+ )
+ await self.async_send_batch()
+ self.log_queue.clear()
+ self.last_flush_time = time.time()
+
+ async def async_send_batch(self, *args, **kwargs):
+ pass