#### What this does ####
#    Class for sending Slack Alerts #
import asyncio
import datetime
import os
import random
import time
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union

from openai import APIError

import litellm
import litellm.litellm_core_utils
import litellm.litellm_core_utils.litellm_logging
import litellm.types
from litellm._logging import verbose_logger, verbose_proxy_logger
from litellm.caching.caching import DualCache
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.litellm_core_utils.duration_parser import duration_in_seconds
from litellm.litellm_core_utils.exception_mapping_utils import (
    _add_key_name_and_team_to_alert,
)
from litellm.llms.custom_httpx.http_handler import (
    get_async_httpx_client,
    httpxSpecialProvider,
)
from litellm.proxy._types import AlertType, CallInfo, VirtualKeyEvent, WebhookEvent
from litellm.types.integrations.slack_alerting import *

from ..email_templates.templates import *
from .batching_handler import send_to_webhook, squash_payloads
from .utils import _add_langfuse_trace_id_to_alert, process_slack_alerting_variables

if TYPE_CHECKING:
    from litellm.router import Router as _Router

    Router = _Router
else:
    Router = Any


class SlackAlerting(CustomBatchLogger):
    """
    Class for sending Slack Alerts
    """

    # Class variables or attributes
    def __init__(
        self,
        internal_usage_cache: Optional[DualCache] = None,
        alerting_threshold: Optional[
            float
        ] = None,  # threshold for slow / hanging llm responses (in seconds)
        alerting: Optional[List] = [],
        alert_types: List[AlertType] = DEFAULT_ALERT_TYPES,
        alert_to_webhook_url: Optional[
            Dict[AlertType, Union[List[str], str]]
        ] = None,  # if user wants to separate alerts to diff channels
        alerting_args={},
        default_webhook_url: Optional[str] = None,
        **kwargs,
    ):
        if alerting_threshold is None:
            alerting_threshold = 300
        self.alerting_threshold = alerting_threshold
        self.alerting = alerting
        self.alert_types = alert_types
        self.internal_usage_cache = internal_usage_cache or DualCache()
        self.async_http_handler = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.alert_to_webhook_url = process_slack_alerting_variables(
            alert_to_webhook_url=alert_to_webhook_url
        )
        self.is_running = False
        self.alerting_args = SlackAlertingArgs(**alerting_args)
        self.default_webhook_url = default_webhook_url
        self.flush_lock = asyncio.Lock()
        super().__init__(**kwargs, flush_lock=self.flush_lock)

    def update_values(
        self,
        alerting: Optional[List] = None,
        alerting_threshold: Optional[float] = None,
        alert_types: Optional[List[AlertType]] = None,
        alert_to_webhook_url: Optional[Dict[AlertType, Union[List[str], str]]] = None,
        alerting_args: Optional[Dict] = None,
        llm_router: Optional[Router] = None,
    ):
        if alerting is not None:
            self.alerting = alerting
            asyncio.create_task(self.periodic_flush())
        if alerting_threshold is not None:
            self.alerting_threshold = alerting_threshold
        if alert_types is not None:
            self.alert_types = alert_types
        if alerting_args is not None:
            self.alerting_args = SlackAlertingArgs(**alerting_args)
        if alert_to_webhook_url is not None:
            # update the dict
            if self.alert_to_webhook_url is None:
                self.alert_to_webhook_url = process_slack_alerting_variables(
                    alert_to_webhook_url=alert_to_webhook_url
                )
            else:
                _new_values = (
                    process_slack_alerting_variables(
                        alert_to_webhook_url=alert_to_webhook_url
                    )
                    or {}
                )
                self.alert_to_webhook_url.update(_new_values)
        if llm_router is not None:
            self.llm_router = llm_router

    async def 
deployment_in_cooldown(self): pass async def deployment_removed_from_cooldown(self): pass def _all_possible_alert_types(self): # used by the UI to show all supported alert types # Note: This is not the alerts the user has configured, instead it's all possible alert types a user can select # return list of all values AlertType enum return list(AlertType) def _response_taking_too_long_callback_helper( self, kwargs, # kwargs to completion start_time, end_time, # start/end time ): try: time_difference = end_time - start_time # Convert the timedelta to float (in seconds) time_difference_float = time_difference.total_seconds() litellm_params = kwargs.get("litellm_params", {}) model = kwargs.get("model", "") api_base = litellm.get_api_base(model=model, optional_params=litellm_params) messages = kwargs.get("messages", None) # if messages does not exist fallback to "input" if messages is None: messages = kwargs.get("input", None) # only use first 100 chars for alerting _messages = str(messages)[:100] return time_difference_float, model, api_base, _messages except Exception as e: raise e def _get_deployment_latencies_to_alert(self, metadata=None): if metadata is None: return None if "_latency_per_deployment" in metadata: # Translate model_id to -> api_base # _latency_per_deployment is a dictionary that looks like this: """ _latency_per_deployment: { api_base: 0.01336697916666667 } """ _message_to_send = "" _deployment_latencies = metadata["_latency_per_deployment"] if len(_deployment_latencies) == 0: return None _deployment_latency_map: Optional[dict] = None try: # try sorting deployments by latency _deployment_latencies = sorted( _deployment_latencies.items(), key=lambda x: x[1] ) _deployment_latency_map = dict(_deployment_latencies) except Exception: pass if _deployment_latency_map is None: return for api_base, latency in _deployment_latency_map.items(): _message_to_send += f"\n{api_base}: {round(latency,2)}s" _message_to_send = "```" + _message_to_send + "```" return _message_to_send async def response_taking_too_long_callback( self, kwargs, # kwargs to completion completion_response, # response from completion start_time, end_time, # start/end time ): if self.alerting is None or self.alert_types is None: return time_difference_float, model, api_base, messages = ( self._response_taking_too_long_callback_helper( kwargs=kwargs, start_time=start_time, end_time=end_time, ) ) if litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions: messages = "Message not logged. 
litellm.redact_messages_in_exceptions=True" request_info = f"\nRequest Model: `{model}`\nAPI Base: `{api_base}`\nMessages: `{messages}`" slow_message = f"`Responses are slow - {round(time_difference_float,2)}s response time > Alerting threshold: {self.alerting_threshold}s`" alerting_metadata: dict = {} if time_difference_float > self.alerting_threshold: # add deployment latencies to alert if ( kwargs is not None and "litellm_params" in kwargs and "metadata" in kwargs["litellm_params"] ): _metadata: dict = kwargs["litellm_params"]["metadata"] request_info = _add_key_name_and_team_to_alert( request_info=request_info, metadata=_metadata ) _deployment_latency_map = self._get_deployment_latencies_to_alert( metadata=_metadata ) if _deployment_latency_map is not None: request_info += ( f"\nAvailable Deployment Latencies\n{_deployment_latency_map}" ) if "alerting_metadata" in _metadata: alerting_metadata = _metadata["alerting_metadata"] await self.send_alert( message=slow_message + request_info, level="Low", alert_type=AlertType.llm_too_slow, alerting_metadata=alerting_metadata, ) async def async_update_daily_reports( self, deployment_metrics: DeploymentMetrics ) -> int: """ Store the perf by deployment in cache - Number of failed requests per deployment - Latency / output tokens per deployment 'deployment_id:daily_metrics:failed_requests' 'deployment_id:daily_metrics:latency_per_output_token' Returns int - count of metrics set (1 - if just latency, 2 - if failed + latency) """ return_val = 0 try: ## FAILED REQUESTS ## if deployment_metrics.failed_request: await self.internal_usage_cache.async_increment_cache( key="{}:{}".format( deployment_metrics.id, SlackAlertingCacheKeys.failed_requests_key.value, ), value=1, parent_otel_span=None, # no attached request, this is a background operation ) return_val += 1 ## LATENCY ## if deployment_metrics.latency_per_output_token is not None: await self.internal_usage_cache.async_increment_cache( key="{}:{}".format( deployment_metrics.id, SlackAlertingCacheKeys.latency_key.value ), value=deployment_metrics.latency_per_output_token, parent_otel_span=None, # no attached request, this is a background operation ) return_val += 1 return return_val except Exception: return 0 async def send_daily_reports(self, router) -> bool: # noqa: PLR0915 """ Send a daily report on: - Top 5 deployments with most failed requests - Top 5 slowest deployments (normalized by latency/output tokens) Get the value from redis cache (if available) or in-memory and send it Cleanup: - reset values in cache -> prevent memory leak Returns: True -> if successfuly sent False -> if not sent """ ids = router.get_model_ids() # get keys failed_request_keys = [ "{}:{}".format(id, SlackAlertingCacheKeys.failed_requests_key.value) for id in ids ] latency_keys = [ "{}:{}".format(id, SlackAlertingCacheKeys.latency_key.value) for id in ids ] combined_metrics_keys = failed_request_keys + latency_keys # reduce cache calls combined_metrics_values = await self.internal_usage_cache.async_batch_get_cache( keys=combined_metrics_keys ) # [1, 2, None, ..] if combined_metrics_values is None: return False all_none = True for val in combined_metrics_values: if val is not None and val > 0: all_none = False break if all_none: return False failed_request_values = combined_metrics_values[ : len(failed_request_keys) ] # # [1, 2, None, ..] 
latency_values = combined_metrics_values[len(failed_request_keys) :] # find top 5 failed ## Replace None values with a placeholder value (-1 in this case) placeholder_value = 0 replaced_failed_values = [ value if value is not None else placeholder_value for value in failed_request_values ] ## Get the indices of top 5 keys with the highest numerical values (ignoring None and 0 values) top_5_failed = sorted( range(len(replaced_failed_values)), key=lambda i: replaced_failed_values[i], reverse=True, )[:5] top_5_failed = [ index for index in top_5_failed if replaced_failed_values[index] > 0 ] # find top 5 slowest # Replace None values with a placeholder value (-1 in this case) placeholder_value = 0 replaced_slowest_values = [ value if value is not None else placeholder_value for value in latency_values ] # Get the indices of top 5 values with the highest numerical values (ignoring None and 0 values) top_5_slowest = sorted( range(len(replaced_slowest_values)), key=lambda i: replaced_slowest_values[i], reverse=True, )[:5] top_5_slowest = [ index for index in top_5_slowest if replaced_slowest_values[index] > 0 ] # format alert -> return the litellm model name + api base message = f"\n\nTime: `{time.time()}`s\nHere are today's key metrics š: \n\n" message += "\n\n*āļø Top Deployments with Most Failed Requests:*\n\n" if not top_5_failed: message += "\tNone\n" for i in range(len(top_5_failed)): key = failed_request_keys[top_5_failed[i]].split(":")[0] _deployment = router.get_model_info(key) if isinstance(_deployment, dict): deployment_name = _deployment["litellm_params"].get("model", "") else: return False api_base = litellm.get_api_base( model=deployment_name, optional_params=( _deployment["litellm_params"] if _deployment is not None else {} ), ) if api_base is None: api_base = "" value = replaced_failed_values[top_5_failed[i]] message += f"\t{i+1}. Deployment: `{deployment_name}`, Failed Requests: `{value}`, API Base: `{api_base}`\n" message += "\n\n*š Top Slowest Deployments:*\n\n" if not top_5_slowest: message += "\tNone\n" for i in range(len(top_5_slowest)): key = latency_keys[top_5_slowest[i]].split(":")[0] _deployment = router.get_model_info(key) if _deployment is not None: deployment_name = _deployment["litellm_params"].get("model", "") else: deployment_name = "" api_base = litellm.get_api_base( model=deployment_name, optional_params=( _deployment["litellm_params"] if _deployment is not None else {} ), ) value = round(replaced_slowest_values[top_5_slowest[i]], 3) message += f"\t{i+1}. 
Deployment: `{deployment_name}`, Latency per output token: `{value}s/token`, API Base: `{api_base}`\n\n" # cache cleanup -> reset values to 0 latency_cache_keys = [(key, 0) for key in latency_keys] failed_request_cache_keys = [(key, 0) for key in failed_request_keys] combined_metrics_cache_keys = latency_cache_keys + failed_request_cache_keys await self.internal_usage_cache.async_set_cache_pipeline( cache_list=combined_metrics_cache_keys ) message += f"\n\nNext Run is at: `{time.time() + self.alerting_args.daily_report_frequency}`s" # send alert await self.send_alert( message=message, level="Low", alert_type=AlertType.daily_reports, alerting_metadata={}, ) return True async def response_taking_too_long( self, start_time: Optional[datetime.datetime] = None, end_time: Optional[datetime.datetime] = None, type: Literal["hanging_request", "slow_response"] = "hanging_request", request_data: Optional[dict] = None, ): if self.alerting is None or self.alert_types is None: return model: str = "" if request_data is not None: model = request_data.get("model", "") messages = request_data.get("messages", None) if messages is None: # if messages does not exist fallback to "input" messages = request_data.get("input", None) # try casting messages to str and get the first 100 characters, else mark as None try: messages = str(messages) messages = messages[:100] except Exception: messages = "" if ( litellm.turn_off_message_logging or litellm.redact_messages_in_exceptions ): messages = ( "Message not logged. litellm.redact_messages_in_exceptions=True" ) request_info = f"\nRequest Model: `{model}`\nMessages: `{messages}`" else: request_info = "" if type == "hanging_request": await asyncio.sleep( self.alerting_threshold ) # Set it to 5 minutes - i'd imagine this might be different for streaming, non-streaming, non-completion (embedding + img) requests alerting_metadata: dict = {} if await self._request_is_completed(request_data=request_data) is True: return if request_data is not None: if request_data.get("deployment", None) is not None and isinstance( request_data["deployment"], dict ): _api_base = litellm.get_api_base( model=model, optional_params=request_data["deployment"].get( "litellm_params", {} ), ) if _api_base is None: _api_base = "" request_info += f"\nAPI Base: {_api_base}" elif request_data.get("metadata", None) is not None and isinstance( request_data["metadata"], dict ): # In hanging requests sometime it has not made it to the point where the deployment is passed to the `request_data`` # in that case we fallback to the api base set in the request metadata _metadata: dict = request_data["metadata"] _api_base = _metadata.get("api_base", "") request_info = _add_key_name_and_team_to_alert( request_info=request_info, metadata=_metadata ) if _api_base is None: _api_base = "" if "alerting_metadata" in _metadata: alerting_metadata = _metadata["alerting_metadata"] request_info += f"\nAPI Base: `{_api_base}`" # only alert hanging responses if they have not been marked as success alerting_message = ( f"`Requests are hanging - {self.alerting_threshold}s+ request time`" ) if "langfuse" in litellm.success_callback: langfuse_url = await _add_langfuse_trace_id_to_alert( request_data=request_data, ) if langfuse_url is not None: request_info += "\nšŖ¢ Langfuse Trace: {}".format(langfuse_url) # add deployment latencies to alert _deployment_latency_map = self._get_deployment_latencies_to_alert( metadata=request_data.get("metadata", {}) ) if _deployment_latency_map is not None: request_info += f"\nDeployment 
Latencies\n{_deployment_latency_map}" await self.send_alert( message=alerting_message + request_info, level="Medium", alert_type=AlertType.llm_requests_hanging, alerting_metadata=alerting_metadata, ) async def failed_tracking_alert(self, error_message: str, failing_model: str): """ Raise alert when tracking failed for specific model Args: error_message (str): Error message failing_model (str): Model that failed tracking """ if self.alerting is None or self.alert_types is None: # do nothing if alerting is not switched on return if "failed_tracking_spend" not in self.alert_types: return _cache: DualCache = self.internal_usage_cache message = "Failed Tracking Cost for " + error_message _cache_key = "budget_alerts:failed_tracking:{}".format(failing_model) result = await _cache.async_get_cache(key=_cache_key) if result is None: await self.send_alert( message=message, level="High", alert_type=AlertType.failed_tracking_spend, alerting_metadata={}, ) await _cache.async_set_cache( key=_cache_key, value="SENT", ttl=self.alerting_args.budget_alert_ttl, ) async def budget_alerts( # noqa: PLR0915 self, type: Literal[ "token_budget", "soft_budget", "user_budget", "team_budget", "proxy_budget", "projected_limit_exceeded", ], user_info: CallInfo, ): ## PREVENTITIVE ALERTING ## - https://github.com/BerriAI/litellm/issues/2727 # - Alert once within 24hr period # - Cache this information # - Don't re-alert, if alert already sent _cache: DualCache = self.internal_usage_cache if self.alerting is None or self.alert_types is None: # do nothing if alerting is not switched on return if "budget_alerts" not in self.alert_types: return _id: Optional[str] = "default_id" # used for caching user_info_json = user_info.model_dump(exclude_none=True) user_info_str = self._get_user_info_str(user_info) event: Optional[ Literal[ "budget_crossed", "threshold_crossed", "projected_limit_exceeded", "soft_budget_crossed", ] ] = None event_group: Optional[ Literal["internal_user", "team", "key", "proxy", "customer"] ] = None event_message: str = "" webhook_event: Optional[WebhookEvent] = None if type == "proxy_budget": event_group = "proxy" event_message += "Proxy Budget: " elif type == "soft_budget": event_group = "proxy" event_message += "Soft Budget Crossed: " elif type == "user_budget": event_group = "internal_user" event_message += "User Budget: " _id = user_info.user_id or _id elif type == "team_budget": event_group = "team" event_message += "Team Budget: " _id = user_info.team_id or _id elif type == "token_budget": event_group = "key" event_message += "Key Budget: " _id = user_info.token elif type == "projected_limit_exceeded": event_group = "key" event_message += "Key Budget: Projected Limit Exceeded" event = "projected_limit_exceeded" _id = user_info.token # percent of max_budget left to spend if user_info.max_budget is None and user_info.soft_budget is None: return percent_left: float = 0 if user_info.max_budget is not None: if user_info.max_budget > 0: percent_left = ( user_info.max_budget - user_info.spend ) / user_info.max_budget # check if crossed budget if user_info.max_budget is not None: if user_info.spend >= user_info.max_budget: event = "budget_crossed" event_message += ( f"Budget Crossed\n Total Budget:`{user_info.max_budget}`" ) elif percent_left <= 0.05: event = "threshold_crossed" event_message += "5% Threshold Crossed " elif percent_left <= 0.15: event = "threshold_crossed" event_message += "15% Threshold Crossed" elif user_info.soft_budget is not None: if user_info.spend >= user_info.soft_budget: event = 
"soft_budget_crossed" if event is not None and event_group is not None: _cache_key = "budget_alerts:{}:{}".format(event, _id) result = await _cache.async_get_cache(key=_cache_key) if result is None: webhook_event = WebhookEvent( event=event, event_group=event_group, event_message=event_message, **user_info_json, ) await self.send_alert( message=event_message + "\n\n" + user_info_str, level="High", alert_type=AlertType.budget_alerts, user_info=webhook_event, alerting_metadata={}, ) await _cache.async_set_cache( key=_cache_key, value="SENT", ttl=self.alerting_args.budget_alert_ttl, ) return return def _get_user_info_str(self, user_info: CallInfo) -> str: """ Create a standard message for a budget alert """ _all_fields_as_dict = user_info.model_dump(exclude_none=True) _all_fields_as_dict.pop("token") msg = "" for k, v in _all_fields_as_dict.items(): msg += f"*{k}:* `{v}`\n" return msg async def customer_spend_alert( self, token: Optional[str], key_alias: Optional[str], end_user_id: Optional[str], response_cost: Optional[float], max_budget: Optional[float], ): if ( self.alerting is not None and "webhook" in self.alerting and end_user_id is not None and token is not None and response_cost is not None ): # log customer spend event = WebhookEvent( spend=response_cost, max_budget=max_budget, token=token, customer_id=end_user_id, user_id=None, team_id=None, user_email=None, key_alias=key_alias, projected_exceeded_date=None, projected_spend=None, event="spend_tracked", event_group="customer", event_message="Customer spend tracked. Customer={}, spend={}".format( end_user_id, response_cost ), ) await self.send_webhook_alert(webhook_event=event) def _count_outage_alerts(self, alerts: List[int]) -> str: """ Parameters: - alerts: List[int] -> list of error codes (either 408 or 500+) Returns: - str -> formatted string. This is an alert message, giving a human-friendly description of the errors. """ error_breakdown = {"Timeout Errors": 0, "API Errors": 0, "Unknown Errors": 0} for alert in alerts: if alert == 408: error_breakdown["Timeout Errors"] += 1 elif alert >= 500: error_breakdown["API Errors"] += 1 else: error_breakdown["Unknown Errors"] += 1 error_msg = "" for key, value in error_breakdown.items(): if value > 0: error_msg += "\n{}: {}\n".format(key, value) return error_msg def _outage_alert_msg_factory( self, alert_type: Literal["Major", "Minor"], key: Literal["Model", "Region"], key_val: str, provider: str, api_base: Optional[str], outage_value: BaseOutageModel, ) -> str: """Format an alert message for slack""" headers = {f"{key} Name": key_val, "Provider": provider} if api_base is not None: headers["API Base"] = api_base # type: ignore headers_str = "\n" for k, v in headers.items(): headers_str += f"*{k}:* `{v}`\n" return f"""\n\n *ā ļø {alert_type} Service Outage* {headers_str} *Errors:* {self._count_outage_alerts(alerts=outage_value["alerts"])} *Last Check:* `{round(time.time() - outage_value["last_updated_at"], 4)}s ago`\n\n """ async def region_outage_alerts( self, exception: APIError, deployment_id: str, ) -> None: """ Send slack alert if specific provider region is having an outage. 
Track for 408 (Timeout) and >=500 Error codes """ ## CREATE (PROVIDER+REGION) ID ## if self.llm_router is None: return deployment = self.llm_router.get_deployment(model_id=deployment_id) if deployment is None: return model = deployment.litellm_params.model ### GET PROVIDER ### provider = deployment.litellm_params.custom_llm_provider if provider is None: model, provider, _, _ = litellm.get_llm_provider(model=model) ### GET REGION ### region_name = deployment.litellm_params.region_name if region_name is None: region_name = litellm.utils._get_model_region( custom_llm_provider=provider, litellm_params=deployment.litellm_params ) if region_name is None: return ### UNIQUE CACHE KEY ### cache_key = provider + region_name outage_value: Optional[ProviderRegionOutageModel] = ( await self.internal_usage_cache.async_get_cache(key=cache_key) ) if ( getattr(exception, "status_code", None) is None or ( exception.status_code != 408 # type: ignore and exception.status_code < 500 # type: ignore ) or self.llm_router is None ): return if outage_value is None: _deployment_set = set() _deployment_set.add(deployment_id) outage_value = ProviderRegionOutageModel( provider_region_id=cache_key, alerts=[exception.status_code], # type: ignore minor_alert_sent=False, major_alert_sent=False, last_updated_at=time.time(), deployment_ids=_deployment_set, ) ## add to cache ## await self.internal_usage_cache.async_set_cache( key=cache_key, value=outage_value, ttl=self.alerting_args.region_outage_alert_ttl, ) return if len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size: outage_value["alerts"].append(exception.status_code) # type: ignore else: # prevent memory leaks pass _deployment_set = outage_value["deployment_ids"] _deployment_set.add(deployment_id) outage_value["deployment_ids"] = _deployment_set outage_value["last_updated_at"] = time.time() ## MINOR OUTAGE ALERT SENT ## if ( outage_value["minor_alert_sent"] is False and len(outage_value["alerts"]) >= self.alerting_args.minor_outage_alert_threshold and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment ): msg = self._outage_alert_msg_factory( alert_type="Minor", key="Region", key_val=region_name, api_base=None, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="Medium", alert_type=AlertType.outage_alerts, alerting_metadata={}, ) # set to true outage_value["minor_alert_sent"] = True ## MAJOR OUTAGE ALERT SENT ## elif ( outage_value["major_alert_sent"] is False and len(outage_value["alerts"]) >= self.alerting_args.major_outage_alert_threshold and len(_deployment_set) > 1 # make sure it's not just 1 bad deployment ): msg = self._outage_alert_msg_factory( alert_type="Major", key="Region", key_val=region_name, api_base=None, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="High", alert_type=AlertType.outage_alerts, alerting_metadata={}, ) # set to true outage_value["major_alert_sent"] = True ## update cache ## await self.internal_usage_cache.async_set_cache( key=cache_key, value=outage_value ) async def outage_alerts( self, exception: APIError, deployment_id: str, ) -> None: """ Send slack alert if model is badly configured / having an outage (408, 401, 429, >=500). 
key = model_id value = { - model_id - threshold - alerts [] } ttl = 1hr max_alerts_size = 10 """ try: outage_value: Optional[OutageModel] = await self.internal_usage_cache.async_get_cache(key=deployment_id) # type: ignore if ( getattr(exception, "status_code", None) is None or ( exception.status_code != 408 # type: ignore and exception.status_code < 500 # type: ignore ) or self.llm_router is None ): return ### EXTRACT MODEL DETAILS ### deployment = self.llm_router.get_deployment(model_id=deployment_id) if deployment is None: return model = deployment.litellm_params.model provider = deployment.litellm_params.custom_llm_provider if provider is None: try: model, provider, _, _ = litellm.get_llm_provider(model=model) except Exception: provider = "" api_base = litellm.get_api_base( model=model, optional_params=deployment.litellm_params ) if outage_value is None: outage_value = OutageModel( model_id=deployment_id, alerts=[exception.status_code], # type: ignore minor_alert_sent=False, major_alert_sent=False, last_updated_at=time.time(), ) ## add to cache ## await self.internal_usage_cache.async_set_cache( key=deployment_id, value=outage_value, ttl=self.alerting_args.outage_alert_ttl, ) return if ( len(outage_value["alerts"]) < self.alerting_args.max_outage_alert_list_size ): outage_value["alerts"].append(exception.status_code) # type: ignore else: # prevent memory leaks pass outage_value["last_updated_at"] = time.time() ## MINOR OUTAGE ALERT SENT ## if ( outage_value["minor_alert_sent"] is False and len(outage_value["alerts"]) >= self.alerting_args.minor_outage_alert_threshold ): msg = self._outage_alert_msg_factory( alert_type="Minor", key="Model", key_val=model, api_base=api_base, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="Medium", alert_type=AlertType.outage_alerts, alerting_metadata={}, ) # set to true outage_value["minor_alert_sent"] = True elif ( outage_value["major_alert_sent"] is False and len(outage_value["alerts"]) >= self.alerting_args.major_outage_alert_threshold ): msg = self._outage_alert_msg_factory( alert_type="Major", key="Model", key_val=model, api_base=api_base, outage_value=outage_value, provider=provider, ) # send minor alert await self.send_alert( message=msg, level="High", alert_type=AlertType.outage_alerts, alerting_metadata={}, ) # set to true outage_value["major_alert_sent"] = True ## update cache ## await self.internal_usage_cache.async_set_cache( key=deployment_id, value=outage_value ) except Exception: pass async def model_added_alert( self, model_name: str, litellm_model_name: str, passed_model_info: Any ): base_model_from_user = getattr(passed_model_info, "base_model", None) model_info = {} base_model = "" if base_model_from_user is not None: model_info = litellm.model_cost.get(base_model_from_user, {}) base_model = f"Base Model: `{base_model_from_user}`\n" else: model_info = litellm.model_cost.get(litellm_model_name, {}) model_info_str = "" for k, v in model_info.items(): if k == "input_cost_per_token" or k == "output_cost_per_token": # when converting to string it should not be 1.63e-06 v = "{:.8f}".format(v) model_info_str += f"{k}: {v}\n" message = f""" *š New Model Added* Model Name: `{model_name}` {base_model} Usage OpenAI Python SDK: ``` import openai client = openai.OpenAI( api_key="your_api_key", base_url={os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000")} ) response = client.chat.completions.create( model="{model_name}", # model to send to the proxy messages = [ {{ "role": "user", 
"content": "this is a test request, write a short poem" }} ] ) ``` Model Info: ``` {model_info_str} ``` """ alert_val = self.send_alert( message=message, level="Low", alert_type=AlertType.new_model_added, alerting_metadata={}, ) if alert_val is not None and asyncio.iscoroutine(alert_val): await alert_val async def model_removed_alert(self, model_name: str): pass async def send_webhook_alert(self, webhook_event: WebhookEvent) -> bool: """ Sends structured alert to webhook, if set. Currently only implemented for budget alerts Returns -> True if sent, False if not. Raises Exception - if WEBHOOK_URL is not set """ webhook_url = os.getenv("WEBHOOK_URL", None) if webhook_url is None: raise Exception("Missing webhook_url from environment") payload = webhook_event.model_dump_json() headers = {"Content-type": "application/json"} response = await self.async_http_handler.post( url=webhook_url, headers=headers, data=payload, ) if response.status_code == 200: return True else: print("Error sending webhook alert. Error=", response.text) # noqa return False async def _check_if_using_premium_email_feature( self, premium_user: bool, email_logo_url: Optional[str] = None, email_support_contact: Optional[str] = None, ): from litellm.proxy.proxy_server import CommonProxyErrors, premium_user if premium_user is not True: if email_logo_url is not None or email_support_contact is not None: raise ValueError( f"Trying to Customize Email Alerting\n {CommonProxyErrors.not_premium_user.value}" ) return async def send_key_created_or_user_invited_email( self, webhook_event: WebhookEvent ) -> bool: try: from litellm.proxy.utils import send_email if self.alerting is None or "email" not in self.alerting: # do nothing if user does not want email alerts verbose_proxy_logger.error( "Error sending email alert - 'email' not in self.alerting %s", self.alerting, ) return False from litellm.proxy.proxy_server import premium_user, prisma_client email_logo_url = os.getenv( "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None) ) email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) await self._check_if_using_premium_email_feature( premium_user, email_logo_url, email_support_contact ) if email_logo_url is None: email_logo_url = LITELLM_LOGO_URL if email_support_contact is None: email_support_contact = LITELLM_SUPPORT_CONTACT event_name = webhook_event.event_message recipient_email = webhook_event.user_email recipient_user_id = webhook_event.user_id if ( recipient_email is None and recipient_user_id is not None and prisma_client is not None ): user_row = await prisma_client.db.litellm_usertable.find_unique( where={"user_id": recipient_user_id} ) if user_row is not None: recipient_email = user_row.user_email key_token = webhook_event.token key_budget = webhook_event.max_budget base_url = os.getenv("PROXY_BASE_URL", "http://0.0.0.0:4000") email_html_content = "Alert from LiteLLM Server" if recipient_email is None: verbose_proxy_logger.error( "Trying to send email alert to no recipient", extra=webhook_event.dict(), ) if webhook_event.event == "key_created": email_html_content = KEY_CREATED_EMAIL_TEMPLATE.format( email_logo_url=email_logo_url, recipient_email=recipient_email, key_budget=key_budget, key_token=key_token, base_url=base_url, email_support_contact=email_support_contact, ) elif webhook_event.event == "internal_user_created": # GET TEAM NAME team_id = webhook_event.team_id team_name = "Default Team" if team_id is not None and prisma_client is not None: team_row = await prisma_client.db.litellm_teamtable.find_unique( 
where={"team_id": team_id} ) if team_row is not None: team_name = team_row.team_alias or "-" email_html_content = USER_INVITED_EMAIL_TEMPLATE.format( email_logo_url=email_logo_url, recipient_email=recipient_email, team_name=team_name, base_url=base_url, email_support_contact=email_support_contact, ) else: verbose_proxy_logger.error( "Trying to send email alert on unknown webhook event", extra=webhook_event.model_dump(), ) webhook_event.model_dump_json() email_event = { "to": recipient_email, "subject": f"LiteLLM: {event_name}", "html": email_html_content, } await send_email( receiver_email=email_event["to"], subject=email_event["subject"], html=email_event["html"], ) return True except Exception as e: verbose_proxy_logger.error("Error sending email alert %s", str(e)) return False async def send_email_alert_using_smtp( self, webhook_event: WebhookEvent, alert_type: str ) -> bool: """ Sends structured Email alert to an SMTP server Currently only implemented for budget alerts Returns -> True if sent, False if not. """ from litellm.proxy.proxy_server import premium_user from litellm.proxy.utils import send_email email_logo_url = os.getenv( "SMTP_SENDER_LOGO", os.getenv("EMAIL_LOGO_URL", None) ) email_support_contact = os.getenv("EMAIL_SUPPORT_CONTACT", None) await self._check_if_using_premium_email_feature( premium_user, email_logo_url, email_support_contact ) if email_logo_url is None: email_logo_url = LITELLM_LOGO_URL if email_support_contact is None: email_support_contact = LITELLM_SUPPORT_CONTACT event_name = webhook_event.event_message recipient_email = webhook_event.user_email user_name = webhook_event.user_id max_budget = webhook_event.max_budget email_html_content = "Alert from LiteLLM Server" if recipient_email is None: verbose_proxy_logger.error( "Trying to send email alert to no recipient", extra=webhook_event.dict() ) if webhook_event.event == "budget_crossed": email_html_content = f""" <img src="{email_logo_url}" alt="LiteLLM Logo" width="150" height="50" /> <p> Hi {user_name}, <br/> Your LLM API usage this month has reached your account's <b> monthly budget of ${max_budget} </b> <br /> <br /> API requests will be rejected until either (a) you increase your monthly budget or (b) your monthly usage resets at the beginning of the next calendar month. <br /> <br /> If you have any questions, please send an email to {email_support_contact} <br /> <br /> Best, <br /> The LiteLLM team <br /> """ webhook_event.model_dump_json() email_event = { "to": recipient_email, "subject": f"LiteLLM: {event_name}", "html": email_html_content, } await send_email( receiver_email=email_event["to"], subject=email_event["subject"], html=email_event["html"], ) if webhook_event.event_group == "team": from litellm.integrations.email_alerting import send_team_budget_alert await send_team_budget_alert(webhook_event=webhook_event) return False async def send_alert( self, message: str, level: Literal["Low", "Medium", "High"], alert_type: AlertType, alerting_metadata: dict, user_info: Optional[WebhookEvent] = None, **kwargs, ): """ Alerting based on thresholds: - https://github.com/BerriAI/litellm/issues/1298 - Responses taking too long - Requests are hanging - Calls are failing - DB Read/Writes are failing - Proxy Close to max budget - Key Close to max budget Parameters: level: str - Low|Medium|High - if calls might fail (Medium) or are failing (High); Currently, no alerts would be 'Low'. 
message: str - what is the alert about """ if self.alerting is None: return if ( "webhook" in self.alerting and alert_type == "budget_alerts" and user_info is not None ): await self.send_webhook_alert(webhook_event=user_info) if ( "email" in self.alerting and alert_type == "budget_alerts" and user_info is not None ): # only send budget alerts over Email await self.send_email_alert_using_smtp( webhook_event=user_info, alert_type=alert_type ) if "slack" not in self.alerting: return if alert_type not in self.alert_types: return from datetime import datetime # Get the current timestamp current_time = datetime.now().strftime("%H:%M:%S") _proxy_base_url = os.getenv("PROXY_BASE_URL", None) if alert_type == "daily_reports" or alert_type == "new_model_added": formatted_message = message else: formatted_message = ( f"Level: `{level}`\nTimestamp: `{current_time}`\n\nMessage: {message}" ) if kwargs: for key, value in kwargs.items(): formatted_message += f"\n\n{key}: `{value}`\n\n" if alerting_metadata: for key, value in alerting_metadata.items(): formatted_message += f"\n\n*Alerting Metadata*: \n{key}: `{value}`\n\n" if _proxy_base_url is not None: formatted_message += f"\n\nProxy URL: `{_proxy_base_url}`" # check if we find the slack webhook url in self.alert_to_webhook_url if ( self.alert_to_webhook_url is not None and alert_type in self.alert_to_webhook_url ): slack_webhook_url: Optional[Union[str, List[str]]] = ( self.alert_to_webhook_url[alert_type] ) elif self.default_webhook_url is not None: slack_webhook_url = self.default_webhook_url else: slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL", None) if slack_webhook_url is None: raise ValueError("Missing SLACK_WEBHOOK_URL from environment") payload = {"text": formatted_message} headers = {"Content-type": "application/json"} if isinstance(slack_webhook_url, list): for url in slack_webhook_url: self.log_queue.append( { "url": url, "headers": headers, "payload": payload, "alert_type": alert_type, } ) else: self.log_queue.append( { "url": slack_webhook_url, "headers": headers, "payload": payload, "alert_type": alert_type, } ) if len(self.log_queue) >= self.batch_size: await self.flush_queue() async def async_send_batch(self): if not self.log_queue: return squashed_queue = squash_payloads(self.log_queue) tasks = [ send_to_webhook( slackAlertingInstance=self, item=item["item"], count=item["count"] ) for item in squashed_queue.values() ] await asyncio.gather(*tasks) self.log_queue.clear() async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): """Log deployment latency""" try: if "daily_reports" in self.alert_types: litellm_params = kwargs.get("litellm_params", {}) or {} model_info = litellm_params.get("model_info", {}) or {} model_id = model_info.get("id", "") or "" response_s: timedelta = end_time - start_time final_value = response_s if isinstance(response_obj, litellm.ModelResponse) and ( hasattr(response_obj, "usage") and response_obj.usage is not None # type: ignore and hasattr(response_obj.usage, "completion_tokens") # type: ignore ): completion_tokens = response_obj.usage.completion_tokens # type: ignore if completion_tokens is not None and completion_tokens > 0: final_value = float( response_s.total_seconds() / completion_tokens ) if isinstance(final_value, timedelta): final_value = final_value.total_seconds() await self.async_update_daily_reports( DeploymentMetrics( id=model_id, failed_request=False, latency_per_output_token=final_value, updated_at=litellm.utils.get_utc_datetime(), ) ) except Exception as e: 
verbose_proxy_logger.error( f"[Non-Blocking Error] Slack Alerting: Got error in logging LLM deployment latency: {str(e)}" ) pass async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): """Log failure + deployment latency""" _litellm_params = kwargs.get("litellm_params", {}) _model_info = _litellm_params.get("model_info", {}) or {} model_id = _model_info.get("id", "") try: if "daily_reports" in self.alert_types: try: await self.async_update_daily_reports( DeploymentMetrics( id=model_id, failed_request=True, latency_per_output_token=None, updated_at=litellm.utils.get_utc_datetime(), ) ) except Exception as e: verbose_logger.debug(f"Exception raises -{str(e)}") if isinstance(kwargs.get("exception", ""), APIError): if "outage_alerts" in self.alert_types: await self.outage_alerts( exception=kwargs["exception"], deployment_id=model_id, ) if "region_outage_alerts" in self.alert_types: await self.region_outage_alerts( exception=kwargs["exception"], deployment_id=model_id ) except Exception: pass async def _run_scheduler_helper(self, llm_router) -> bool: """ Returns: - True -> report sent - False -> report not sent """ report_sent_bool = False report_sent = await self.internal_usage_cache.async_get_cache( key=SlackAlertingCacheKeys.report_sent_key.value, parent_otel_span=None, ) # None | float current_time = time.time() if report_sent is None: await self.internal_usage_cache.async_set_cache( key=SlackAlertingCacheKeys.report_sent_key.value, value=current_time, ) elif isinstance(report_sent, float): # Check if current time - interval >= time last sent interval_seconds = self.alerting_args.daily_report_frequency if current_time - report_sent >= interval_seconds: # Sneak in the reporting logic here await self.send_daily_reports(router=llm_router) # Also, don't forget to update the report_sent time after sending the report! await self.internal_usage_cache.async_set_cache( key=SlackAlertingCacheKeys.report_sent_key.value, value=current_time, ) report_sent_bool = True return report_sent_bool async def _run_scheduled_daily_report(self, llm_router: Optional[Any] = None): """ If 'daily_reports' enabled Ping redis cache every 5 minutes to check if we should send the report If yes -> call send_daily_report() """ if llm_router is None or self.alert_types is None: return if "daily_reports" in self.alert_types: while True: await self._run_scheduler_helper(llm_router=llm_router) interval = random.randint( self.alerting_args.report_check_interval - 3, self.alerting_args.report_check_interval + 3, ) # shuffle to prevent collisions await asyncio.sleep(interval) return async def send_weekly_spend_report( self, time_range: str = "7d", ): """ Send a spend report for a configurable time range. 
Args: time_range: A string specifying the time range for the report, e.g., "1d", "7d", "30d" """ if self.alerting is None or "spend_reports" not in self.alert_types: return try: from litellm.proxy.spend_tracking.spend_management_endpoints import ( _get_spend_report_for_time_range, ) # Parse the time range days = int(time_range[:-1]) if time_range[-1].lower() != "d": raise ValueError("Time range must be specified in days, e.g., '7d'") todays_date = datetime.datetime.now().date() start_date = todays_date - datetime.timedelta(days=days) _event_cache_key = f"weekly_spend_report_sent_{start_date.strftime('%Y-%m-%d')}_{todays_date.strftime('%Y-%m-%d')}" if await self.internal_usage_cache.async_get_cache(key=_event_cache_key): return _resp = await _get_spend_report_for_time_range( start_date=start_date.strftime("%Y-%m-%d"), end_date=todays_date.strftime("%Y-%m-%d"), ) if _resp is None or _resp == ([], []): return spend_per_team, spend_per_tag = _resp _spend_message = f"*šø Spend Report for `{start_date.strftime('%m-%d-%Y')} - {todays_date.strftime('%m-%d-%Y')}` ({days} days)*\n" if spend_per_team is not None: _spend_message += "\n*Team Spend Report:*\n" for spend in spend_per_team: _team_spend = round(float(spend["total_spend"]), 4) _spend_message += ( f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" ) if spend_per_tag is not None: _spend_message += "\n*Tag Spend Report:*\n" for spend in spend_per_tag: _tag_spend = round(float(spend["total_spend"]), 4) _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n" await self.send_alert( message=_spend_message, level="Low", alert_type=AlertType.spend_reports, alerting_metadata={}, ) await self.internal_usage_cache.async_set_cache( key=_event_cache_key, value="SENT", ttl=duration_in_seconds(time_range), ) except ValueError as ve: verbose_proxy_logger.error(f"Invalid time range format: {ve}") except Exception as e: verbose_proxy_logger.error(f"Error sending spend report: {e}") async def send_monthly_spend_report(self): """ """ try: from calendar import monthrange from litellm.proxy.spend_tracking.spend_management_endpoints import ( _get_spend_report_for_time_range, ) todays_date = datetime.datetime.now().date() first_day_of_month = todays_date.replace(day=1) _, last_day_of_month = monthrange(todays_date.year, todays_date.month) last_day_of_month = first_day_of_month + datetime.timedelta( days=last_day_of_month - 1 ) _event_cache_key = f"monthly_spend_report_sent_{first_day_of_month.strftime('%Y-%m-%d')}_{last_day_of_month.strftime('%Y-%m-%d')}" if await self.internal_usage_cache.async_get_cache(key=_event_cache_key): return _resp = await _get_spend_report_for_time_range( start_date=first_day_of_month.strftime("%Y-%m-%d"), end_date=last_day_of_month.strftime("%Y-%m-%d"), ) if _resp is None or _resp == ([], []): return monthly_spend_per_team, monthly_spend_per_tag = _resp _spend_message = f"*šø Monthly Spend Report for `{first_day_of_month.strftime('%m-%d-%Y')} - {last_day_of_month.strftime('%m-%d-%Y')}` *\n" if monthly_spend_per_team is not None: _spend_message += "\n*Team Spend Report:*\n" for spend in monthly_spend_per_team: _team_spend = spend["total_spend"] _team_spend = float(_team_spend) # round to 4 decimal places _team_spend = round(_team_spend, 4) _spend_message += ( f"Team: `{spend['team_alias']}` | Spend: `${_team_spend}`\n" ) if monthly_spend_per_tag is not None: _spend_message += "\n*Tag Spend Report:*\n" for spend in monthly_spend_per_tag: _tag_spend = spend["total_spend"] _tag_spend = 
float(_tag_spend)  # round to 4 decimal places
                    _tag_spend = round(_tag_spend, 4)
                    _spend_message += f"Tag: `{spend['individual_request_tag']}` | Spend: `${_tag_spend}`\n"

            await self.send_alert(
                message=_spend_message,
                level="Low",
                alert_type=AlertType.spend_reports,
                alerting_metadata={},
            )

            await self.internal_usage_cache.async_set_cache(
                key=_event_cache_key,
                value="SENT",
                ttl=(30 * 24 * 60 * 60),  # 1 month
            )
        except Exception as e:
            verbose_proxy_logger.exception("Error sending monthly spend report %s", e)

    async def send_fallback_stats_from_prometheus(self):
        """
        Helper to send fallback statistics from prometheus server -> to slack

        This runs once per day and sends an overview of all the fallback statistics
        """
        try:
            from litellm.integrations.prometheus_helpers.prometheus_api import (
                get_fallback_metric_from_prometheus,
            )

            # call prometheus logger
            fallback_success_info_prometheus = (
                await get_fallback_metric_from_prometheus()
            )

            fallback_message = (
                f"*Fallback Statistics:*\n{fallback_success_info_prometheus}"
            )

            await self.send_alert(
                message=fallback_message,
                level="Low",
                alert_type=AlertType.fallback_reports,
                alerting_metadata={},
            )

        except Exception as e:
            verbose_proxy_logger.error("Error sending fallback statistics %s", e)
            pass

    async def send_virtual_key_event_slack(
        self,
        key_event: VirtualKeyEvent,
        alert_type: AlertType,
        event_name: str,
    ):
        """
        Handles sending Virtual Key related alerts

        Example:
        - New Virtual Key Created
        - Internal User Updated
        - Team Created, Updated, Deleted
        """
        try:
            message = f"`{event_name}`\n"

            key_event_dict = key_event.model_dump()

            # Add Created by information first
            message += "*Action Done by:*\n"
            for key, value in key_event_dict.items():
                if "created_by" in key:
                    message += f"{key}: `{value}`\n"

            # Add args sent to function in the alert
            message += "\n*Arguments passed:*\n"
            request_kwargs = key_event.request_kwargs
            for key, value in request_kwargs.items():
                if key == "user_api_key_dict":
                    continue
                message += f"{key}: `{value}`\n"

            await self.send_alert(
                message=message,
                level="High",
                alert_type=alert_type,
                alerting_metadata={},
            )
        except Exception as e:
            verbose_proxy_logger.error(
                "Error sending send_virtual_key_event_slack %s", e
            )
            return

    async def _request_is_completed(self, request_data: Optional[dict]) -> bool:
        """
        Returns True if the request is completed - either as a success or failure
        """
        if request_data is None:
            return False

        if (
            request_data.get("litellm_status", "") != "success"
            and request_data.get("litellm_status", "") != "fail"
        ):
            ## CHECK IF CACHE IS UPDATED
            litellm_call_id = request_data.get("litellm_call_id", "")
            status: Optional[str] = await self.internal_usage_cache.async_get_cache(
                key="request_status:{}".format(litellm_call_id), local_only=True
            )
            if status is not None and (status == "success" or status == "fail"):
                return True

        return False
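

# ---------------------------------------------------------------------------
# Illustrative usage sketches. They only use names imported/defined above;
# webhook URLs, model ids, thresholds and metadata values are placeholders.
# ---------------------------------------------------------------------------


def _example_build_slack_alerting() -> SlackAlerting:
    """Sketch: construct SlackAlerting with a per-alert-type Slack channel.

    Assumes this runs inside the proxy's event loop and that the webhook URLs
    (or the SLACK_WEBHOOK_URL env var read in send_alert) point at real Slack
    incoming webhooks.
    """
    return SlackAlerting(
        alerting=["slack"],
        alerting_threshold=120,  # seconds before a request counts as slow/hanging
        alert_types=[AlertType.llm_too_slow, AlertType.budget_alerts],
        alert_to_webhook_url={
            AlertType.budget_alerts: "https://hooks.slack.com/services/T000/B000/XXXX",
        },
        default_webhook_url="https://hooks.slack.com/services/T000/B000/YYYY",
    )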
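

async def _example_send_manual_alert(slack_alerting: SlackAlerting) -> None:
    """Sketch: queue a one-off alert and flush the batch immediately.

    send_alert() raises ValueError when no Slack webhook URL is configured and
    otherwise appends to the batch queue; flush_queue() is awaited here so the
    message is posted without waiting for batch_size to be reached. The message
    text and metadata are arbitrary.
    """
    await slack_alerting.send_alert(
        message="Upstream LLM latency is degraded in staging",
        level="Medium",
        alert_type=AlertType.llm_too_slow,
        alerting_metadata={"environment": "staging"},
    )
    await slack_alerting.flush_queue()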
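

async def _example_slow_response_alert(slack_alerting: SlackAlerting) -> None:
    """Sketch: drive the slow-response path directly.

    kwargs loosely mirrors the completion kwargs the proxy normally passes to
    response_taking_too_long_callback(); the model name and timestamps are
    placeholders, and the 600s gap exceeds the default 300s alerting_threshold
    so an llm_too_slow alert is queued.
    """
    start = datetime.datetime.now()
    end = start + timedelta(seconds=600)
    await slack_alerting.response_taking_too_long_callback(
        kwargs={
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": "hello"}],
            "litellm_params": {},
        },
        completion_response=None,
        start_time=start,
        end_time=end,
    )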
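

async def _example_record_deployment_metrics(slack_alerting: SlackAlerting) -> None:
    """Sketch: record the per-deployment counters the daily report aggregates.

    DeploymentMetrics is built the same way async_log_failure_event() does
    above; "model-1234" is a made-up deployment id.
    """
    await slack_alerting.async_update_daily_reports(
        DeploymentMetrics(
            id="model-1234",
            failed_request=True,
            latency_per_output_token=None,
            updated_at=litellm.utils.get_utc_datetime(),
        )
    )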
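

async def _example_alert_once_per_ttl(
    slack_alerting: SlackAlerting, token_hash: str
) -> None:
    """Sketch of the de-duplication pattern budget_alerts() / failed_tracking_alert() use.

    Only alert when no "SENT" marker exists for the cache key, then write the
    marker with budget_alert_ttl so the same alert is suppressed until the TTL
    expires. token_hash is a placeholder for the hashed virtual-key token.
    """
    cache_key = "budget_alerts:threshold_crossed:{}".format(token_hash)
    already_sent = await slack_alerting.internal_usage_cache.async_get_cache(
        key=cache_key
    )
    if already_sent is None:
        await slack_alerting.send_alert(
            message="Key Budget: 15% Threshold Crossed",
            level="High",
            alert_type=AlertType.budget_alerts,
            alerting_metadata={},
        )
        await slack_alerting.internal_usage_cache.async_set_cache(
            key=cache_key,
            value="SENT",
            ttl=slack_alerting.alerting_args.budget_alert_ttl,
        )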
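

async def _example_mark_request_completed(
    slack_alerting: SlackAlerting, litellm_call_id: str
) -> None:
    """Sketch: how a finished request suppresses the hanging-request alert.

    _request_is_completed() checks request_data["litellm_status"] and then the
    "request_status:<litellm_call_id>" cache entry; writing "success" (or
    "fail") there before the alerting_threshold sleep in
    response_taking_too_long() elapses prevents the llm_requests_hanging alert.
    """
    await slack_alerting.internal_usage_cache.async_set_cache(
        key="request_status:{}".format(litellm_call_id),
        value="success",
    )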
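

async def _example_raw_slack_post(
    slack_alerting: SlackAlerting, webhook_url: str
) -> None:
    """Sketch of the payload shape send_alert() enqueues for the batch sender.

    A plain Slack {"text": ...} payload posted as JSON, using the same
    async_http_handler.post() call shape as send_webhook_alert() above;
    webhook_url is a placeholder for a Slack incoming-webhook URL.
    """
    await slack_alerting.async_http_handler.post(
        url=webhook_url,
        headers={"Content-type": "application/json"},
        data='{"text": "test alert from the LiteLLM proxy"}',
    )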