Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md              13
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py    82
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py    1822
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py              92
4 files changed, 2009 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md
new file mode 100644
index 00000000..f28f7150
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/Readme.md
@@ -0,0 +1,13 @@
+# Slack Alerting on LiteLLM Gateway
+
+This folder contains the Slack Alerting integration for LiteLLM Gateway.
+
+## Folder Structure
+
+- `slack_alerting.py`: The main file that handles sending the different types of alerts
+- `batching_handler.py`: Handles batching and sending HTTPX POST requests to Slack. Slack alerts are sent every 10s or once more than a set number of events are queued, to keep LiteLLM performant under high traffic
+- `types.py`: Contains the AlertType enum, which defines the different types of alerts that can be sent to Slack
+- `utils.py`: Contains common utils used specifically for Slack alerting
+
+## Further Reading
+- [Doc setting up Alerting on LiteLLM Proxy (Gateway)](https://docs.litellm.ai/docs/proxy/alerting) \ No newline at end of file
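Below, a minimal sketch of exercising the `SlackAlerting` logger added in this change directly, outside the proxy. It assumes a real `SLACK_WEBHOOK_URL` is exported (the value shown is a placeholder); the class, the `send_alert()` signature and the `AlertType` values are taken from `slack_alerting.py` further down, while `flush_queue()` comes from the `CustomBatchLogger` base class:

```python
import asyncio
import os

from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting
from litellm.proxy._types import AlertType

# placeholder - point this at a real Slack incoming-webhook URL
os.environ.setdefault("SLACK_WEBHOOK_URL", "https://hooks.slack.com/services/T000/B000/XXXX")

slack_alerting = SlackAlerting(
    alerting=["slack"],
    alerting_threshold=300,  # seconds before a request counts as slow / hanging
    alert_types=[AlertType.llm_too_slow, AlertType.budget_alerts],
)


async def main() -> None:
    # send_alert() only queues the message; flush_queue() posts anything still pending
    await slack_alerting.send_alert(
        message="test alert from the proxy",
        level="Low",
        alert_type=AlertType.llm_too_slow,
        alerting_metadata={},
    )
    await slack_alerting.flush_queue()


asyncio.run(main())
```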
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py
new file mode 100644
index 00000000..e35cf61d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/batching_handler.py
@@ -0,0 +1,82 @@
+"""
+Handles Batching + sending Httpx Post requests to slack
+
+Slack alerts are sent every 10s or when events are greater than X events
+
+see custom_batch_logger.py for more details / defaults
+"""
+
+from typing import TYPE_CHECKING, Any
+
+from litellm._logging import verbose_proxy_logger
+
+if TYPE_CHECKING:
+ from .slack_alerting import SlackAlerting as _SlackAlerting
+
+ SlackAlertingType = _SlackAlerting
+else:
+ SlackAlertingType = Any
+
+
+def squash_payloads(queue):
+    """
+    Group queued alerts by (url, alert_type) so duplicate alerts are sent once, with a count.
+    """
+    squashed = {}
+    if len(queue) == 0:
+        return squashed
+    if len(queue) == 1:
+        return {"key": {"item": queue[0], "count": 1}}
+
+    for item in queue:
+        url = item["url"]
+        alert_type = item["alert_type"]
+        _key = (url, alert_type)
+
+        if _key in squashed:
+            # duplicate (url, alert_type) - keep the first payload, just bump the count
+            squashed[_key]["count"] += 1
+        else:
+            squashed[_key] = {"item": item, "count": 1}
+
+    return squashed
+
+
+def _print_alerting_payload_warning(
+ payload: dict, slackAlertingInstance: SlackAlertingType
+):
+ """
+ Print the payload to the console when
+ slackAlertingInstance.alerting_args.log_to_console is True
+
+ Relevant issue: https://github.com/BerriAI/litellm/issues/7372
+ """
+ if slackAlertingInstance.alerting_args.log_to_console is True:
+ verbose_proxy_logger.warning(payload)
+
+
+async def send_to_webhook(slackAlertingInstance: SlackAlertingType, item, count):
+ """
+ Send a single slack alert to the webhook
+ """
+ import json
+
+ payload = item.get("payload", {})
+ try:
+ if count > 1:
+ payload["text"] = f"[Num Alerts: {count}]\n\n{payload['text']}"
+
+ response = await slackAlertingInstance.async_http_handler.post(
+ url=item["url"],
+ headers=item["headers"],
+ data=json.dumps(payload),
+ )
+ if response.status_code != 200:
+ verbose_proxy_logger.debug(
+ f"Error sending slack alert to url={item['url']}. Error={response.text}"
+ )
+ except Exception as e:
+ verbose_proxy_logger.debug(f"Error sending slack alert: {str(e)}")
+ finally:
+ _print_alerting_payload_warning(
+ payload, slackAlertingInstance=slackAlertingInstance
+ )
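A small sketch of the batching behaviour above: each queue entry mirrors what `SlackAlerting.send_alert()` appends (the webhook URL below is a placeholder), and entries sharing the same `(url, alert_type)` pair are collapsed into one payload with a count:

```python
from litellm.integrations.SlackAlerting.batching_handler import squash_payloads

entry = {
    "url": "https://hooks.slack.com/services/T000/B000/XXXX",  # placeholder webhook
    "headers": {"Content-type": "application/json"},
    "payload": {"text": "Requests are hanging"},
    "alert_type": "llm_requests_hanging",
}

# two identical alerts queued within the same flush window
squashed = squash_payloads([entry, dict(entry)])

for (url, alert_type), grouped in squashed.items():
    # send_to_webhook() prefixes the text with "[Num Alerts: <count>]" when count > 1
    print(url, alert_type, grouped["count"])  # -> ... llm_requests_hanging 2
```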
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py
new file mode 100644
index 00000000..a2e62647
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/slack_alerting.py
@@ -0,0 +1,1822 @@
+#### What this does ####
+# Class for sending Slack Alerts #
+import asyncio
+import datetime
+import os
+import random
+import time
+from datetime import timedelta
+from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union
+
+from openai import APIError
+
+import litellm
+import litellm.litellm_core_utils
+import litellm.litellm_core_utils.litellm_logging
+import litellm.types
+from litellm._logging import verbose_logger, verbose_proxy_logger
+from litellm.caching.caching import DualCache
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.litellm_core_utils.duration_parser import duration_in_seconds
+from litellm.litellm_core_utils.exception_mapping_utils import (
+ _add_key_name_and_team_to_alert,
+)
+from litellm.llms.custom_httpx.http_handler import (
+ get_async_httpx_client,
+ httpxSpecialProvider,
+)
+from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
+from litellm.types.integrations.slack_alerting import *
+
+from ..email_templates.templates import *
+from .batching_handler import send_to_webhook, squash_payloads
+from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables
+
+if TYPE_CHECKING:
+ from litellm.router import Router as _Router
+
+ Router = _Router
+else:
+ Router = Any
+
+
+class SlackAlerting(CustomBatchLogger):
+ """
+ Class for sending Slack Alerts
+ """
+
+ # Class variables or attributes
+ def __init__(
+ self,
+ internal_usage_cache: Optional[DualCache] = None,
+ alerting_threshold: Optional[
+ float
+ ] = None, # threshold for slow / hanging llm responses (in seconds)
+ alerting: Optional[List] = [],
+ alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
+ alert_to_webhook_url: Optional[
+ Dict[AlertType, Union[List[str], str]]
+ ] = None, # if user wants to separate alerts to diff channels
+ alerting_args={},
+ default_webhook_url: Optional[str] = None,
+ **kwargs,
+ ):
+ if alerting_threshold is None:
+ alerting_threshold = 300
+ self.alerting_threshold = alerting_threshold
+ self.alerting = alerting
+ self.alert_types = alert_types
+ self.internal_usage_cache = internal_usage_cache or DualCache()
+ self.async_http_handler = get_async_httpx_client(
+ llm_provider=httpxSpecialProvider.LoggingCallback
+ )
+ self.alert_to_webhook_url = process_slack_alerting_variables(
+ alert_to_webhook_url=alert_to_webhook_url
+ )
+ self.is_running = False
+ self.alerting_args = SlackAlertingArgs(**alerting_args)
+ self.default_webhook_url = default_webhook_url
+ self.flush_lock = asyncio.Lock()
+ super().__init__(**kwargs, flush_lock=self.flush_lock)
+
+ def update_values(
+ self,
+ alerting: Optional[List] = None,
+ alerting_threshold: Optional[float] = None,
+ alert_types: Optional[List[AlertType]] = None,
+ alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
+ alerting_args: Optional[Dict] = None,
+ llm_router: Optional[Router] = None,
+ ):
+ if alerting is not None:
+ self.alerting = alerting
+ asyncio.create_task(self.periodic_flush())
+ if alerting_threshold is not None:
+ self.alerting_threshold = alerting_threshold
+ if alert_types is not None:
+ self.alert_types = alert_types
+ if alerting_args is not None:
+ self.alerting_args = SlackAlertingArgs(**alerting_args)
+ if alert_to_webhook_url is not None:
+ # update the dict
+ if self.alert_to_webhook_url is None:
+ self.alert_to_webhook_url = process_slack_alerting_variables(
+ alert_to_webhook_url=alert_to_webhook_url
+ )
+ else:
+ _new_values = (
+ process_slack_alerting_variables(
+ alert_to_webhook_url=alert_to_webhook_url
+ )
+ or {}
+ )
+ self.alert_to_webhook_url.update(_new_values)
+ if llm_router is not None:
+ self.llm_router = llm_router
+
+ async def deployment_in_cooldown(self):
+ pass
+
+ async def deployment_removed_from_cooldown(self):
+ pass
+
+ def _all_possible_alert_types(self):
+ # used by the UI to show all supported alert types
+ # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select
+ # return list of all values AlertType enum
+ return list(AlertType)
+
+ def _response_taking_too_long_callback_helper(
+ self,
+ kwargs, # kwargs to completion
+ start_time,
+ end_time, # start/end time
+ ):
+ try:
+ time_difference = end_time - start_time
+ # Convert the timedelta to float (in seconds)
+ time_difference_float = time_difference.total_seconds()
+ litellm_params = kwargs.get("litellm_params", {})
+ model = kwargs.get("model", "")
+ api_base = litellm.get_api_base(model=model, optional_params=litellm_params)
+ messages = kwargs.get("messages", None)
+ # if messages does not exist fallback to "input"
+ if messages is None:
+ messages = kwargs.get("input", None)
+
+ # only use first 100 chars for alerting
+ _messages = str(messages)[:100]
+
+ return time_difference_float, model, api_base, _messages
+ except Exception as e:
+ raise e
+
+ def _get_deployment_latencies_to_alert(self, metadata=None):
+ if metadata is None:
+ return None
+
+ if "_latency_per_deployment" in metadata:
+ # Translate model_id to -> api_base
+ # _latency_per_deployment is a dictionary that looks like this:
+ """
+ _latency_per_deployment: {
+ api_base: 0.01336697916666667
+ }
+ """
+ _message_to_send = ""
+ _deployment_latencies = metadata["_latency_per_deployment"]
+ if len(_deployment_latencies) == 0:
+ return None
+ _deployment_latency_map: Optional[dict] = None
+ try:
+ # try sorting deployments by latency
+ _deployment_latencies = sorted(
+ _deployment_latencies.items(), key=lambda x: x[1]
+ )
+ _deployment_latency_map = dict(_deployment_latencies)
+ except Exception:
+ pass
+
+ if _deployment_latency_map is None:
+ return
+
+ for api_base, latency in _deployment_latency_map.items():
+ _message_to_send += f"\n{api_base}: {round(latency,2)}s"
+ _message_to_send = "```" + _message_to_send + "```"
+ return _message_to_send
+
+ async def response_taking_too_long_callback(
+ self,
+ kwargs, # kwargs to completion
+ completion_response, # response from completion
+ start_time,
+ end_time, # start/end time
+ ):
+ if self.alerting is None or self.alert_types is None:
+ return
+
+ time_difference_float, model, api_base, messages = (
+ self._response_taking_too_long_callback_helper(
+ kwargs=kwargs,
+ start_time=start_time,
+ end_time=end_time,
+ )
+ )
+ if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions:
+ messages = "Message not logged. litellm.redact_messages_in_exceptions=True"
+ request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`"
+ slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`"
+ alerting_metadata: dict = {}
+ if time_difference_float > self.alerting_threshold:
+ # add deployment latencies to alert
+ if (
+ kwargs is not None
+ and "litellm_params" in kwargs
+ and "metadata" in kwargs["litellm_params"]
+ ):
+ _metadata: dict = kwargs["litellm_params"]["metadata"]
+ request_info = _add_key_name_and_team_to_alert(
+ request_info=request_info, metadata=_metadata
+ )
+
+ _deployment_latency_map = self._get_deployment_latencies_to_alert(
+ metadata=_metadata
+ )
+ if _deployment_latency_map is not None:
+ request_info += (
+ f"\nAvailable Deployment Latencies\n{_deployment_latency_map}"
+ )
+
+ if "alerting_metadata" in _metadata:
+ alerting_metadata = _metadata["alerting_metadata"]
+ await self.send_alert(
+ message=slow_message + request_info,
+ level="Low",
+ alert_type=AlertType.llm_too_slow,
+ alerting_metadata=alerting_metadata,
+ )
+
+ async def async_update_daily_reports(
+ self, deployment_metrics: DeploymentMetrics
+ ) -> int:
+ """
+ Store the perf by deployment in cache
+ - Number of failed requests per deployment
+ - Latency / output tokens per deployment
+
+ 'deployment_id:daily_metrics:failed_requests'
+ 'deployment_id:daily_metrics:latency_per_output_token'
+
+ Returns
+ int - count of metrics set (1 - if just latency, 2 - if failed + latency)
+ """
+
+ return_val = 0
+ try:
+ ## FAILED REQUESTS ##
+ if deployment_metrics.failed_request:
+ await self.internal_usage_cache.async_increment_cache(
+ key="{}:{}".format(
+ deployment_metrics.id,
+ SlackAlertingCacheKeys.failed_requests_key.value,
+ ),
+ value=1,
+ parent_otel_span=None, # no attached request, this is a background operation
+ )
+
+ return_val += 1
+
+ ## LATENCY ##
+ if deployment_metrics.latency_per_output_token is not None:
+ await self.internal_usage_cache.async_increment_cache(
+ key="{}:{}".format(
+ deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value
+ ),
+ value=deployment_metrics.latency_per_output_token,
+ parent_otel_span=None, # no attached request, this is a background operation
+ )
+
+ return_val += 1
+
+ return return_val
+ except Exception:
+ return 0
+
+ async def send_daily_reports(self, router) -> bool: # noqa: PLR0915
+ """
+ Send a daily report on:
+ - Top 5 deployments with most failed requests
+ - Top 5 slowest deployments (normalized by latency/output tokens)
+
+ Get the value from redis cache (if available) or in-memory and send it
+
+ Cleanup:
+ - reset values in cache -> prevent memory leak
+
+ Returns:
+            True -> if successfully sent
+ False -> if not sent
+ """
+
+ ids = router.get_model_ids()
+
+ # get keys
+ failed_request_keys = [
+ "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value)
+ for id in ids
+ ]
+ latency_keys = [
+ "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids
+ ]
+
+ combined_metrics_keys = failed_request_keys + latency_keys # reduce cache calls
+
+ combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache(
+ keys=combined_metrics_keys
+ ) # [1, 2, None, ..]
+
+ if combined_metrics_values is None:
+ return False
+
+ all_none = True
+ for val in combined_metrics_values:
+ if val is not None and val > 0:
+ all_none = False
+ break
+
+ if all_none:
+ return False
+
+ failed_request_values = combined_metrics_values[
+ : len(failed_request_keys)
+        ]  # [1, 2, None, ..]
+ latency_values = combined_metrics_values[len(failed_request_keys) :]
+
+ # find top 5 failed
+        ## Replace None values with a placeholder value (0 in this case)
+ placeholder_value = 0
+ replaced_failed_values = [
+ value if value is not None else placeholder_value
+ for value in failed_request_values
+ ]
+
+ ## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values)
+ top_5_failed = sorted(
+ range(len(replaced_failed_values)),
+ key=lambda i: replaced_failed_values[i],
+ reverse=True,
+ )[:5]
+ top_5_failed = [
+ index for index in top_5_failed if replaced_failed_values[index] > 0
+ ]
+
+ # find top 5 slowest
+        # Replace None values with a placeholder value (0 in this case)
+ placeholder_value = 0
+ replaced_slowest_values = [
+ value if value is not None else placeholder_value
+ for value in latency_values
+ ]
+
+ # Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values)
+ top_5_slowest = sorted(
+ range(len(replaced_slowest_values)),
+ key=lambda i: replaced_slowest_values[i],
+ reverse=True,
+ )[:5]
+ top_5_slowest = [
+ index for index in top_5_slowest if replaced_slowest_values[index] > 0
+ ]
+
+ # format alert -> return the litellm model name + api base
+ message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics šŸ“ˆ: \n\n"
+
+ message += "\n\n*ā—ļø Top Deployments with Most Failed Requests:*\n\n"
+ if not top_5_failed:
+ message += "\tNone\n"
+ for i in range(len(top_5_failed)):
+ key = failed_request_keys[top_5_failed[i]].split(":")[0]
+ _deployment = router.get_model_info(key)
+ if isinstance(_deployment, dict):
+ deployment_name = _deployment["litellm_params"].get("model", "")
+ else:
+ return False
+
+ api_base = litellm.get_api_base(
+ model=deployment_name,
+ optional_params=(
+ _deployment["litellm_params"] if _deployment is not None else {}
+ ),
+ )
+ if api_base is None:
+ api_base = ""
+ value = replaced_failed_values[top_5_failed[i]]
+ message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n"
+
+ message += "\n\n*šŸ˜… Top Slowest Deployments:*\n\n"
+ if not top_5_slowest:
+ message += "\tNone\n"
+ for i in range(len(top_5_slowest)):
+ key = latency_keys[top_5_slowest[i]].split(":")[0]
+ _deployment = router.get_model_info(key)
+ if _deployment is not None:
+ deployment_name = _deployment["litellm_params"].get("model", "")
+ else:
+ deployment_name = ""
+ api_base = litellm.get_api_base(
+ model=deployment_name,
+ optional_params=(
+ _deployment["litellm_params"] if _deployment is not None else {}
+ ),
+ )
+ value = round(replaced_slowest_values[top_5_slowest[i]], 3)
+ message += f"\t{i+1}. Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n"
+
+ # cache cleanup -> reset values to 0
+ latency_cache_keys = [(key, 0) for key in latency_keys]
+ failed_request_cache_keys = [(key, 0) for key in failed_request_keys]
+ combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys
+ await self.internal_usage_cache.async_set_cache_pipeline(
+ cache_list=combined_metrics_cache_keys
+ )
+
+ message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s"
+
+ # send alert
+ await self.send_alert(
+ message=message,
+ level="Low",
+ alert_type=AlertType.daily_reports,
+ alerting_metadata={},
+ )
+
+ return True
+
+ async def response_taking_too_long(
+ self,
+ start_time: Optional[datetime.datetime] = None,
+ end_time: Optional[datetime.datetime] = None,
+ type: Literal["hanging_request", "slow_response"] = "hanging_request",
+ request_data: Optional[dict] = None,
+ ):
+ if self.alerting is None or self.alert_types is None:
+ return
+ model: str = ""
+ if request_data is not None:
+ model = request_data.get("model", "")
+ messages = request_data.get("messages", None)
+ if messages is None:
+ # if messages does not exist fallback to "input"
+ messages = request_data.get("input", None)
+
+ # try casting messages to str and get the first 100 characters, else mark as None
+ try:
+ messages = str(messages)
+ messages = messages[:100]
+ except Exception:
+ messages = ""
+
+ if (
+ litellm.turn_off_message_logging
+ or litellm.redact_messages_in_exceptions
+ ):
+ messages = (
+ "Message not logged. litellm.redact_messages_in_exceptions=True"
+ )
+ request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`"
+ else:
+ request_info = ""
+
+ if type == "hanging_request":
+ await asyncio.sleep(
+ self.alerting_threshold
+            )  # wait for the alerting threshold (default 5 minutes) - this might differ for streaming, non-streaming, and non-completion (embedding + image) requests
+ alerting_metadata: dict = {}
+ if await self._request_is_completed(request_data=request_data) is True:
+ return
+
+ if request_data is not None:
+ if request_data.get("deployment", None) is not None and isinstance(
+ request_data["deployment"], dict
+ ):
+ _api_base = litellm.get_api_base(
+ model=model,
+ optional_params=request_data["deployment"].get(
+ "litellm_params", {}
+ ),
+ )
+
+ if _api_base is None:
+ _api_base = ""
+
+ request_info += f"\nAPI Base: {_api_base}"
+ elif request_data.get("metadata", None) is not None and isinstance(
+ request_data["metadata"], dict
+ ):
+                # For hanging requests, the call sometimes has not reached the point where the deployment is added to `request_data`;
+                # in that case we fall back to the api base set in the request metadata
+ _metadata: dict = request_data["metadata"]
+ _api_base = _metadata.get("api_base", "")
+
+ request_info = _add_key_name_and_team_to_alert(
+ request_info=request_info, metadata=_metadata
+ )
+
+ if _api_base is None:
+ _api_base = ""
+
+ if "alerting_metadata" in _metadata:
+ alerting_metadata = _metadata["alerting_metadata"]
+ request_info += f"\nAPI Base: `{_api_base}`"
+ # only alert hanging responses if they have not been marked as success
+ alerting_message = (
+ f"`Requests are hanging - {self.alerting_threshold}s+ request time`"
+ )
+
+ if "langfuse" in litellm.success_callback:
+ langfuse_url = await _add_langfuse_trace_id_to_alert(
+ request_data=request_data,
+ )
+
+ if langfuse_url is not None:
+ request_info += "\n🪢 Langfuse Trace: {}".format(langfuse_url)
+
+ # add deployment latencies to alert
+ _deployment_latency_map = self._get_deployment_latencies_to_alert(
+ metadata=request_data.get("metadata", {})
+ )
+ if _deployment_latency_map is not None:
+ request_info += f"\nDeployment Latencies\n{_deployment_latency_map}"
+
+ await self.send_alert(
+ message=alerting_message + request_info,
+ level="Medium",
+ alert_type=AlertType.llm_requests_hanging,
+ alerting_metadata=alerting_metadata,
+ )
+
+ async def failed_tracking_alert(self, error_message: str, failing_model: str):
+ """
+ Raise alert when tracking failed for specific model
+
+ Args:
+ error_message (str): Error message
+ failing_model (str): Model that failed tracking
+ """
+ if self.alerting is None or self.alert_types is None:
+ # do nothing if alerting is not switched on
+ return
+ if "failed_tracking_spend" not in self.alert_types:
+ return
+
+ _cache: DualCache = self.internal_usage_cache
+ message = "Failed Tracking Cost for " + error_message
+ _cache_key = "budget_alerts:failed_tracking:{}".format(failing_model)
+ result = await _cache.async_get_cache(key=_cache_key)
+ if result is None:
+ await self.send_alert(
+ message=message,
+ level="High",
+ alert_type=AlertType.failed_tracking_spend,
+ alerting_metadata={},
+ )
+ await _cache.async_set_cache(
+ key=_cache_key,
+ value="SENT",
+ ttl=self.alerting_args.budget_alert_ttl,
+ )
+
+ async def budget_alerts( # noqa: PLR0915
+ self,
+ type: Literal[
+ "token_budget",
+ "soft_budget",
+ "user_budget",
+ "team_budget",
+ "proxy_budget",
+ "projected_limit_exceeded",
+ ],
+ user_info: CallInfo,
+ ):
+        ## PREVENTIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727
+ # - Alert once within 24hr period
+ # - Cache this information
+ # - Don't re-alert, if alert already sent
+ _cache: DualCache = self.internal_usage_cache
+
+ if self.alerting is None or self.alert_types is None:
+ # do nothing if alerting is not switched on
+ return
+ if "budget_alerts" not in self.alert_types:
+ return
+ _id: Optional[str] = "default_id" # used for caching
+ user_info_json = user_info.model_dump(exclude_none=True)
+ user_info_str = self._get_user_info_str(user_info)
+ event: Optional[
+ Literal[
+ "budget_crossed",
+ "threshold_crossed",
+ "projected_limit_exceeded",
+ "soft_budget_crossed",
+ ]
+ ] = None
+ event_group: Optional[
+ Literal["internal_user", "team", "key", "proxy", "customer"]
+ ] = None
+ event_message: str = ""
+ webhook_event: Optional[WebhookEvent] = None
+ if type == "proxy_budget":
+ event_group = "proxy"
+ event_message += "Proxy Budget: "
+ elif type == "soft_budget":
+ event_group = "proxy"
+ event_message += "Soft Budget Crossed: "
+ elif type == "user_budget":
+ event_group = "internal_user"
+ event_message += "User Budget: "
+ _id = user_info.user_id or _id
+ elif type == "team_budget":
+ event_group = "team"
+ event_message += "Team Budget: "
+ _id = user_info.team_id or _id
+ elif type == "token_budget":
+ event_group = "key"
+ event_message += "Key Budget: "
+ _id = user_info.token
+ elif type == "projected_limit_exceeded":
+ event_group = "key"
+ event_message += "Key Budget: Projected Limit Exceeded"
+ event = "projected_limit_exceeded"
+ _id = user_info.token
+
+ # percent of max_budget left to spend
+ if user_info.max_budget is None and user_info.soft_budget is None:
+ return
+ percent_left: float = 0
+ if user_info.max_budget is not None:
+ if user_info.max_budget > 0:
+ percent_left = (
+ user_info.max_budget - user_info.spend
+ ) / user_info.max_budget
+
+ # check if crossed budget
+ if user_info.max_budget is not None:
+ if user_info.spend >= user_info.max_budget:
+ event = "budget_crossed"
+ event_message += (
+ f"Budget Crossed\n Total Budget:`{user_info.max_budget}`"
+ )
+ elif percent_left <= 0.05:
+ event = "threshold_crossed"
+ event_message += "5% Threshold Crossed "
+ elif percent_left <= 0.15:
+ event = "threshold_crossed"
+ event_message += "15% Threshold Crossed"
+ elif user_info.soft_budget is not None:
+ if user_info.spend >= user_info.soft_budget:
+ event = "soft_budget_crossed"
+ if event is not None and event_group is not None:
+ _cache_key = "budget_alerts:{}:{}".format(event, _id)
+ result = await _cache.async_get_cache(key=_cache_key)
+ if result is None:
+ webhook_event = WebhookEvent(
+ event=event,
+ event_group=event_group,
+ event_message=event_message,
+ **user_info_json,
+ )
+ await self.send_alert(
+ message=event_message + "\n\n" + user_info_str,
+ level="High",
+ alert_type=AlertType.budget_alerts,
+ user_info=webhook_event,
+ alerting_metadata={},
+ )
+ await _cache.async_set_cache(
+ key=_cache_key,
+ value="SENT",
+ ttl=self.alerting_args.budget_alert_ttl,
+ )
+
+ return
+ return
+
+ def _get_user_info_str(self, user_info: CallInfo) -> str:
+ """
+ Create a standard message for a budget alert
+ """
+ _all_fields_as_dict = user_info.model_dump(exclude_none=True)
+ _all_fields_as_dict.pop("token")
+ msg = ""
+ for k, v in _all_fields_as_dict.items():
+ msg += f"*{k}:* `{v}`\n"
+
+ return msg
+
+ async def customer_spend_alert(
+ self,
+ token: Optional[str],
+ key_alias: Optional[str],
+ end_user_id: Optional[str],
+ response_cost: Optional[float],
+ max_budget: Optional[float],
+ ):
+ if (
+ self.alerting is not None
+ and "webhook" in self.alerting
+ and end_user_id is not None
+ and token is not None
+ and response_cost is not None
+ ):
+ # log customer spend
+ event = WebhookEvent(
+ spend=response_cost,
+ max_budget=max_budget,
+ token=token,
+ customer_id=end_user_id,
+ user_id=None,
+ team_id=None,
+ user_email=None,
+ key_alias=key_alias,
+ projected_exceeded_date=None,
+ projected_spend=None,
+ event="spend_tracked",
+ event_group="customer",
+ event_message="Customer spend tracked. Customer={}, spend={}".format(
+ end_user_id, response_cost
+ ),
+ )
+
+ await self.send_webhook_alert(webhook_event=event)
+
+ def _count_outage_alerts(self, alerts: List[int]) -> str:
+ """
+ Parameters:
+ - alerts: List[int] -> list of error codes (either 408 or 500+)
+
+ Returns:
+ - str -> formatted string. This is an alert message, giving a human-friendly description of the errors.
+ """
+ error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0}
+ for alert in alerts:
+ if alert == 408:
+ error_breakdown["Timeout Errors"] += 1
+ elif alert >= 500:
+ error_breakdown["API Errors"] += 1
+ else:
+ error_breakdown["Unknown Errors"] += 1
+
+ error_msg = ""
+ for key, value in error_breakdown.items():
+ if value > 0:
+ error_msg += "\n{}: {}\n".format(key, value)
+
+ return error_msg
+
+ def _outage_alert_msg_factory(
+ self,
+ alert_type: Literal["Major", "Minor"],
+ key: Literal["Model", "Region"],
+ key_val: str,
+ provider: str,
+ api_base: Optional[str],
+ outage_value: BaseOutageModel,
+ ) -> str:
+ """Format an alert message for slack"""
+ headers = {f"{key} Name": key_val, "Provider": provider}
+ if api_base is not None:
+ headers["API Base"] = api_base # type: ignore
+
+ headers_str = "\n"
+ for k, v in headers.items():
+ headers_str += f"*{k}:* `{v}`\n"
+ return f"""\n\n
+*āš ļø {alert_type} Service Outage*
+
+{headers_str}
+
+*Errors:*
+{self._count_outage_alerts(alerts=outage_value["alerts"])}
+
+*Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n
+"""
+
+ async def region_outage_alerts(
+ self,
+ exception: APIError,
+ deployment_id: str,
+ ) -> None:
+ """
+ Send slack alert if specific provider region is having an outage.
+
+ Track for 408 (Timeout) and >=500 Error codes
+ """
+ ## CREATE (PROVIDER+REGION) ID ##
+ if self.llm_router is None:
+ return
+
+ deployment = self.llm_router.get_deployment(model_id=deployment_id)
+
+ if deployment is None:
+ return
+
+ model = deployment.litellm_params.model
+ ### GET PROVIDER ###
+ provider = deployment.litellm_params.custom_llm_provider
+ if provider is None:
+ model, provider, _, _ = litellm.get_llm_provider(model=model)
+
+ ### GET REGION ###
+ region_name = deployment.litellm_params.region_name
+ if region_name is None:
+ region_name = litellm.utils._get_model_region(
+ custom_llm_provider=provider, litellm_params=deployment.litellm_params
+ )
+
+ if region_name is None:
+ return
+
+ ### UNIQUE CACHE KEY ###
+ cache_key = provider + region_name
+
+ outage_value: Optional[ProviderRegionOutageModel] = (
+ await self.internal_usage_cache.async_get_cache(key=cache_key)
+ )
+
+ if (
+ getattr(exception, "status_code", None) is None
+ or (
+ exception.status_code != 408 # type: ignore
+ and exception.status_code < 500 # type: ignore
+ )
+ or self.llm_router is None
+ ):
+ return
+
+ if outage_value is None:
+ _deployment_set = set()
+ _deployment_set.add(deployment_id)
+ outage_value = ProviderRegionOutageModel(
+ provider_region_id=cache_key,
+ alerts=[exception.status_code], # type: ignore
+ minor_alert_sent=False,
+ major_alert_sent=False,
+ last_updated_at=time.time(),
+ deployment_ids=_deployment_set,
+ )
+
+ ## add to cache ##
+ await self.internal_usage_cache.async_set_cache(
+ key=cache_key,
+ value=outage_value,
+ ttl=self.alerting_args.region_outage_alert_ttl,
+ )
+ return
+
+ if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size:
+ outage_value["alerts"].append(exception.status_code) # type: ignore
+ else: # prevent memory leaks
+ pass
+ _deployment_set = outage_value["deployment_ids"]
+ _deployment_set.add(deployment_id)
+ outage_value["deployment_ids"] = _deployment_set
+ outage_value["last_updated_at"] = time.time()
+
+ ## MINOR OUTAGE ALERT SENT ##
+ if (
+ outage_value["minor_alert_sent"] is False
+ and len(outage_value["alerts"])
+ >= self.alerting_args.minor_outage_alert_threshold
+ and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment
+ ):
+ msg = self._outage_alert_msg_factory(
+ alert_type="Minor",
+ key="Region",
+ key_val=region_name,
+ api_base=None,
+ outage_value=outage_value,
+ provider=provider,
+ )
+ # send minor alert
+ await self.send_alert(
+ message=msg,
+ level="Medium",
+ alert_type=AlertType.outage_alerts,
+ alerting_metadata={},
+ )
+ # set to true
+ outage_value["minor_alert_sent"] = True
+
+ ## MAJOR OUTAGE ALERT SENT ##
+ elif (
+ outage_value["major_alert_sent"] is False
+ and len(outage_value["alerts"])
+ >= self.alerting_args.major_outage_alert_threshold
+ and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment
+ ):
+ msg = self._outage_alert_msg_factory(
+ alert_type="Major",
+ key="Region",
+ key_val=region_name,
+ api_base=None,
+ outage_value=outage_value,
+ provider=provider,
+ )
+
+            # send major alert
+ await self.send_alert(
+ message=msg,
+ level="High",
+ alert_type=AlertType.outage_alerts,
+ alerting_metadata={},
+ )
+ # set to true
+ outage_value["major_alert_sent"] = True
+
+ ## update cache ##
+ await self.internal_usage_cache.async_set_cache(
+ key=cache_key, value=outage_value
+ )
+
+ async def outage_alerts(
+ self,
+ exception: APIError,
+ deployment_id: str,
+ ) -> None:
+ """
+ Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500).
+
+ key = model_id
+
+ value = {
+ - model_id
+ - threshold
+ - alerts []
+ }
+
+ ttl = 1hr
+ max_alerts_size = 10
+ """
+ try:
+ outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id) # type: ignore
+ if (
+ getattr(exception, "status_code", None) is None
+ or (
+ exception.status_code != 408 # type: ignore
+ and exception.status_code < 500 # type: ignore
+ )
+ or self.llm_router is None
+ ):
+ return
+
+ ### EXTRACT MODEL DETAILS ###
+ deployment = self.llm_router.get_deployment(model_id=deployment_id)
+ if deployment is None:
+ return
+
+ model = deployment.litellm_params.model
+ provider = deployment.litellm_params.custom_llm_provider
+ if provider is None:
+ try:
+ model, provider, _, _ = litellm.get_llm_provider(model=model)
+ except Exception:
+ provider = ""
+ api_base = litellm.get_api_base(
+ model=model, optional_params=deployment.litellm_params
+ )
+
+ if outage_value is None:
+ outage_value = OutageModel(
+ model_id=deployment_id,
+ alerts=[exception.status_code], # type: ignore
+ minor_alert_sent=False,
+ major_alert_sent=False,
+ last_updated_at=time.time(),
+ )
+
+ ## add to cache ##
+ await self.internal_usage_cache.async_set_cache(
+ key=deployment_id,
+ value=outage_value,
+ ttl=self.alerting_args.outage_alert_ttl,
+ )
+ return
+
+ if (
+ len(outage_value["alerts"])
+ < self.alerting_args.max_outage_alert_list_size
+ ):
+ outage_value["alerts"].append(exception.status_code) # type: ignore
+ else: # prevent memory leaks
+ pass
+
+ outage_value["last_updated_at"] = time.time()
+
+ ## MINOR OUTAGE ALERT SENT ##
+ if (
+ outage_value["minor_alert_sent"] is False
+ and len(outage_value["alerts"])
+ >= self.alerting_args.minor_outage_alert_threshold
+ ):
+ msg = self._outage_alert_msg_factory(
+ alert_type="Minor",
+ key="Model",
+ key_val=model,
+ api_base=api_base,
+ outage_value=outage_value,
+ provider=provider,
+ )
+ # send minor alert
+ await self.send_alert(
+ message=msg,
+ level="Medium",
+ alert_type=AlertType.outage_alerts,
+ alerting_metadata={},
+ )
+ # set to true
+ outage_value["minor_alert_sent"] = True
+ elif (
+ outage_value["major_alert_sent"] is False
+ and len(outage_value["alerts"])
+ >= self.alerting_args.major_outage_alert_threshold
+ ):
+ msg = self._outage_alert_msg_factory(
+ alert_type="Major",
+ key="Model",
+ key_val=model,
+ api_base=api_base,
+ outage_value=outage_value,
+ provider=provider,
+ )
+                # send major alert
+ await self.send_alert(
+ message=msg,
+ level="High",
+ alert_type=AlertType.outage_alerts,
+ alerting_metadata={},
+ )
+ # set to true
+ outage_value["major_alert_sent"] = True
+
+ ## update cache ##
+ await self.internal_usage_cache.async_set_cache(
+ key=deployment_id, value=outage_value
+ )
+ except Exception:
+ pass
+
+ async def model_added_alert(
+ self, model_name: str, litellm_model_name: str, passed_model_info: Any
+ ):
+ base_model_from_user = getattr(passed_model_info, "base_model", None)
+ model_info = {}
+ base_model = ""
+ if base_model_from_user is not None:
+ model_info = litellm.model_cost.get(base_model_from_user, {})
+ base_model = f"Base Model: `{base_model_from_user}`\n"
+ else:
+ model_info = litellm.model_cost.get(litellm_model_name, {})
+ model_info_str = ""
+ for k, v in model_info.items():
+ if k == "input_cost_per_token" or k == "output_cost_per_token":
+ # when converting to string it should not be 1.63e-06
+ v = "{:.8f}".format(v)
+
+ model_info_str += f"{k}: {v}\n"
+
+ message = f"""
+*šŸš… New Model Added*
+Model Name: `{model_name}`
+{base_model}
+
+Usage OpenAI Python SDK:
+```
+import openai
+client = openai.OpenAI(
+ api_key="your_api_key",
+    base_url="{os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")}"
+)
+
+response = client.chat.completions.create(
+ model="{model_name}", # model to send to the proxy
+ messages = [
+ {{
+ "role": "user",
+ "content": "this is a test request, write a short poem"
+ }}
+ ]
+)
+```
+
+Model Info:
+```
+{model_info_str}
+```
+"""
+
+ alert_val = self.send_alert(
+ message=message,
+ level="Low",
+ alert_type=AlertType.new_model_added,
+ alerting_metadata={},
+ )
+
+ if alert_val is not None and asyncio.iscoroutine(alert_val):
+ await alert_val
+
+ async def model_removed_alert(self, model_name: str):
+ pass
+
+ async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool:
+ """
+ Sends structured alert to webhook, if set.
+
+ Currently only implemented for budget alerts
+
+ Returns -> True if sent, False if not.
+
+ Raises Exception
+ - if WEBHOOK_URL is not set
+ """
+
+ webhook_url = os.getenv("WEBHOOK_URL", None)
+ if webhook_url is None:
+ raise Exception("Missing webhook_url from environment")
+
+ payload = webhook_event.model_dump_json()
+ headers = {"Content-type": "application/json"}
+
+ response = await self.async_http_handler.post(
+ url=webhook_url,
+ headers=headers,
+ data=payload,
+ )
+ if response.status_code == 200:
+ return True
+ else:
+ print("Error sending webhook alert. Error=", response.text) # noqa
+
+ return False
+
+ async def _check_if_using_premium_email_feature(
+ self,
+ premium_user: bool,
+ email_logo_url: Optional[str] = None,
+ email_support_contact: Optional[str] = None,
+ ):
+ from litellm.proxy.proxy_server import CommonProxyErrors, premium_user
+
+ if premium_user is not True:
+ if email_logo_url is not None or email_support_contact is not None:
+ raise ValueError(
+ f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}"
+ )
+ return
+
+ async def send_key_created_or_user_invited_email(
+ self, webhook_event: WebhookEvent
+ ) -> bool:
+ try:
+ from litellm.proxy.utils import send_email
+
+ if self.alerting is None or "email" not in self.alerting:
+ # do nothing if user does not want email alerts
+ verbose_proxy_logger.error(
+ "Error sending email alert - 'email' not in self.alerting %s",
+ self.alerting,
+ )
+ return False
+ from litellm.proxy.proxy_server import premium_user, prisma_client
+
+ email_logo_url = os.getenv(
+ "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
+ )
+ email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
+ await self._check_if_using_premium_email_feature(
+ premium_user, email_logo_url, email_support_contact
+ )
+ if email_logo_url is None:
+ email_logo_url = LITELLM_LOGO_URL
+ if email_support_contact is None:
+ email_support_contact = LITELLM_SUPPORT_CONTACT
+
+ event_name = webhook_event.event_message
+ recipient_email = webhook_event.user_email
+ recipient_user_id = webhook_event.user_id
+ if (
+ recipient_email is None
+ and recipient_user_id is not None
+ and prisma_client is not None
+ ):
+ user_row = await prisma_client.db.litellm_usertable.find_unique(
+ where={"user_id": recipient_user_id}
+ )
+
+ if user_row is not None:
+ recipient_email = user_row.user_email
+
+ key_token = webhook_event.token
+ key_budget = webhook_event.max_budget
+ base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")
+
+ email_html_content = "Alert from LiteLLM Server"
+ if recipient_email is None:
+ verbose_proxy_logger.error(
+ "Trying to send email alert to no recipient",
+ extra=webhook_event.dict(),
+ )
+
+ if webhook_event.event == "key_created":
+ email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format(
+ email_logo_url=email_logo_url,
+ recipient_email=recipient_email,
+ key_budget=key_budget,
+ key_token=key_token,
+ base_url=base_url,
+ email_support_contact=email_support_contact,
+ )
+ elif webhook_event.event == "internal_user_created":
+ # GET TEAM NAME
+ team_id = webhook_event.team_id
+ team_name = "Default Team"
+ if team_id is not None and prisma_client is not None:
+ team_row = await prisma_client.db.litellm_teamtable.find_unique(
+ where={"team_id": team_id}
+ )
+ if team_row is not None:
+ team_name = team_row.team_alias or "-"
+ email_html_content = USER_INVITED_EMAIL_TEMPLATE.format(
+ email_logo_url=email_logo_url,
+ recipient_email=recipient_email,
+ team_name=team_name,
+ base_url=base_url,
+ email_support_contact=email_support_contact,
+ )
+ else:
+ verbose_proxy_logger.error(
+ "Trying to send email alert on unknown webhook event",
+ extra=webhook_event.model_dump(),
+ )
+
+ webhook_event.model_dump_json()
+ email_event = {
+ "to": recipient_email,
+ "subject": f"LiteLLM: {event_name}",
+ "html": email_html_content,
+ }
+
+ await send_email(
+ receiver_email=email_event["to"],
+ subject=email_event["subject"],
+ html=email_event["html"],
+ )
+
+ return True
+
+ except Exception as e:
+ verbose_proxy_logger.error("Error sending email alert %s", str(e))
+ return False
+
+ async def send_email_alert_using_smtp(
+ self, webhook_event: WebhookEvent, alert_type: str
+ ) -> bool:
+ """
+ Sends structured Email alert to an SMTP server
+
+ Currently only implemented for budget alerts
+
+ Returns -> True if sent, False if not.
+ """
+ from litellm.proxy.proxy_server import premium_user
+ from litellm.proxy.utils import send_email
+
+ email_logo_url = os.getenv(
+ "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None)
+ )
+ email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None)
+ await self._check_if_using_premium_email_feature(
+ premium_user, email_logo_url, email_support_contact
+ )
+
+ if email_logo_url is None:
+ email_logo_url = LITELLM_LOGO_URL
+ if email_support_contact is None:
+ email_support_contact = LITELLM_SUPPORT_CONTACT
+
+ event_name = webhook_event.event_message
+ recipient_email = webhook_event.user_email
+ user_name = webhook_event.user_id
+ max_budget = webhook_event.max_budget
+ email_html_content = "Alert from LiteLLM Server"
+ if recipient_email is None:
+ verbose_proxy_logger.error(
+ "Trying to send email alert to no recipient", extra=webhook_event.dict()
+ )
+
+ if webhook_event.event == "budget_crossed":
+ email_html_content = f"""
+ <img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" />
+
+ <p> Hi {user_name}, <br/>
+
+ Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br />
+
+ API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br />
+
+ If you have any questions, please send an email to {email_support_contact} <br /> <br />
+
+ Best, <br />
+ The LiteLLM team <br />
+ """
+
+ webhook_event.model_dump_json()
+ email_event = {
+ "to": recipient_email,
+ "subject": f"LiteLLM: {event_name}",
+ "html": email_html_content,
+ }
+
+ await send_email(
+ receiver_email=email_event["to"],
+ subject=email_event["subject"],
+ html=email_event["html"],
+ )
+ if webhook_event.event_group == "team":
+ from litellm.integrations.email_alerting import send_team_budget_alert
+
+ await send_team_budget_alert(webhook_event=webhook_event)
+
+ return False
+
+ async def send_alert(
+ self,
+ message: str,
+ level: Literal["Low", "Medium", "High"],
+ alert_type: AlertType,
+ alerting_metadata: dict,
+ user_info: Optional[WebhookEvent] = None,
+ **kwargs,
+ ):
+ """
+ Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298
+
+ - Responses taking too long
+ - Requests are hanging
+ - Calls are failing
+ - DB Read/Writes are failing
+ - Proxy Close to max budget
+ - Key Close to max budget
+
+ Parameters:
+ level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'.
+ message: str - what is the alert about
+ """
+ if self.alerting is None:
+ return
+
+ if (
+ "webhook" in self.alerting
+ and alert_type == "budget_alerts"
+ and user_info is not None
+ ):
+ await self.send_webhook_alert(webhook_event=user_info)
+
+ if (
+ "email" in self.alerting
+ and alert_type == "budget_alerts"
+ and user_info is not None
+ ):
+ # only send budget alerts over Email
+ await self.send_email_alert_using_smtp(
+ webhook_event=user_info, alert_type=alert_type
+ )
+
+ if "slack" not in self.alerting:
+ return
+ if alert_type not in self.alert_types:
+ return
+
+ from datetime import datetime
+
+ # Get the current timestamp
+ current_time = datetime.now().strftime("%H:%M:%S")
+ _proxy_base_url = os.getenv("PROXY_BASE_URL", None)
+ if alert_type == "daily_reports" or alert_type == "new_model_added":
+ formatted_message = message
+ else:
+ formatted_message = (
+ f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}"
+ )
+
+ if kwargs:
+ for key, value in kwargs.items():
+ formatted_message += f"\n\n{key}: `{value}`\n\n"
+ if alerting_metadata:
+ for key, value in alerting_metadata.items():
+ formatted_message += f"\n\n*Alerting Metadata*: \n{key}: `{value}`\n\n"
+ if _proxy_base_url is not None:
+ formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`"
+
+ # check if we find the slack webhook url in self.alert_to_webhook_url
+ if (
+ self.alert_to_webhook_url is not None
+ and alert_type in self.alert_to_webhook_url
+ ):
+ slack_webhook_url: Optional[Union[str, List[str]]] = (
+ self.alert_to_webhook_url[alert_type]
+ )
+ elif self.default_webhook_url is not None:
+ slack_webhook_url = self.default_webhook_url
+ else:
+ slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None)
+
+ if slack_webhook_url is None:
+ raise ValueError("Missing SLACK_WEBHOOK_URL from environment")
+ payload = {"text": formatted_message}
+ headers = {"Content-type": "application/json"}
+
+ if isinstance(slack_webhook_url, list):
+ for url in slack_webhook_url:
+ self.log_queue.append(
+ {
+ "url": url,
+ "headers": headers,
+ "payload": payload,
+ "alert_type": alert_type,
+ }
+ )
+ else:
+ self.log_queue.append(
+ {
+ "url": slack_webhook_url,
+ "headers": headers,
+ "payload": payload,
+ "alert_type": alert_type,
+ }
+ )
+
+ if len(self.log_queue) >= self.batch_size:
+ await self.flush_queue()
+
+ async def async_send_batch(self):
+ if not self.log_queue:
+ return
+
+ squashed_queue = squash_payloads(self.log_queue)
+ tasks = [
+ send_to_webhook(
+ slackAlertingInstance=self, item=item["item"], count=item["count"]
+ )
+ for item in squashed_queue.values()
+ ]
+ await asyncio.gather(*tasks)
+ self.log_queue.clear()
+
+ async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+ """Log deployment latency"""
+ try:
+ if "daily_reports" in self.alert_types:
+ litellm_params = kwargs.get("litellm_params", {}) or {}
+ model_info = litellm_params.get("model_info", {}) or {}
+ model_id = model_info.get("id", "") or ""
+ response_s: timedelta = end_time - start_time
+
+ final_value = response_s
+
+ if isinstance(response_obj, litellm.ModelResponse) and (
+ hasattr(response_obj, "usage")
+ and response_obj.usage is not None # type: ignore
+ and hasattr(response_obj.usage, "completion_tokens") # type: ignore
+ ):
+ completion_tokens = response_obj.usage.completion_tokens # type: ignore
+ if completion_tokens is not None and completion_tokens > 0:
+ final_value = float(
+ response_s.total_seconds() / completion_tokens
+ )
+ if isinstance(final_value, timedelta):
+ final_value = final_value.total_seconds()
+
+ await self.async_update_daily_reports(
+ DeploymentMetrics(
+ id=model_id,
+ failed_request=False,
+ latency_per_output_token=final_value,
+ updated_at=litellm.utils.get_utc_datetime(),
+ )
+ )
+ except Exception as e:
+ verbose_proxy_logger.error(
+ f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}"
+ )
+ pass
+
+ async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+ """Log failure + deployment latency"""
+ _litellm_params = kwargs.get("litellm_params", {})
+ _model_info = _litellm_params.get("model_info", {}) or {}
+ model_id = _model_info.get("id", "")
+ try:
+ if "daily_reports" in self.alert_types:
+ try:
+ await self.async_update_daily_reports(
+ DeploymentMetrics(
+ id=model_id,
+ failed_request=True,
+ latency_per_output_token=None,
+ updated_at=litellm.utils.get_utc_datetime(),
+ )
+ )
+ except Exception as e:
+ verbose_logger.debug(f"Exception raises -{str(e)}")
+
+ if isinstance(kwargs.get("exception", ""), APIError):
+ if "outage_alerts" in self.alert_types:
+ await self.outage_alerts(
+ exception=kwargs["exception"],
+ deployment_id=model_id,
+ )
+
+ if "region_outage_alerts" in self.alert_types:
+ await self.region_outage_alerts(
+ exception=kwargs["exception"], deployment_id=model_id
+ )
+ except Exception:
+ pass
+
+ async def _run_scheduler_helper(self, llm_router) -> bool:
+ """
+ Returns:
+ - True -> report sent
+ - False -> report not sent
+ """
+ report_sent_bool = False
+
+ report_sent = await self.internal_usage_cache.async_get_cache(
+ key=SlackAlertingCacheKeys.report_sent_key.value,
+ parent_otel_span=None,
+ ) # None | float
+
+ current_time = time.time()
+
+ if report_sent is None:
+ await self.internal_usage_cache.async_set_cache(
+ key=SlackAlertingCacheKeys.report_sent_key.value,
+ value=current_time,
+ )
+ elif isinstance(report_sent, float):
+ # Check if current time - interval >= time last sent
+ interval_seconds = self.alerting_args.daily_report_frequency
+
+ if current_time - report_sent >= interval_seconds:
+ # Sneak in the reporting logic here
+ await self.send_daily_reports(router=llm_router)
+ # Also, don't forget to update the report_sent time after sending the report!
+ await self.internal_usage_cache.async_set_cache(
+ key=SlackAlertingCacheKeys.report_sent_key.value,
+ value=current_time,
+ )
+ report_sent_bool = True
+
+ return report_sent_bool
+
+ async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None):
+ """
+ If 'daily_reports' enabled
+
+ Ping redis cache every 5 minutes to check if we should send the report
+
+ If yes -> call send_daily_report()
+ """
+ if llm_router is None or self.alert_types is None:
+ return
+
+ if "daily_reports" in self.alert_types:
+ while True:
+ await self._run_scheduler_helper(llm_router=llm_router)
+ interval = random.randint(
+ self.alerting_args.report_check_interval - 3,
+ self.alerting_args.report_check_interval + 3,
+ ) # shuffle to prevent collisions
+ await asyncio.sleep(interval)
+ return
+
+ async def send_weekly_spend_report(
+ self,
+ time_range: str = "7d",
+ ):
+ """
+ Send a spend report for a configurable time range.
+
+ Args:
+ time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d"
+ """
+ if self.alerting is None or "spend_reports" not in self.alert_types:
+ return
+
+ try:
+ from litellm.proxy.spend_tracking.spend_management_endpoints import (
+ _get_spend_report_for_time_range,
+ )
+
+ # Parse the time range
+ days = int(time_range[:-1])
+ if time_range[-1].lower() != "d":
+ raise ValueError("Time range must be specified in days, e.g., '7d'")
+
+ todays_date = datetime.datetime.now().date()
+ start_date = todays_date - datetime.timedelta(days=days)
+
+ _event_cache_key = f"weekly_spend_report_sent_{start_date.strftime('%Y-%m-%d')}_{todays_date.strftime('%Y-%m-%d')}"
+ if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
+ return
+
+ _resp = await _get_spend_report_for_time_range(
+ start_date=start_date.strftime("%Y-%m-%d"),
+ end_date=todays_date.strftime("%Y-%m-%d"),
+ )
+ if _resp is None or _resp == ([], []):
+ return
+
+ spend_per_team, spend_per_tag = _resp
+
+ _spend_message = f"*šŸ’ø Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n"
+
+ if spend_per_team is not None:
+ _spend_message += "\n*Team Spend Report:*\n"
+ for spend in spend_per_team:
+ _team_spend = round(float(spend["total_spend"]), 4)
+ _spend_message += (
+ f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
+ )
+
+ if spend_per_tag is not None:
+ _spend_message += "\n*Tag Spend Report:*\n"
+ for spend in spend_per_tag:
+ _tag_spend = round(float(spend["total_spend"]), 4)
+ _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
+
+ await self.send_alert(
+ message=_spend_message,
+ level="Low",
+ alert_type=AlertType.spend_reports,
+ alerting_metadata={},
+ )
+
+ await self.internal_usage_cache.async_set_cache(
+ key=_event_cache_key,
+ value="SENT",
+ ttl=duration_in_seconds(time_range),
+ )
+
+ except ValueError as ve:
+ verbose_proxy_logger.error(f"Invalid time range format: {ve}")
+ except Exception as e:
+ verbose_proxy_logger.error(f"Error sending spend report: {e}")
+
+ async def send_monthly_spend_report(self):
+ """ """
+ try:
+ from calendar import monthrange
+
+ from litellm.proxy.spend_tracking.spend_management_endpoints import (
+ _get_spend_report_for_time_range,
+ )
+
+ todays_date = datetime.datetime.now().date()
+ first_day_of_month = todays_date.replace(day=1)
+ _, last_day_of_month = monthrange(todays_date.year, todays_date.month)
+ last_day_of_month = first_day_of_month + datetime.timedelta(
+ days=last_day_of_month - 1
+ )
+
+ _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}"
+ if await self.internal_usage_cache.async_get_cache(key=_event_cache_key):
+ return
+
+ _resp = await _get_spend_report_for_time_range(
+ start_date=first_day_of_month.strftime("%Y-%m-%d"),
+ end_date=last_day_of_month.strftime("%Y-%m-%d"),
+ )
+
+ if _resp is None or _resp == ([], []):
+ return
+
+ monthly_spend_per_team, monthly_spend_per_tag = _resp
+
+ _spend_message = f"*šŸ’ø Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n"
+
+ if monthly_spend_per_team is not None:
+ _spend_message += "\n*Team Spend Report:*\n"
+ for spend in monthly_spend_per_team:
+ _team_spend = spend["total_spend"]
+ _team_spend = float(_team_spend)
+ # round to 4 decimal places
+ _team_spend = round(_team_spend, 4)
+ _spend_message += (
+ f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n"
+ )
+
+ if monthly_spend_per_tag is not None:
+ _spend_message += "\n*Tag Spend Report:*\n"
+ for spend in monthly_spend_per_tag:
+ _tag_spend = spend["total_spend"]
+ _tag_spend = float(_tag_spend)
+ # round to 4 decimal places
+ _tag_spend = round(_tag_spend, 4)
+ _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"
+
+ await self.send_alert(
+ message=_spend_message,
+ level="Low",
+ alert_type=AlertType.spend_reports,
+ alerting_metadata={},
+ )
+
+ await self.internal_usage_cache.async_set_cache(
+ key=_event_cache_key,
+ value="SENT",
+ ttl=(30 * 24 * 60 * 60), # 1 month
+ )
+
+ except Exception as e:
+ verbose_proxy_logger.exception("Error sending weekly spend report %s", e)
+
+ async def send_fallback_stats_from_prometheus(self):
+ """
+ Helper to send fallback statistics from prometheus server -> to slack
+
+ This runs once per day and sends an overview of all the fallback statistics
+ """
+ try:
+ from litellm.integrations.prometheus_helpers.prometheus_api import (
+ get_fallback_metric_from_prometheus,
+ )
+
+ # call prometheuslogger.
+            fallback_success_info_prometheus = (
+                await get_fallback_metric_from_prometheus()
+            )
+
+            fallback_message = (
+                f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
+ )
+
+ await self.send_alert(
+ message=fallback_message,
+ level="Low",
+ alert_type=AlertType.fallback_reports,
+ alerting_metadata={},
+ )
+
+ except Exception as e:
+ verbose_proxy_logger.error("Error sending weekly spend report %s", e)
+
+ pass
+
+ async def send_virtual_key_event_slack(
+ self,
+ key_event: VirtualKeyEvent,
+ alert_type: AlertType,
+ event_name: str,
+ ):
+ """
+ Handles sending Virtual Key related alerts
+
+ Example:
+ - New Virtual Key Created
+ - Internal User Updated
+ - Team Created, Updated, Deleted
+ """
+ try:
+
+ message = f"`{event_name}`\n"
+
+ key_event_dict = key_event.model_dump()
+
+ # Add Created by information first
+ message += "*Action Done by:*\n"
+ for key, value in key_event_dict.items():
+ if "created_by" in key:
+ message += f"{key}: `{value}`\n"
+
+ # Add args sent to function in the alert
+ message += "\n*Arguments passed:*\n"
+ request_kwargs = key_event.request_kwargs
+ for key, value in request_kwargs.items():
+ if key == "user_api_key_dict":
+ continue
+ message += f"{key}: `{value}`\n"
+
+ await self.send_alert(
+ message=message,
+ level="High",
+ alert_type=alert_type,
+ alerting_metadata={},
+ )
+
+ except Exception as e:
+ verbose_proxy_logger.error(
+ "Error sending send_virtual_key_event_slack %s", e
+ )
+
+ return
+
+ async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
+ """
+ Returns True if the request is completed - either as a success or failure
+ """
+ if request_data is None:
+ return False
+
+ if (
+ request_data.get("litellm_status", "") != "success"
+ and request_data.get("litellm_status", "") != "fail"
+ ):
+ ## CHECK IF CACHE IS UPDATED
+ litellm_call_id = request_data.get("litellm_call_id", "")
+ status: Optional[str] = await self.internal_usage_cache.async_get_cache(
+ key="request_status:{}".format(litellm_call_id), local_only=True
+ )
+ if status is not None and (status == "success" or status == "fail"):
+ return True
+ return False
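A rough illustration of the hanging-request path implemented above, not the proxy's actual wiring: `response_taking_too_long()` sleeps for `alerting_threshold` seconds and only alerts if the request has not been marked success/fail in the internal cache. The request payload and metadata keys below are hypothetical:

```python
import asyncio

from litellm.integrations.SlackAlerting.slack_alerting import SlackAlerting

slack_alerting = SlackAlerting(
    alerting=["slack"],
    alerting_threshold=5,  # shortened for illustration; the default is 300s
)


async def watch_request() -> None:
    request_data = {
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "hello"}],
        "litellm_call_id": "abc-123",  # hypothetical call id
        "metadata": {"user_api_key_alias": "demo-key"},  # hypothetical metadata
    }
    # the proxy schedules this watchdog alongside the actual LLM call; if the call
    # finishes first, its status lands in the cache and no alert is sent
    await slack_alerting.response_taking_too_long(
        type="hanging_request", request_data=request_data
    )


# asyncio.run(watch_request())  # requires SLACK_WEBHOOK_URL (or a default webhook) to be set
```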
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py
new file mode 100644
index 00000000..0dc8bae5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/SlackAlerting/utils.py
@@ -0,0 +1,92 @@
+"""
+Utils used for slack alerting
+"""
+
+import asyncio
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+from litellm.proxy._types import AlertType
+from litellm.secret_managers.main import get_secret
+
+if TYPE_CHECKING:
+ from litellm.litellm_core_utils.litellm_logging import Logging as _Logging
+
+ Logging = _Logging
+else:
+ Logging = Any
+
+
+def process_slack_alerting_variables(
+ alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]]
+) -> Optional[Dict[AlertType, Union[List[str], str]]]:
+ """
+ process alert_to_webhook_url
+    - if any url is set as "os.environ/SLACK_WEBHOOK_URL_1", read the env var and substitute the resolved value
+ """
+ if alert_to_webhook_url is None:
+ return None
+
+ for alert_type, webhook_urls in alert_to_webhook_url.items():
+ if isinstance(webhook_urls, list):
+ _webhook_values: List[str] = []
+ for webhook_url in webhook_urls:
+ if "os.environ/" in webhook_url:
+ _env_value = get_secret(secret_name=webhook_url)
+ if not isinstance(_env_value, str):
+ raise ValueError(
+ f"Invalid webhook url value for: {webhook_url}. Got type={type(_env_value)}"
+ )
+ _webhook_values.append(_env_value)
+ else:
+ _webhook_values.append(webhook_url)
+
+ alert_to_webhook_url[alert_type] = _webhook_values
+ else:
+ _webhook_value_str: str = webhook_urls
+ if "os.environ/" in webhook_urls:
+ _env_value = get_secret(secret_name=webhook_urls)
+ if not isinstance(_env_value, str):
+ raise ValueError(
+ f"Invalid webhook url value for: {webhook_urls}. Got type={type(_env_value)}"
+ )
+ _webhook_value_str = _env_value
+ else:
+ _webhook_value_str = webhook_urls
+
+ alert_to_webhook_url[alert_type] = _webhook_value_str
+
+ return alert_to_webhook_url
+
+
+async def _add_langfuse_trace_id_to_alert(
+ request_data: Optional[dict] = None,
+) -> Optional[str]:
+ """
+ Returns langfuse trace url
+
+ - check:
+ -> existing_trace_id
+ -> trace_id
+ -> litellm_call_id
+ """
+ if (
+ request_data is not None
+ and request_data.get("litellm_logging_obj", None) is not None
+ ):
+ trace_id: Optional[str] = None
+ litellm_logging_obj: Logging = request_data["litellm_logging_obj"]
+
+ for _ in range(3):
+ trace_id = litellm_logging_obj._get_trace_id(service_name="langfuse")
+ if trace_id is not None:
+ break
+ await asyncio.sleep(3) # wait 3s before retrying for trace id
+
+ _langfuse_object = litellm_logging_obj._get_callback_object(
+ service_name="langfuse"
+ )
+ if _langfuse_object is not None:
+ base_url = _langfuse_object.Langfuse.base_url
+ return f"{base_url}/trace/{trace_id}"
+ return None
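A short sketch of the `os.environ/` indirection handled above; `SLACK_WEBHOOK_URL_CRITICAL` and the webhook URLs are placeholders:

```python
import os

from litellm.integrations.SlackAlerting.utils import process_slack_alerting_variables
from litellm.proxy._types import AlertType

os.environ["SLACK_WEBHOOK_URL_CRITICAL"] = "https://hooks.slack.com/services/T000/B000/YYYY"

resolved = process_slack_alerting_variables(
    alert_to_webhook_url={
        # "os.environ/..." values are resolved via get_secret()
        AlertType.outage_alerts: "os.environ/SLACK_WEBHOOK_URL_CRITICAL",
        # plain URLs (including lists of URLs) pass through unchanged
        AlertType.budget_alerts: ["https://hooks.slack.com/services/T000/B000/ZZZZ"],
    }
)

print(resolved[AlertType.outage_alerts])  # -> the value of SLACK_WEBHOOK_URL_CRITICAL
```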