about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md13
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py82
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py1822
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py92
4 files changed, 2009 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md
new file mode 100644
index 00000000..f28f7150
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md
@@ -0,0 +1,13 @@
+# Slack Alerting on LiteLLM Gateway 
+
+This folder contains the Slack Alerting integration for LiteLLM Gateway. 
+
+## Folder Structure 
+
+- `slack_alerting.py`: This is the main file that handles sending different types of alerts
+- `batching_handler.py`: Handles Batching + sending Httpx Post requests to slack. Slack alerts are sent every 10s or when events are greater than X events. Done to ensure litellm has good performance under high traffic
+- `types.py`: This file contains the AlertType enum which is used to define the different types of alerts that can be sent to Slack.
+- `utils.py`: This file contains common utils used specifically for slack alerting
+
+## Further Reading
+- [Doc setting up Alerting on LiteLLM Proxy (Gateway)](https://docs.litellm.ai/docs/proxy/alerting)
\ No newline at end of file
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py
new file mode 100644
index 00000000..e35cf61d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py
@@ -0,0 +1,82 @@
+"""
+Handles Batching + sending Httpx Post requests to slack 
+
+Slack alerts are sent every 10s or when events are greater than X events 
+
+see custom_batch_logger.py for more details / defaults 
+"""
+
+from typing import TYPE_CHECKING, Any
+
+from litellm._logging import verbose_proxy_logger
+
+if TYPE_CHECKING:
+    from .slack_alerting import SlackAlerting as _SlackAlerting
+
+    SlackAlertingType = _SlackAlerting
+else:
+    SlackAlertingType = Any
+
+
+def squash_payloads(queue):
+
+    squashed = {}
+    if len(queue) == 0:
+        return squashed
+    if len(queue) == 1:
+        return {"key": {"item": queue[0], "count": 1}}
+
+    for item in queue:
+        url = item["url"]
+        alert_type = item["alert_type"]
+        _key = (url, alert_type)
+
+        if _key in squashed:
+            squashed[_key]["count"] += 1
+            # Merge the payloads
+
+        else:
+            squashed[_key] = {"item": item, "count": 1}
+
+    return squashed
+
+
+def _print_alerting_payload_warning(
+    payload: dict, slackAlertingInstance: SlackAlertingType
+):
+    """
+    Print the payload to the console when
+    slackAlertingInstance.alerting_args.log_to_console is True
+
+    Relevant issue: https://github.com/BerriAI/litellm/issues/7372
+    """
+    if slackAlertingInstance.alerting_args.log_to_console is True:
+        verbose_proxy_logger.warning(payload)
+
+
+async def send_to_webhook(slackAlertingInstance: SlackAlertingType, item, count):
+    """
+    Send a single slack alert to the webhook
+    """
+    import json
+
+    payload = item.get("payload", {})
+    try:
+        if count > 1:
+            payload["text"] = f"[Num Alerts: {count}]\n\n{payload['text']}"
+
+        response = await slackAlertingInstance.async_http_handler.post(
+            url=item["url"],
+            headers=item["headers"],
+            data=json.dumps(payload),
+        )
+        if response.status_code != 200:
+            verbose_proxy_logger.debug(
+                f"Error sending slack alert to url={item['url']}. Error={response.text}"
+            )
+    except Exception as e:
+        verbose_proxy_logger.debug(f"Error sending slack alert: {str(e)}")
+    finally:
+        _print_alerting_payload_warning(
+            payload, slackAlertingInstance=slackAlertingInstance
+        )
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py
new file mode 100644
index 00000000..a2e62647
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py
@@ -0,0 +1,1822 @@
+#### What this does ####
+#    Class for sending Slack Alerts #
+import asyncio
+import datetime
+import os
+import random
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
+
+from openai import APIError
+
+import litellm
+import litellm.litellm_core_utils
+import litellm.litellm_core_utils.litellm_logging
+import litellm.types
+from litellm._logging import verbose_logger, verbose_proxy_logger
+from litellm.caching.caching import DualCache
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.litellm_core_utils.duration_parser import duration_in_seconds
+from litellm.litellm_core_utils.exception_mapping_utils import (
+    _add_key_name_and_team_to_alert,
+)
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
+from litellm.types.integrations.slack_alerting import *
+
+from ..email_templates.templates import *
+from .batching_handler import send_to_webhook, squash_payloads
+from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables
+
+if TYPE_CHECKING:
+    from litellm.router import Router as _Router
+
+    Router = _Router
+else:
+    Router = Any
+
+
+class SlackAlerting(CustomBatchLogger):
+    """
+    Class for sending Slack Alerts
+    """
+
+    # Class variables or attributes
+    def __init__(
+        self,
+        internal_usage_cache: Optional[DualCache] = None,
+        alerting_threshold: Optional[
+            float
+        ] = None,  # threshold for slow / hanging llm responses (in seconds)
+        alerting: Optional[List] = [],
+        alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
+        alert_to_webhook_url: Optional[
+            Dict[AlertType, Union[List[str], str]]
+        ] = None,  # if user wants to separate alerts to diff channels
+        alerting_args={},
+        default_webhook_url: Optional[str] = None,
+        **kwargs,
+    ):
+        if alerting_threshold is None:
+            alerting_threshold = 300
+        self.alerting_threshold = alerting_threshold
+        self.alerting = alerting
+        self.alert_types = alert_types
+        self.internal_usage_cache = internal_usage_cache or DualCache()
+        self.async_http_handler = get_async_httpx_client(
+            llm_provider=httpxSpecialProvider.LoggingCallback
+        )
+        self.alert_to_webhook_url = process_slack_alerting_variables(
+            alert_to_webhook_url=alert_to_webhook_url
+        )
+        self.is_running = False
+        self.alerting_args = SlackAlertingArgs(**alerting_args)
+        self.default_webhook_url = default_webhook_url
+        self.flush_lock = asyncio.Lock()
+        super().__init__(**kwargs, flush_lock=self.flush_lock)
+
+    def update_values(
+        self,
+        alerting: Optional[List] = None,
+        alerting_threshold: Optional[float] = None,
+        alert_types: Optional[List[AlertType]] = None,
+        alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
+        alerting_args: Optional[Dict] = None,
+        llm_router: Optional[Router] = None,
+    ):
+        if alerting is not None:
+            self.alerting = alerting
+            asyncio.create_task(self.periodic_flush())
+        if alerting_threshold is not None:
+            self.alerting_threshold = alerting_threshold
+        if alert_types is not None:
+            self.alert_types = alert_types
+        if alerting_args is not None:
+            self.alerting_args = SlackAlertingArgs(**alerting_args)
+        if alert_to_webhook_url is not None:
+            # update the dict
+            if self.alert_to_webhook_url is None:
+                self.alert_to_webhook_url = process_slack_alerting_variables(
+                    alert_to_webhook_url=alert_to_webhook_url
+                )
+            else:
+                _new_values = (
+                    process_slack_alerting_variables(
+                        alert_to_webhook_url=alert_to_webhook_url
+                    )
+                    or {}
+                )
+                self.alert_to_webhook_url.update(_new_values)
+        if llm_router is not None:
+            self.llm_router = llm_router
+
+    async def deployment_in_cooldown(self):
+        pass
+
+    async def deployment_removed_from_cooldown(self):
+        pass
+
+    def _all_possible_alert_types(self):
+        # used by the UI to show all supported alert types
+        # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select
+        # return list of all values AlertType enum
+        return list(AlertType)
+
+    def _response_taking_too_long_callback_helper(
+        self,
+        kwargs,  # kwargs to completion
+        start_time,
+        end_time,  # start/end time
+    ):
+        try:
+            time_difference = end_time - start_time
+            # Convert the timedelta to float (in seconds)
+            time_difference_float = time_difference.total_seconds()
+            litellm_params = kwargs.get("litellm_params", {})
+            model = kwargs.get("model", "")
+            api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
+            messages = kwargs.get("messages", None)
+            # if messages does not exist fallback to "input"
+            if messages is None:
+                messages = kwargs.get("input", None)
+
+            # only use first 100 chars for alerting
+            _messages = str(messages)[:100]
+
+            return time_difference_float, model, api_base, _messages
+        except Exception as e:
+            raise e
+
+    def _get_deployment_latencies_to_alert(self, metadata=None):
+        if metadata is None:
+            return None
+
+        if "_latency_per_deployment" in metadata:
+            # Translate model_id to -> api_base
+            # _latency_per_deployment is a dictionary that looks like this:
+            """
+            _latency_per_deployment: {
+                api_base: 0.01336697916666667
+            }
+            """
+            _message_to_send = ""
+            _deployment_latencies = metadata["_latency_per_deployment"]
+            if len(_deployment_latencies) == 0:
+                return None
+            _deployment_latency_map: Optional[dict] = None
+            try:
+                # try sorting deployments by latency
+                _deployment_latencies = sorted(
+                    _deployment_latencies.items(), key=lambda x: x[1]
+                )
+                _deployment_latency_map = dict(_deployment_latencies)
+            except Exception:
+                pass
+
+            if _deployment_latency_map is None:
+                return
+
+            for api_base, latency in _deployment_latency_map.items():
+                _message_to_send += f"\n{api_base}: {round(latency,2)}s"
+            _message_to_send = "```" + _message_to_send + "```"
+            return _message_to_send
+
+    async def response_taking_too_long_callback(
+        self,
+        kwargs,  # kwargs to completion
+        completion_response,  # response from completion
+        start_time,
+        end_time,  # start/end time
+    ):
+        if self.alerting is None or self.alert_types is None:
+            return
+
+        time_difference_float, model, api_base, messages = (
+            self._response_taking_too_long_callback_helper(
+                kwargs=kwargs,
+                start_time=start_time,
+                end_time=end_time,
+            )
+        )
+        if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
+            messages = "Message not logged. litellm.redact_messages_in_exceptions=True"
+        request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
+        slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
+        alerting_metadata: dict = {}
+        if time_difference_float > self.alerting_threshold:
+            # add deployment latencies to alert
+            if (
+                kwargs is not None
+                and "litellm_params" in kwargs
+                and "metadata" in kwargs["litellm_params"]
+            ):
+                _metadata: dict = kwargs["litellm_params"]["metadata"]
+                request_info = _add_key_name_and_team_to_alert(
+                    request_info=request_info, metadata=_metadata
+                )
+
+                _deployment_latency_map = self._get_deployment_latencies_to_alert(
+                    metadata=_metadata
+                )
+                if _deployment_latency_map is not None:
+                    request_info += (
+                        f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
+                    )
+
+                if "alerting_metadata" in _metadata:
+                    alerting_metadata = _metadata["alerting_metadata"]
+            await self.send_alert(
+                message=slow_message + request_info,
+                level="Low",
+                alert_type=AlertType.llm_too_slow,
+                alerting_metadata=alerting_metadata,
+            )
+
+    async def async_update_daily_reports(
+        self, deployment_metrics: DeploymentMetrics
+    ) -> int:
+        """
+        Store the perf by deployment in cache
+        - Number of failed requests per deployment
+        - Latency / output tokens per deployment
+
+        'deployment_id:daily_metrics:failed_requests'
+        'deployment_id:daily_metrics:latency_per_output_token'
+
+        Returns
+            int - count of metrics set (1 - if just latency, 2 - if failed + latency)
+        """
+
+        return_val = 0
+        try:
+            ## FAILED REQUESTS ##
+            if deployment_metrics.failed_request:
+                await self.internal_usage_cache.async_increment_cache(
+                    key="{}:{}".format(
+                        deployment_metrics.id,
+                        SlackAlertingCacheKeys.failed_requests_key.value,
+                    ),
+                    value=1,
+                    parent_otel_span=None,  # no attached request, this is a background operation
+                )
+
+                return_val += 1
+
+            ## LATENCY ##
+            if deployment_metrics.latency_per_output_token is not None:
+                await self.internal_usage_cache.async_increment_cache(
+                    key="{}:{}".format(
+                        deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
+                    ),
+                    value=deployment_metrics.latency_per_output_token,
+                    parent_otel_span=None,  # no attached request, this is a background operation
+                )
+
+                return_val += 1
+
+            return return_val
+        except Exception:
+            return 0
+
+    async def send_daily_reports(self, router) -> bool:  # noqa: PLR0915
+        """
+        Send a daily report on:
+        - Top 5 deployments with most failed requests
+        - Top 5 slowest deployments (normalized by latency/output tokens)
+
+        Get the value from redis cache (if available) or in-memory and send it
+
+        Cleanup:
+        - reset values in cache -> prevent memory leak
+
+        Returns:
+            True -> if successfuly sent
+            False -> if not sent
+        """
+
+        ids = router.get_model_ids()
+
+        # get keys
+        failed_request_keys = [
+            "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
+            for id in ids
+        ]
+        latency_keys = [
+            "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
+        ]
+
+        combined_metrics_keys = failed_request_keys + latency_keys  # reduce cache calls
+
+        combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
+            keys=combined_metrics_keys
+        )  # [1, 2, None, ..]
+
+        if combined_metrics_values is None:
+            return False
+
+        all_none = True
+        for val in combined_metrics_values:
+            if val is not None and val > 0:
+                all_none = False
+                break
+
+        if all_none:
+            return False
+
+        failed_request_values = combined_metrics_values[
+            : len(failed_request_keys)
+        ]  # # [1, 2, None, ..]
+        latency_values = combined_metrics_values[len(failed_request_keys) :]
+
+        # find top 5 failed
+        ## Replace None values with a placeholder value (-1 in this case)
+        placeholder_value = 0
+        replaced_failed_values = [
+            value if value is not None else placeholder_value
+            for value in failed_request_values
+        ]
+
+        ## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values)
+        top_5_failed = sorted(
+            range(len(replaced_failed_values)),
+            key=lambda i: replaced_failed_values[i],
+            reverse=True,
+        )[:5]
+        top_5_failed = [
+            index for index in top_5_failed if replaced_failed_values[index] > 0
+        ]
+
+        # find top 5 slowest
+        # Replace None values with a placeholder value (-1 in this case)
+        placeholder_value = 0
+        replaced_slowest_values = [
+            value if value is not None else placeholder_value
+            for value in latency_values
+        ]
+
+        # Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values)
+        top_5_slowest = sorted(
+            range(len(replaced_slowest_values)),
+            key=lambda i: replaced_slowest_values[i],
+            reverse=True,
+        )[:5]
+        top_5_slowest = [
+            index for index in top_5_slowest if replaced_slowest_values[index] > 0
+        ]
+
+        # format alert -> return the litellm model name + api base
+        message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics šŸ“ˆ: \n\n"
+
+        message += "\n\n*ā—ļø Top Deployments with Most Failed Requests:*\n\n"
+        if not top_5_failed:
+            message += "\tNone\n"
+        for i in range(len(top_5_failed)):
+            key = failed_request_keys[top_5_failed[i]].split(":")[0]
+            _deployment = router.get_model_info(key)
+            if isinstance(_deployment, dict):
+                deployment_name = _deployment["litellm_params"].get("model", "")
+            else:
+                return False
+
+            api_base = litellm.get_api_base(
+                model=deployment_name,
+                optional_params=(
+                    _deployment["litellm_params"] if _deployment is not None else {}
+                ),
+            )
+            if api_base is None:
+                api_base = ""
+            value = replaced_failed_values[top_5_failed[i]]
+            message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`,  API Base: `{api_base}`\n"
+
+        message += "\n\n*šŸ˜… Top Slowest Deployments:*\n\n"
+        if not top_5_slowest:
+            message += "\tNone\n"
+        for i in range(len(top_5_slowest)):
+            key = latency_keys[top_5_slowest[i]].split(":")[0]
+            _deployment = router.get_model_info(key)
+            if _deployment is not None:
+                deployment_name = _deployment["litellm_params"].get("model", "")
+            else:
+                deployment_name = ""
+            api_base = litellm.get_api_base(
+                model=deployment_name,
+                optional_params=(
+                    _deployment["litellm_params"] if _deployment is not None else {}
+                ),
+            )
+            value = round(replaced_slowest_values[top_5_slowest[i]], 3)
+            message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`,  API Base: `{api_base}`\n\n"
+
+        # cache cleanup -> reset values to 0
+        latency_cache_keys = [(key, 0) for key in latency_keys]
+        failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
+        combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
+        await self.internal_usage_cache.async_set_cache_pipeline(
+            cache_list=combined_metrics_cache_keys
+        )
+
+        message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s"
+
+        # send alert
+        await self.send_alert(
+            message=message,
+            level="Low",
+            alert_type=AlertType.daily_reports,
+            alerting_metadata={},
+        )
+
+        return True
+
+    async def response_taking_too_long(
+        self,
+        start_time: Optional[datetime.datetime] = None,
+        end_time: Optional[datetime.datetime] = None,
+        type: Literal["hanging_request", "slow_response"] = "hanging_request",
+        request_data: Optional[dict] = None,
+    ):
+        if self.alerting is None or self.alert_types is None:
+            return
+        model: str = ""
+        if request_data is not None:
+            model = request_data.get("model", "")
+            messages = request_data.get("messages", None)
+            if messages is None:
+                # if messages does not exist fallback to "input"
+                messages = request_data.get("input", None)
+
+            # try casting messages to str and get the first 100 characters, else mark as None
+            try:
+                messages = str(messages)
+                messages = messages[:100]
+            except Exception:
+                messages = ""
+
+            if (
+                litellm.turn_off_message_logging
+                or litellm.redact_messages_in_exceptions
+            ):
+                messages = (
+                    "Message not logged. litellm.redact_messages_in_exceptions=True"
+                )
+            request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
+        else:
+            request_info = ""
+
+        if type == "hanging_request":
+            await asyncio.sleep(
+                self.alerting_threshold
+            )  # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests
+            alerting_metadata: dict = {}
+            if await self._request_is_completed(request_data=request_data) is True:
+                return
+
+            if request_data is not None:
+                if request_data.get("deployment", None) is not None and isinstance(
+                    request_data["deployment"], dict
+                ):
+                    _api_base = litellm.get_api_base(
+                        model=model,
+                        optional_params=request_data["deployment"].get(
+                            "litellm_params", {}
+                        ),
+                    )
+
+                    if _api_base is None:
+                        _api_base = ""
+
+                    request_info += f"\nAPI Base: {_api_base}"
+                elif request_data.get("metadata", None) is not None and isinstance(
+                    request_data["metadata"], dict
+                ):
+                    # In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data``
+                    # in that case we fallback to the api base set in the request metadata
+                    _metadata: dict = request_data["metadata"]
+                    _api_base = _metadata.get("api_base", "")
+
+                    request_info = _add_key_name_and_team_to_alert(
+                        request_info=request_info, metadata=_metadata
+                    )
+
+                    if _api_base is None:
+                        _api_base = ""
+
+                    if "alerting_metadata" in _metadata:
+                        alerting_metadata = _metadata["alerting_metadata"]
+                    request_info += f"\nAPI Base: `{_api_base}`"
+                # only alert hanging responses if they have not been marked as success
+                alerting_message = (
+                    f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
+                )
+
+                if "langfuse" in litellm.success_callback:
+                    langfuse_url = await _add_langfuse_trace_id_to_alert(
+                        request_data=request_data,
+                    )
+
+                    if langfuse_url is not None:
+                        request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)
+
+                # add deployment latencies to alert
+                _deployment_latency_map = self._get_deployment_latencies_to_alert(
+                    metadata=request_data.get("metadata", {})
+                )
+                if _deployment_latency_map is not None:
+                    request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"
+
+                await self.send_alert(
+                    message=alerting_message + request_info,
+                    level="Medium",
+                    alert_type=AlertType.llm_requests_hanging,
+                    alerting_metadata=alerting_metadata,
+                )
+
+    async def failed_tracking_alert(self, error_message: str, failing_model: str):
+        """
+        Raise alert when tracking failed for specific model
+
+        Args:
+            error_message (str): Error message
+            failing_model (str): Model that failed tracking
+        """
+        if self.alerting is None or self.alert_types is None:
+            # do nothing if alerting is not switched on
+            return
+        if "failed_tracking_spend" not in self.alert_types:
+            return
+
+        _cache: DualCache = self.internal_usage_cache
+        message = "Failed Tracking Cost for " + error_message
+        _cache_key = "budget_alerts:failed_tracking:{}".format(failing_model)
+        result = await _cache.async_get_cache(key=_cache_key)
+        if result is None:
+            await self.send_alert(
+                message=message,
+                level="High",
+                alert_type=AlertType.failed_tracking_spend,
+                alerting_metadata={},
+            )
+            await _cache.async_set_cache(
+                key=_cache_key,
+                value="SENT",
+                ttl=self.alerting_args.budget_alert_ttl,
+            )
+
+    async def budget_alerts(  # noqa: PLR0915
+        self,
+        type: Literal[
+            "token_budget",
+            "soft_budget",
+            "user_budget",
+            "team_budget",
+            "proxy_budget",
+            "projected_limit_exceeded",
+        ],
+        user_info: CallInfo,
+    ):
+        ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
+        # - Alert once within 24hr period
+        # - Cache this information
+        # - Don't re-alert, if alert already sent
+        _cache: DualCache = self.internal_usage_cache
+
+        if self.alerting is None or self.alert_types is None:
+            # do nothing if alerting is not switched on
+            return
+        if "budget_alerts" not in self.alert_types:
+            return
+        _id: Optional[str] = "default_id"  # used for caching
+        user_info_json = user_info.model_dump(exclude_none=True)
+        user_info_str = self._get_user_info_str(user_info)
+        event: Optional[
+            Literal[
+                "budget_crossed",
+                "threshold_crossed",
+                "projected_limit_exceeded",
+                "soft_budget_crossed",
+            ]
+        ] = None
+        event_group: Optional[
+            Literal["internal_user", "team", "key", "proxy", "customer"]
+        ] = None
+        event_message: str = ""
+        webhook_event: Optional[WebhookEvent] = None
+        if type == "proxy_budget":
+            event_group = "proxy"
+            event_message += "Proxy Budget: "
+        elif type == "soft_budget":
+            event_group = "proxy"
+            event_message += "Soft Budget Crossed: "
+        elif type == "user_budget":
+            event_group = "internal_user"
+            event_message += "User Budget: "
+            _id = user_info.user_id or _id
+        elif type == "team_budget":
+            event_group = "team"
+            event_message += "Team Budget: "
+            _id = user_info.team_id or _id
+        elif type == "token_budget":
+            event_group = "key"
+            event_message += "Key Budget: "
+            _id = user_info.token
+        elif type == "projected_limit_exceeded":
+            event_group = "key"
+            event_message += "Key Budget: Projected Limit Exceeded"
+            event = "projected_limit_exceeded"
+            _id = user_info.token
+
+        # percent of max_budget left to spend
+        if user_info.max_budget is None and user_info.soft_budget is None:
+            return
+        percent_left: float = 0
+        if user_info.max_budget is not None:
+            if user_info.max_budget > 0:
+                percent_left = (
+                    user_info.max_budget - user_info.spend
+                ) / user_info.max_budget
+
+        # check if crossed budget
+        if user_info.max_budget is not None:
+            if user_info.spend >= user_info.max_budget:
+                event = "budget_crossed"
+                event_message += (
+                    f"Budget Crossed\n Total Budget:`{user_info.max_budget}`"
+                )
+            elif percent_left <= 0.05:
+                event = "threshold_crossed"
+                event_message += "5% Threshold Crossed "
+            elif percent_left <= 0.15:
+                event = "threshold_crossed"
+                event_message += "15% Threshold Crossed"
+        elif user_info.soft_budget is not None:
+            if user_info.spend >= user_info.soft_budget:
+                event = "soft_budget_crossed"
+        if event is not None and event_group is not None:
+            _cache_key = "budget_alerts:{}:{}".format(event, _id)
+            result = await _cache.async_get_cache(key=_cache_key)
+            if result is None:
+                webhook_event = WebhookEvent(
+                    event=event,
+                    event_group=event_group,
+                    event_message=event_message,
+                    **user_info_json,
+                )
+                await self.send_alert(
+                    message=event_message + "\n\n" + user_info_str,
+                    level="High",
+                    alert_type=AlertType.budget_alerts,
+                    user_info=webhook_event,
+                    alerting_metadata={},
+                )
+                await _cache.async_set_cache(
+                    key=_cache_key,
+                    value="SENT",
+                    ttl=self.alerting_args.budget_alert_ttl,
+                )
+
+            return
+        return
+
+    def _get_user_info_str(self, user_info: CallInfo) -> str:
+        """
+        Create a standard message for a budget alert
+        """
+        _all_fields_as_dict = user_info.model_dump(exclude_none=True)
+        _all_fields_as_dict.pop("token")
+        msg = ""
+        for k, v in _all_fields_as_dict.items():
+            msg += f"*{k}:* `{v}`\n"
+
+        return msg
+
+    async def customer_spend_alert(
+        self,
+        token: Optional[str],
+        key_alias: Optional[str],
+        end_user_id: Optional[str],
+        response_cost: Optional[float],
+        max_budget: Optional[float],
+    ):
+        if (
+            self.alerting is not None
+            and "webhook" in self.alerting
+            and end_user_id is not None
+            and token is not None
+            and response_cost is not None
+        ):
+            # log customer spend
+            event = WebhookEvent(
+                spend=response_cost,
+                max_budget=max_budget,
+                token=token,
+                customer_id=end_user_id,
+                user_id=None,
+                team_id=None,
+                user_email=None,
+                key_alias=key_alias,
+                projected_exceeded_date=None,
+                projected_spend=None,
+                event="spend_tracked",
+                event_group="customer",
+                event_message="Customer spend tracked. Customer={}, spend={}".format(
+                    end_user_id, response_cost
+                ),
+            )
+
+            await self.send_webhook_alert(webhook_event=event)
+
+    def _count_outage_alerts(self, alerts: List[int]) -> str:
+        """
+        Parameters:
+        - alerts: List[int] -> list of error codes (either 408 or 500+)
+
+        Returns:
+        - str -> formatted string. This is an alert message, giving a human-friendly description of the errors.
+        """
+        error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0}
+        for alert in alerts:
+            if alert == 408:
+                error_breakdown["Timeout Errors"] += 1
+            elif alert >= 500:
+                error_breakdown["API Errors"] += 1
+            else:
+                error_breakdown["Unknown Errors"] += 1
+
+        error_msg = ""
+        for key, value in error_breakdown.items():
+            if value > 0:
+                error_msg += "\n{}: {}\n".format(key, value)
+
+        return error_msg
+
+    def _outage_alert_msg_factory(
+        self,
+        alert_type: Literal["Major", "Minor"],
+        key: Literal["Model", "Region"],
+        key_val: str,
+        provider: str,
+        api_base: Optional[str],
+        outage_value: BaseOutageModel,
+    ) -> str:
+        """Format an alert message for slack"""
+        headers = {f"{key} Name": key_val, "Provider": provider}
+        if api_base is not None:
+            headers["API Base"] = api_base  # type: ignore
+
+        headers_str = "\n"
+        for k, v in headers.items():
+            headers_str += f"*{k}:* `{v}`\n"
+        return f"""\n\n
+*āš ļø {alert_type} Service Outage*
+
+{headers_str}
+
+*Errors:*
+{self._count_outage_alerts(alerts=outage_value["alerts"])}
+
+*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
+"""
+
+    async def region_outage_alerts(
+        self,
+        exception: APIError,
+        deployment_id: str,
+    ) -> None:
+        """
+        Send slack alert if specific provider region is having an outage.
+
+        Track for 408 (Timeout) and >=500 Error codes
+        """
+        ## CREATE (PROVIDER+REGION) ID ##
+        if self.llm_router is None:
+            return
+
+        deployment = self.llm_router.get_deployment(model_id=deployment_id)
+
+        if deployment is None:
+            return
+
+        model = deployment.litellm_params.model
+        ### GET PROVIDER ###
+        provider = deployment.litellm_params.custom_llm_provider
+        if provider is None:
+            model, provider, _, _ = litellm.get_llm_provider(model=model)
+
+        ### GET REGION ###
+        region_name = deployment.litellm_params.region_name
+        if region_name is None:
+            region_name = litellm.utils._get_model_region(
+                custom_llm_provider=provider, litellm_params=deployment.litellm_params
+            )
+
+        if region_name is None:
+            return
+
+        ### UNIQUE CACHE KEY ###
+        cache_key = provider + region_name
+
+        outage_value: Optional[ProviderRegionOutageModel] = (
+            await self.internal_usage_cache.async_get_cache(key=cache_key)
+        )
+
+        if (
+            getattr(exception, "status_code", None) is None
+            or (
+                exception.status_code != 408  # type: ignore
+                and exception.status_code < 500  # type: ignore
+            )
+            or self.llm_router is None
+        ):
+            return
+
+        if outage_value is None:
+            _deployment_set = set()
+            _deployment_set.add(deployment_id)
+            outage_value = ProviderRegionOutageModel(
+                provider_region_id=cache_key,
+                alerts=[exception.status_code],  # type: ignore
+                minor_alert_sent=False,
+                major_alert_sent=False,
+                last_updated_at=time.time(),
+                deployment_ids=_deployment_set,
+            )
+
+            ## add to cache ##
+            await self.internal_usage_cache.async_set_cache(
+                key=cache_key,
+                value=outage_value,
+                ttl=self.alerting_args.region_outage_alert_ttl,
+            )
+            return
+
+        if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
+            outage_value["alerts"].append(exception.status_code)  # type: ignore
+        else:  # prevent memory leaks
+            pass
+        _deployment_set = outage_value["deployment_ids"]
+        _deployment_set.add(deployment_id)
+        outage_value["deployment_ids"] = _deployment_set
+        outage_value["last_updated_at"] = time.time()
+
+        ## MINOR OUTAGE ALERT SENT ##
+        if (
+            outage_value["minor_alert_sent"] is False
+            and len(outage_value["alerts"])
+            >= self.alerting_args.minor_outage_alert_threshold
+            and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
+        ):
+            msg = self._outage_alert_msg_factory(
+                alert_type="Minor",
+                key="Region",
+                key_val=region_name,
+                api_base=None,
+                outage_value=outage_value,
+                provider=provider,
+            )
+            # send minor alert
+            await self.send_alert(
+                message=msg,
+                level="Medium",
+                alert_type=AlertType.outage_alerts,
+                alerting_metadata={},
+            )
+            # set to true
+            outage_value["minor_alert_sent"] = True
+
+        ## MAJOR OUTAGE ALERT SENT ##
+        elif (
+            outage_value["major_alert_sent"] is False
+            and len(outage_value["alerts"])
+            >= self.alerting_args.major_outage_alert_threshold
+            and len(_deployment_set) > 1  # make sure it's not just 1 bad deployment
+        ):
+            msg = self._outage_alert_msg_factory(
+                alert_type="Major",
+                key="Region",
+                key_val=region_name,
+                api_base=None,
+                outage_value=outage_value,
+                provider=provider,
+            )
+
+            # send minor alert
+            await self.send_alert(
+                message=msg,
+                level="High",
+                alert_type=AlertType.outage_alerts,
+                alerting_metadata={},
+            )
+            # set to true
+            outage_value["major_alert_sent"] = True
+
+        ## update cache ##
+        await self.internal_usage_cache.async_set_cache(
+            key=cache_key, value=outage_value
+        )
+
+    async def outage_alerts(
+        self,
+        exception: APIError,
+        deployment_id: str,
+    ) -> None:
+        """
+        Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500).
+
+        key = model_id
+
+        value = {
+        - model_id
+        - threshold
+        - alerts []
+        }
+
+        ttl = 1hr
+        max_alerts_size = 10
+        """
+        try:
+            outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id)  # type: ignore
+            if (
+                getattr(exception, "status_code", None) is None
+                or (
+                    exception.status_code != 408  # type: ignore
+                    and exception.status_code < 500  # type: ignore
+                )
+                or self.llm_router is None
+            ):
+                return
+
+            ### EXTRACT MODEL DETAILS ###
+            deployment = self.llm_router.get_deployment(model_id=deployment_id)
+            if deployment is None:
+                return
+
+            model = deployment.litellm_params.model
+            provider = deployment.litellm_params.custom_llm_provider
+            if provider is None:
+                try:
+                    model, provider, _, _ = litellm.get_llm_provider(model=model)
+                except Exception:
+                    provider = ""
+            api_base = litellm.get_api_base(
+                model=model, optional_params=deployment.litellm_params
+            )
+
+            if outage_value is None:
+                outage_value = OutageModel(
+                    model_id=deployment_id,
+                    alerts=[exception.status_code],  # type: ignore
+                    minor_alert_sent=False,
+                    major_alert_sent=False,
+                    last_updated_at=time.time(),
+                )
+
+                ## add to cache ##
+                await self.internal_usage_cache.async_set_cache(
+                    key=deployment_id,
+                    value=outage_value,
+                    ttl=self.alerting_args.outage_alert_ttl,
+                )
+                return
+
+            if (
+                len(outage_value["alerts"])
+                < self.alerting_args.max_outage_alert_list_size
+            ):
+                outage_value["alerts"].append(exception.status_code)  # type: ignore
+            else:  # prevent memory leaks
+                pass
+
+            outage_value["last_updated_at"] = time.time()
+
+            ## MINOR OUTAGE ALERT SENT ##
+            if (
+                outage_value["minor_alert_sent"] is False
+                and len(outage_value["alerts"])
+                >= self.alerting_args.minor_outage_alert_threshold
+            ):
+                msg = self._outage_alert_msg_factory(
+                    alert_type="Minor",
+                    key="Model",
+                    key_val=model,
+                    api_base=api_base,
+                    outage_value=outage_value,
+                    provider=provider,
+                )
+                # send minor alert
+                await self.send_alert(
+                    message=msg,
+                    level="Medium",
+                    alert_type=AlertType.outage_alerts,
+                    alerting_metadata={},
+                )
+                # set to true
+                outage_value["minor_alert_sent"] = True
+            elif (
+                outage_value["major_alert_sent"] is False
+                and len(outage_value["alerts"])
+                >= self.alerting_args.major_outage_alert_threshold
+            ):
+                msg = self._outage_alert_msg_factory(
+                    alert_type="Major",
+                    key="Model",
+                    key_val=model,
+                    api_base=api_base,
+                    outage_value=outage_value,
+                    provider=provider,
+                )
+                # send minor alert
+                await self.send_alert(
+                    message=msg,
+                    level="High",
+                    alert_type=AlertType.outage_alerts,
+                    alerting_metadata={},
+                )
+                # set to true
+                outage_value["major_alert_sent"] = True
+
+            ## update cache ##
+            await self.internal_usage_cache.async_set_cache(
+                key=deployment_id, value=outage_value
+            )
+        except Exception:
+            pass
+
+    async def model_added_alert(
+        self, model_name: str, litellm_model_name: str, passed_model_info: Any
+    ):
+        base_model_from_user = getattr(passed_model_info, "base_model", None)
+        model_info = {}
+        base_model = ""
+        if base_model_from_user is not None:
+            model_info = litellm.model_cost.get(base_model_from_user, {})
+            base_model = f"Base Model: `{base_model_from_user}`\n"
+        else:
+            model_info = litellm.model_cost.get(litellm_model_name, {})
+        model_info_str = ""
+        for k, v in model_info.items():
+            if k == "input_cost_per_token" or k == "output_cost_per_token":
+                # when converting to string it should not be 1.63e-06
+                v = "{:.8f}".format(v)
+
+            model_info_str += f"{k}: {v}\n"
+
+        message = f"""
+*šŸš… New Model Added*
+Model Name: `{model_name}`
+{base_model}
+
+Usage OpenAI Python SDK:
+```
+import openai
+client = openai.OpenAI(
+    api_key="your_api_key",
+    base_url={os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}
+)
+
+response = client.chat.completions.create(
+    model="{model_name}", # model to send to the proxy
+    messages = [
+        {{
+            "role": "user",
+            "content": "this is a test request, write a short poem"
+        }}
+    ]
+)
+```
+
+Model Info: 
+```
+{model_info_str}
+```
+"""
+
+        alert_val = self.send_alert(
+            message=message,
+            level="Low",
+            alert_type=AlertType.new_model_added,
+            alerting_metadata={},
+        )
+
+        if alert_val is not None and asyncio.iscoroutine(alert_val):
+            await alert_val
+
+    async def model_removed_alert(self, model_name: str):
+        pass
+
+    async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
+        """
+        Sends structured alert to webhook, if set.
+
+        Currently only implemented for budget alerts
+
+        Returns -> True if sent, False if not.
+
+        Raises Exception
+            - if WEBHOOK_URL is not set
+        """
+
+        webhook_url = os.getenv("WEBHOOK_URL", None)
+        if webhook_url is None:
+            raise Exception("Missing webhook_url from environment")
+
+        payload = webhook_event.model_dump_json()
+        headers = {"Content-type": "application/json"}
+
+        response = await self.async_http_handler.post(
+            url=webhook_url,
+            headers=headers,
+            data=payload,
+        )
+        if response.status_code == 200:
+            return True
+        else:
+            print("Error sending webhook alert. Error=", response.text)  # noqa
+
+        return False
+
+    async def _check_if_using_premium_email_feature(
+        self,
+        premium_user: bool,
+        email_logo_url: Optional[str] = None,
+        email_support_contact: Optional[str] = None,
+    ):
+        from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
+
+        if premium_user is not True:
+            if email_logo_url is not None or email_support_contact is not None:
+                raise ValueError(
+                    f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}"
+                )
+        return
+
+    async def send_key_created_or_user_invited_email(
+        self, webhook_event: WebhookEvent
+    ) -> bool:
+        try:
+            from litellm.proxy.utils import send_email
+
+            if self.alerting is None or "email" not in self.alerting:
+                # do nothing if user does not want email alerts
+                verbose_proxy_logger.error(
+                    "Error sending email alert - 'email' not in self.alerting %s",
+                    self.alerting,
+                )
+                return False
+            from litellm.proxy.proxy_server import premium_user, prisma_client
+
+            email_logo_url = os.getenv(
+                "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
+            )
+            email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
+            await self._check_if_using_premium_email_feature(
+                premium_user, email_logo_url, email_support_contact
+            )
+            if email_logo_url is None:
+                email_logo_url = LITELLM_LOGO_URL
+            if email_support_contact is None:
+                email_support_contact = LITELLM_SUPPORT_CONTACT
+
+            event_name = webhook_event.event_message
+            recipient_email = webhook_event.user_email
+            recipient_user_id = webhook_event.user_id
+            if (
+                recipient_email is None
+                and recipient_user_id is not None
+                and prisma_client is not None
+            ):
+                user_row = await prisma_client.db.litellm_usertable.find_unique(
+                    where={"user_id": recipient_user_id}
+                )
+
+                if user_row is not None:
+                    recipient_email = user_row.user_email
+
+            key_token = webhook_event.token
+            key_budget = webhook_event.max_budget
+            base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")
+
+            email_html_content = "Alert from LiteLLM Server"
+            if recipient_email is None:
+                verbose_proxy_logger.error(
+                    "Trying to send email alert to no recipient",
+                    extra=webhook_event.dict(),
+                )
+
+            if webhook_event.event == "key_created":
+                email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format(
+                    email_logo_url=email_logo_url,
+                    recipient_email=recipient_email,
+                    key_budget=key_budget,
+                    key_token=key_token,
+                    base_url=base_url,
+                    email_support_contact=email_support_contact,
+                )
+            elif webhook_event.event == "internal_user_created":
+                # GET TEAM NAME
+                team_id = webhook_event.team_id
+                team_name = "Default Team"
+                if team_id is not None and prisma_client is not None:
+                    team_row = await prisma_client.db.litellm_teamtable.find_unique(
+                        where={"team_id": team_id}
+                    )
+                    if team_row is not None:
+                        team_name = team_row.team_alias or "-"
+                email_html_content = USER_INVITED_EMAIL_TEMPLATE.format(
+                    email_logo_url=email_logo_url,
+                    recipient_email=recipient_email,
+                    team_name=team_name,
+                    base_url=base_url,
+                    email_support_contact=email_support_contact,
+                )
+            else:
+                verbose_proxy_logger.error(
+                    "Trying to send email alert on unknown webhook event",
+                    extra=webhook_event.model_dump(),
+                )
+
+            webhook_event.model_dump_json()
+            email_event = {
+                "to": recipient_email,
+                "subject": f"LiteLLM: {event_name}",
+                "html": email_html_content,
+            }
+
+            await send_email(
+                receiver_email=email_event["to"],
+                subject=email_event["subject"],
+                html=email_event["html"],
+            )
+
+            return True
+
+        except Exception as e:
+            verbose_proxy_logger.error("Error sending email alert %s", str(e))
+            return False
+
+    async def send_email_alert_using_smtp(
+        self, webhook_event: WebhookEvent, alert_type: str
+    ) -> bool:
+        """
+        Sends structured Email alert to an SMTP server
+
+        Currently only implemented for budget alerts
+
+        Returns -> True if sent, False if not.
+        """
+        from litellm.proxy.proxy_server import premium_user
+        from litellm.proxy.utils import send_email
+
+        email_logo_url = os.getenv(
+            "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
+        )
+        email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
+        await self._check_if_using_premium_email_feature(
+            premium_user, email_logo_url, email_support_contact
+        )
+
+        if email_logo_url is None:
+            email_logo_url = LITELLM_LOGO_URL
+        if email_support_contact is None:
+            email_support_contact = LITELLM_SUPPORT_CONTACT
+
+        event_name = webhook_event.event_message
+        recipient_email = webhook_event.user_email
+        user_name = webhook_event.user_id
+        max_budget = webhook_event.max_budget
+        email_html_content = "Alert from LiteLLM Server"
+        if recipient_email is None:
+            verbose_proxy_logger.error(
+                "Trying to send email alert to no recipient", extra=webhook_event.dict()
+            )
+
+        if webhook_event.event == "budget_crossed":
+            email_html_content = f"""
+            <img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" />
+
+            <p> Hi {user_name}, <br/>
+
+            Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />
+
+            API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />
+
+            If you have any questions, please send an email to {email_support_contact} <br /> <br />
+
+            Best, <br />
+            The LiteLLM team <br />
+            """
+
+        webhook_event.model_dump_json()
+        email_event = {
+            "to": recipient_email,
+            "subject": f"LiteLLM: {event_name}",
+            "html": email_html_content,
+        }
+
+        await send_email(
+            receiver_email=email_event["to"],
+            subject=email_event["subject"],
+            html=email_event["html"],
+        )
+        if webhook_event.event_group == "team":
+            from litellm.integrations.email_alerting import send_team_budget_alert
+
+            await send_team_budget_alert(webhook_event=webhook_event)
+
+        return False
+
+    async def send_alert(
+        self,
+        message: str,
+        level: Literal["Low", "Medium", "High"],
+        alert_type: AlertType,
+        alerting_metadata: dict,
+        user_info: Optional[WebhookEvent] = None,
+        **kwargs,
+    ):
+        """
+        Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
+
+        - Responses taking too long
+        - Requests are hanging
+        - Calls are failing
+        - DB Read/Writes are failing
+        - Proxy Close to max budget
+        - Key Close to max budget
+
+        Parameters:
+            level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'.
+            message: str - what is the alert about
+        """
+        if self.alerting is None:
+            return
+
+        if (
+            "webhook" in self.alerting
+            and alert_type == "budget_alerts"
+            and user_info is not None
+        ):
+            await self.send_webhook_alert(webhook_event=user_info)
+
+        if (
+            "email" in self.alerting
+            and alert_type == "budget_alerts"
+            and user_info is not None
+        ):
+            # only send budget alerts over Email
+            await self.send_email_alert_using_smtp(
+                webhook_event=user_info, alert_type=alert_type
+            )
+
+        if "slack" not in self.alerting:
+            return
+        if alert_type not in self.alert_types:
+            return
+
+        from datetime import datetime
+
+        # Get the current timestamp
+        current_time = datetime.now().strftime("%H:%M:%S")
+        _proxy_base_url = os.getenv("PROXY_BASE_URL", None)
+        if alert_type == "daily_reports" or alert_type == "new_model_added":
+            formatted_message = message
+        else:
+            formatted_message = (
+                f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
+            )
+
+        if kwargs:
+            for key, value in kwargs.items():
+                formatted_message += f"\n\n{key}: `{value}`\n\n"
+        if alerting_metadata:
+            for key, value in alerting_metadata.items():
+                formatted_message += f"\n\n*Alerting Metadata*: \n{key}: `{value}`\n\n"
+        if _proxy_base_url is not None:
+            formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"
+
+        # check if we find the slack webhook url in self.alert_to_webhook_url
+        if (
+            self.alert_to_webhook_url is not None
+            and alert_type in self.alert_to_webhook_url
+        ):
+            slack_webhook_url: Optional[Union[str, List[str]]] = (
+                self.alert_to_webhook_url[alert_type]
+            )
+        elif self.default_webhook_url is not None:
+            slack_webhook_url = self.default_webhook_url
+        else:
+            slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)
+
+        if slack_webhook_url is None:
+            raise ValueError("Missing SLACK_WEBHOOK_URL from environment")
+        payload = {"text": formatted_message}
+        headers = {"Content-type": "application/json"}
+
+        if isinstance(slack_webhook_url, list):
+            for url in slack_webhook_url:
+                self.log_queue.append(
+                    {
+                        "url": url,
+                        "headers": headers,
+                        "payload": payload,
+                        "alert_type": alert_type,
+                    }
+                )
+        else:
+            self.log_queue.append(
+                {
+                    "url": slack_webhook_url,
+                    "headers": headers,
+                    "payload": payload,
+                    "alert_type": alert_type,
+                }
+            )
+
+        if len(self.log_queue) >= self.batch_size:
+            await self.flush_queue()
+
+    async def async_send_batch(self):
+        if not self.log_queue:
+            return
+
+        squashed_queue = squash_payloads(self.log_queue)
+        tasks = [
+            send_to_webhook(
+                slackAlertingInstance=self, item=item["item"], count=item["count"]
+            )
+            for item in squashed_queue.values()
+        ]
+        await asyncio.gather(*tasks)
+        self.log_queue.clear()
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        """Log deployment latency"""
+        try:
+            if "daily_reports" in self.alert_types:
+                litellm_params = kwargs.get("litellm_params", {}) or {}
+                model_info = litellm_params.get("model_info", {}) or {}
+                model_id = model_info.get("id", "") or ""
+                response_s: timedelta = end_time - start_time
+
+                final_value = response_s
+
+                if isinstance(response_obj, litellm.ModelResponse) and (
+                    hasattr(response_obj, "usage")
+                    and response_obj.usage is not None  # type: ignore
+                    and hasattr(response_obj.usage, "completion_tokens")  # type: ignore
+                ):
+                    completion_tokens = response_obj.usage.completion_tokens  # type: ignore
+                    if completion_tokens is not None and completion_tokens > 0:
+                        final_value = float(
+                            response_s.total_seconds() / completion_tokens
+                        )
+                if isinstance(final_value, timedelta):
+                    final_value = final_value.total_seconds()
+
+                await self.async_update_daily_reports(
+                    DeploymentMetrics(
+                        id=model_id,
+                        failed_request=False,
+                        latency_per_output_token=final_value,
+                        updated_at=litellm.utils.get_utc_datetime(),
+                    )
+                )
+        except Exception as e:
+            verbose_proxy_logger.error(
+                f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}"
+            )
+            pass
+
+    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        """Log failure + deployment latency"""
+        _litellm_params = kwargs.get("litellm_params", {})
+        _model_info = _litellm_params.get("model_info", {}) or {}
+        model_id = _model_info.get("id", "")
+        try:
+            if "daily_reports" in self.alert_types:
+                try:
+                    await self.async_update_daily_reports(
+                        DeploymentMetrics(
+                            id=model_id,
+                            failed_request=True,
+                            latency_per_output_token=None,
+                            updated_at=litellm.utils.get_utc_datetime(),
+                        )
+                    )
+                except Exception as e:
+                    verbose_logger.debug(f"Exception raises -{str(e)}")
+
+            if isinstance(kwargs.get("exception", ""), APIError):
+                if "outage_alerts" in self.alert_types:
+                    await self.outage_alerts(
+                        exception=kwargs["exception"],
+                        deployment_id=model_id,
+                    )
+
+                if "region_outage_alerts" in self.alert_types:
+                    await self.region_outage_alerts(
+                        exception=kwargs["exception"], deployment_id=model_id
+                    )
+        except Exception:
+            pass
+
+    async def _run_scheduler_helper(self, llm_router) -> bool:
+        """
+        Returns:
+        - True -> report sent
+        - False -> report not sent
+        """
+        report_sent_bool = False
+
+        report_sent = await self.internal_usage_cache.async_get_cache(
+            key=SlackAlertingCacheKeys.report_sent_key.value,
+            parent_otel_span=None,
+        )  # None | float
+
+        current_time = time.time()
+
+        if report_sent is None:
+            await self.internal_usage_cache.async_set_cache(
+                key=SlackAlertingCacheKeys.report_sent_key.value,
+                value=current_time,
+            )
+        elif isinstance(report_sent, float):
+            # Check if current time - interval >= time last sent
+            interval_seconds = self.alerting_args.daily_report_frequency
+
+            if current_time - report_sent >= interval_seconds:
+                # Sneak in the reporting logic here
+                await self.send_daily_reports(router=llm_router)
+                # Also, don't forget to update the report_sent time after sending the report!
+                await self.internal_usage_cache.async_set_cache(
+                    key=SlackAlertingCacheKeys.report_sent_key.value,
+                    value=current_time,
+                )
+                report_sent_bool = True
+
+        return report_sent_bool
+
+    async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
+        """
+        If 'daily_reports' enabled
+
+        Ping redis cache every 5 minutes to check if we should send the report
+
+        If yes -> call send_daily_report()
+        """
+        if llm_router is None or self.alert_types is None:
+            return
+
+        if "daily_reports" in self.alert_types:
+            while True:
+                await self._run_scheduler_helper(llm_router=llm_router)
+                interval = random.randint(
+                    self.alerting_args.report_check_interval - 3,
+                    self.alerting_args.report_check_interval + 3,
+                )  # shuffle to prevent collisions
+                await asyncio.sleep(interval)
+        return
+
+    async def send_weekly_spend_report(
+        self,
+        time_range: str = "7d",
+    ):
+        """
+        Send a spend report for a configurable time range.
+
+        Args:
+            time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d"
+        """
+        if self.alerting is None or "spend_reports" not in self.alert_types:
+            return
+
+        try:
+            from litellm.proxy.spend_tracking.spend_management_endpoints import (
+                _get_spend_report_for_time_range,
+            )
+
+            # Parse the time range
+            days = int(time_range[:-1])
+            if time_range[-1].lower() != "d":
+                raise ValueError("Time range must be specified in days, e.g., '7d'")
+
+            todays_date = datetime.datetime.now().date()
+            start_date = todays_date - datetime.timedelta(days=days)
+
+            _event_cache_key = f"weekly_spend_report_sent_{start_date.strftime('%Y-%m-%d')}_{todays_date.strftime('%Y-%m-%d')}"
+            if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
+                return
+
+            _resp = await _get_spend_report_for_time_range(
+                start_date=start_date.strftime("%Y-%m-%d"),
+                end_date=todays_date.strftime("%Y-%m-%d"),
+            )
+            if _resp is None or _resp == ([], []):
+                return
+
+            spend_per_team, spend_per_tag = _resp
+
+            _spend_message = f"*šŸ’ø Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n"
+
+            if spend_per_team is not None:
+                _spend_message += "\n*Team Spend Report:*\n"
+                for spend in spend_per_team:
+                    _team_spend = round(float(spend["total_spend"]), 4)
+                    _spend_message += (
+                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
+                    )
+
+            if spend_per_tag is not None:
+                _spend_message += "\n*Tag Spend Report:*\n"
+                for spend in spend_per_tag:
+                    _tag_spend = round(float(spend["total_spend"]), 4)
+                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
+
+            await self.send_alert(
+                message=_spend_message,
+                level="Low",
+                alert_type=AlertType.spend_reports,
+                alerting_metadata={},
+            )
+
+            await self.internal_usage_cache.async_set_cache(
+                key=_event_cache_key,
+                value="SENT",
+                ttl=duration_in_seconds(time_range),
+            )
+
+        except ValueError as ve:
+            verbose_proxy_logger.error(f"Invalid time range format: {ve}")
+        except Exception as e:
+            verbose_proxy_logger.error(f"Error sending spend report: {e}")
+
+    async def send_monthly_spend_report(self):
+        """ """
+        try:
+            from calendar import monthrange
+
+            from litellm.proxy.spend_tracking.spend_management_endpoints import (
+                _get_spend_report_for_time_range,
+            )
+
+            todays_date = datetime.datetime.now().date()
+            first_day_of_month = todays_date.replace(day=1)
+            _, last_day_of_month = monthrange(todays_date.year, todays_date.month)
+            last_day_of_month = first_day_of_month + datetime.timedelta(
+                days=last_day_of_month - 1
+            )
+
+            _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}"
+            if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
+                return
+
+            _resp = await _get_spend_report_for_time_range(
+                start_date=first_day_of_month.strftime("%Y-%m-%d"),
+                end_date=last_day_of_month.strftime("%Y-%m-%d"),
+            )
+
+            if _resp is None or _resp == ([], []):
+                return
+
+            monthly_spend_per_team, monthly_spend_per_tag = _resp
+
+            _spend_message = f"*šŸ’ø Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"
+
+            if monthly_spend_per_team is not None:
+                _spend_message += "\n*Team Spend Report:*\n"
+                for spend in monthly_spend_per_team:
+                    _team_spend = spend["total_spend"]
+                    _team_spend = float(_team_spend)
+                    # round to 4 decimal places
+                    _team_spend = round(_team_spend, 4)
+                    _spend_message += (
+                        f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
+                    )
+
+            if monthly_spend_per_tag is not None:
+                _spend_message += "\n*Tag Spend Report:*\n"
+                for spend in monthly_spend_per_tag:
+                    _tag_spend = spend["total_spend"]
+                    _tag_spend = float(_tag_spend)
+                    # round to 4 decimal places
+                    _tag_spend = round(_tag_spend, 4)
+                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
+
+            await self.send_alert(
+                message=_spend_message,
+                level="Low",
+                alert_type=AlertType.spend_reports,
+                alerting_metadata={},
+            )
+
+            await self.internal_usage_cache.async_set_cache(
+                key=_event_cache_key,
+                value="SENT",
+                ttl=(30 * 24 * 60 * 60),  # 1 month
+            )
+
+        except Exception as e:
+            verbose_proxy_logger.exception("Error sending weekly spend report %s", e)
+
+    async def send_fallback_stats_from_prometheus(self):
+        """
+        Helper to send fallback statistics from prometheus server -> to slack
+
+        This runs once per day and sends an overview of all the fallback statistics
+        """
+        try:
+            from litellm.integrations.prometheus_helpers.prometheus_api import (
+                get_fallback_metric_from_prometheus,
+            )
+
+            # call prometheuslogger.
+            falllback_success_info_prometheus = (
+                await get_fallback_metric_from_prometheus()
+            )
+
+            fallback_message = (
+                f"*Fallback Statistics:*\n{falllback_success_info_prometheus}"
+            )
+
+            await self.send_alert(
+                message=fallback_message,
+                level="Low",
+                alert_type=AlertType.fallback_reports,
+                alerting_metadata={},
+            )
+
+        except Exception as e:
+            verbose_proxy_logger.error("Error sending weekly spend report %s", e)
+
+        pass
+
+    async def send_virtual_key_event_slack(
+        self,
+        key_event: VirtualKeyEvent,
+        alert_type: AlertType,
+        event_name: str,
+    ):
+        """
+        Handles sending Virtual Key related alerts
+
+        Example:
+        - New Virtual Key Created
+        - Internal User Updated
+        - Team Created, Updated, Deleted
+        """
+        try:
+
+            message = f"`{event_name}`\n"
+
+            key_event_dict = key_event.model_dump()
+
+            # Add Created by information first
+            message += "*Action Done by:*\n"
+            for key, value in key_event_dict.items():
+                if "created_by" in key:
+                    message += f"{key}: `{value}`\n"
+
+            # Add args sent to function in the alert
+            message += "\n*Arguments passed:*\n"
+            request_kwargs = key_event.request_kwargs
+            for key, value in request_kwargs.items():
+                if key == "user_api_key_dict":
+                    continue
+                message += f"{key}: `{value}`\n"
+
+            await self.send_alert(
+                message=message,
+                level="High",
+                alert_type=alert_type,
+                alerting_metadata={},
+            )
+
+        except Exception as e:
+            verbose_proxy_logger.error(
+                "Error sending send_virtual_key_event_slack %s", e
+            )
+
+        return
+
+    async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
+        """
+        Returns True if the request is completed - either as a success or failure
+        """
+        if request_data is None:
+            return False
+
+        if (
+            request_data.get("litellm_status", "") != "success"
+            and request_data.get("litellm_status", "") != "fail"
+        ):
+            ## CHECK IF CACHE IS UPDATED
+            litellm_call_id = request_data.get("litellm_call_id", "")
+            status: Optional[str] = await self.internal_usage_cache.async_get_cache(
+                key="request_status:{}".format(litellm_call_id), local_only=True
+            )
+            if status is not None and (status == "success" or status == "fail"):
+                return True
+        return False
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py
new file mode 100644
index 00000000..0dc8bae5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py
@@ -0,0 +1,92 @@
+"""
+Utils used for slack alerting
+"""
+
+import asyncio
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from litellm.proxy._types import AlertType
+from litellm.secret_managers.main import get_secret
+
+if TYPE_CHECKING:
+    from litellm.litellm_core_utils.litellm_logging import Logging as _Logging
+
+    Logging = _Logging
+else:
+    Logging = Any
+
+
+def process_slack_alerting_variables(
+    alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]
+) -> Optional[Dict[AlertType, Union[List[str], str]]]:
+    """
+    process alert_to_webhook_url
+    - check if any urls are set as os.environ/SLACK_WEBHOOK_URL_1 read env var and set the correct value
+    """
+    if alert_to_webhook_url is None:
+        return None
+
+    for alert_type, webhook_urls in alert_to_webhook_url.items():
+        if isinstance(webhook_urls, list):
+            _webhook_values: List[str] = []
+            for webhook_url in webhook_urls:
+                if "os.environ/" in webhook_url:
+                    _env_value = get_secret(secret_name=webhook_url)
+                    if not isinstance(_env_value, str):
+                        raise ValueError(
+                            f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}"
+                        )
+                    _webhook_values.append(_env_value)
+                else:
+                    _webhook_values.append(webhook_url)
+
+            alert_to_webhook_url[alert_type] = _webhook_values
+        else:
+            _webhook_value_str: str = webhook_urls
+            if "os.environ/" in webhook_urls:
+                _env_value = get_secret(secret_name=webhook_urls)
+                if not isinstance(_env_value, str):
+                    raise ValueError(
+                        f"Invalid webhook url value for: {webhook_urls}. Got type={type(_env_value)}"
+                    )
+                _webhook_value_str = _env_value
+            else:
+                _webhook_value_str = webhook_urls
+
+            alert_to_webhook_url[alert_type] = _webhook_value_str
+
+    return alert_to_webhook_url
+
+
+async def _add_langfuse_trace_id_to_alert(
+    request_data: Optional[dict] = None,
+) -> Optional[str]:
+    """
+    Returns langfuse trace url
+
+    - check:
+    -> existing_trace_id
+    -> trace_id
+    -> litellm_call_id
+    """
+    # do nothing for now
+    if (
+        request_data is not None
+        and request_data.get("litellm_logging_obj", None) is not None
+    ):
+        trace_id: Optional[str] = None
+        litellm_logging_obj: Logging = request_data["litellm_logging_obj"]
+
+        for _ in range(3):
+            trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse")
+            if trace_id is not None:
+                break
+            await asyncio.sleep(3)  # wait 3s before retrying for trace id
+
+        _langfuse_object = litellm_logging_obj._get_callback_object(
+            service_name="langfuse"
+        )
+        if _langfuse_object is not None:
+            base_url = _langfuse_object.Langfuse.base_url
+            return f"{base_url}/trace/{trace_id}"
+    return None