| | | |
|---|---|---|
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints')

| | | |
|---|---|---|
| -rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py | 698 |
1 file changed, 698 insertions, 0 deletions
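For orientation, the patch below adds LiteLLM's proxy health endpoints (`/health`, `/health/services`, `/health/readiness`, `/health/liveliness` / `/health/liveness`, and `/health/test_connection`). The sketch that follows shows one way to exercise those routes once a proxy is running; it is illustrative only, and the base URL `http://localhost:4000`, the `sk-1234` key, and the model names are assumptions mirroring the placeholder values in the docstrings, not part of the patch.

```python
# Minimal sketch of calling the health routes added in the diff below.
# Assumptions (not part of the patch): a LiteLLM proxy at localhost:4000,
# a master key of sk-1234 (as in the docstring examples), placeholder model names,
# and the third-party `requests` package installed.
import requests

BASE_URL = "http://localhost:4000"
HEADERS = {"Authorization": "Bearer sk-1234"}

# Liveness and readiness probes (served by health_liveliness() / health_readiness()).
print(requests.get(f"{BASE_URL}/health/liveness", headers=HEADERS, timeout=10).json())
print(requests.get(f"{BASE_URL}/health/readiness", headers=HEADERS, timeout=10).json())

# Health check across configured models (health_endpoint()); the 'model' filter is optional.
health = requests.get(
    f"{BASE_URL}/health", headers=HEADERS, params={"model": "gpt-4o"}, timeout=120
)
print(health.json())

# Direct connectivity test for one model (test_model_connection()).
payload = {
    "mode": "chat",
    "litellm_params": {"model": "gpt-4o", "api_key": "replace-with-a-real-key"},
}
print(
    requests.post(
        f"{BASE_URL}/health/test_connection", headers=HEADERS, json=payload, timeout=120
    ).json()
)
```

The readiness and liveness routes are the ones typically wired into Kubernetes probes; `/health` can be slow when many models are configured, which is why the `/health` docstring in the diff suggests setting `background_health_checks: True` in `config.yaml`.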
```
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py
new file mode 100644
index 00000000..34e7d34b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py
@@ -0,0 +1,698 @@
```

````python
import asyncio
import copy
import os
import traceback
from datetime import datetime, timedelta
from typing import Dict, Literal, Optional, Union

import fastapi
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status

import litellm
from litellm._logging import verbose_proxy_logger
from litellm.constants import HEALTH_CHECK_TIMEOUT_SECONDS
from litellm.proxy._types import (
    AlertType,
    CallInfo,
    ProxyErrorTypes,
    ProxyException,
    UserAPIKeyAuth,
    WebhookEvent,
)
from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
from litellm.proxy.health_check import (
    _clean_endpoint_data,
    _update_litellm_params_for_health_check,
    perform_health_check,
    run_with_timeout,
)

#### Health ENDPOINTS ####

router = APIRouter()


@router.get(
    "/test",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def test_endpoint(request: Request):
    """
    [DEPRECATED] use `/health/liveliness` instead.

    A test endpoint that pings the proxy server to check if it's healthy.

    Parameters:
        request (Request): The incoming request.

    Returns:
        dict: A dictionary containing the route of the request URL.
    """
    # ping the proxy server to check if its healthy
    return {"route": request.url.path}


@router.get(
    "/health/services",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def health_services_endpoint(  # noqa: PLR0915
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    service: Union[
        Literal[
            "slack_budget_alerts",
            "langfuse",
            "slack",
            "openmeter",
            "webhook",
            "email",
            "braintrust",
            "datadog",
        ],
        str,
    ] = fastapi.Query(description="Specify the service being hit."),
):
    """
    Use this admin-only endpoint to check if the service is healthy.

    Example:
    ```
    curl -L -X GET 'http://0.0.0.0:4000/health/services?service=datadog' \
    -H 'Authorization: Bearer sk-1234'
    ```
    """
    try:
        from litellm.proxy.proxy_server import (
            general_settings,
            prisma_client,
            proxy_logging_obj,
        )

        if service is None:
            raise HTTPException(
                status_code=400, detail={"error": "Service must be specified."}
            )

        if service not in [
            "slack_budget_alerts",
            "email",
            "langfuse",
            "slack",
            "openmeter",
            "webhook",
            "braintrust",
            "otel",
            "custom_callback_api",
            "langsmith",
            "datadog",
        ]:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": f"Service must be in list. Service={service}. List={['slack_budget_alerts']}"
                },
            )

        if (
            service == "openmeter"
            or service == "braintrust"
            or (service in litellm.success_callback and service != "langfuse")
        ):
            _ = await litellm.acompletion(
                model="openai/litellm-mock-response-model",
                messages=[{"role": "user", "content": "Hey, how's it going?"}],
                user="litellm:/health/services",
                mock_response="This is a mock response",
            )
            return {
                "status": "success",
                "message": "Mock LLM request made - check {}.".format(service),
            }
        elif service == "datadog":
            from litellm.integrations.datadog.datadog import DataDogLogger

            datadog_logger = DataDogLogger()
            response = await datadog_logger.async_health_check()
            return {
                "status": response["status"],
                "message": (
                    response["error_message"]
                    if response["status"] == "unhealthy"
                    else "Datadog is healthy"
                ),
            }
        elif service == "langfuse":
            from litellm.integrations.langfuse.langfuse import LangFuseLogger

            langfuse_logger = LangFuseLogger()
            langfuse_logger.Langfuse.auth_check()
            _ = litellm.completion(
                model="openai/litellm-mock-response-model",
                messages=[{"role": "user", "content": "Hey, how's it going?"}],
                user="litellm:/health/services",
                mock_response="This is a mock response",
            )
            return {
                "status": "success",
                "message": "Mock LLM request made - check langfuse.",
            }

        if service == "webhook":
            user_info = CallInfo(
                token=user_api_key_dict.token or "",
                spend=1,
                max_budget=0,
                user_id=user_api_key_dict.user_id,
                key_alias=user_api_key_dict.key_alias,
                team_id=user_api_key_dict.team_id,
            )
            await proxy_logging_obj.budget_alerts(
                type="user_budget",
                user_info=user_info,
            )

        if service == "slack" or service == "slack_budget_alerts":
            if "slack" in general_settings.get("alerting", []):
                # test_message = f"""\n🚨 `ProjectedLimitExceededError` 💸\n\n`Key Alias:` litellm-ui-test-alert \n`Expected Day of Error`: 28th March \n`Current Spend`: $100.00 \n`Projected Spend at end of month`: $1000.00 \n`Soft Limit`: $700"""
                # check if user has opted into unique_alert_webhooks
                if (
                    proxy_logging_obj.slack_alerting_instance.alert_to_webhook_url
                    is not None
                ):
                    for (
                        alert_type
                    ) in proxy_logging_obj.slack_alerting_instance.alert_to_webhook_url:
                        # only test alert if it's in active alert types
                        if (
                            proxy_logging_obj.slack_alerting_instance.alert_types
                            is not None
                            and alert_type
                            not in proxy_logging_obj.slack_alerting_instance.alert_types
                        ):
                            continue

                        test_message = "default test message"
                        if alert_type == AlertType.llm_exceptions:
                            test_message = "LLM Exception test alert"
                        elif alert_type == AlertType.llm_too_slow:
                            test_message = "LLM Too Slow test alert"
                        elif alert_type == AlertType.llm_requests_hanging:
                            test_message = "LLM Requests Hanging test alert"
                        elif alert_type == AlertType.budget_alerts:
                            test_message = "Budget Alert test alert"
                        elif alert_type == AlertType.db_exceptions:
                            test_message = "DB Exception test alert"
                        elif alert_type == AlertType.outage_alerts:
                            test_message = "Outage Alert Exception test alert"
                        elif alert_type == AlertType.daily_reports:
                            test_message = "Daily Reports test alert"
                        else:
                            test_message = "Budget Alert test alert"

                        await proxy_logging_obj.alerting_handler(
                            message=test_message, level="Low", alert_type=alert_type
                        )
                else:
                    await proxy_logging_obj.alerting_handler(
                        message="This is a test slack alert message",
                        level="Low",
                        alert_type=AlertType.budget_alerts,
                    )

                if prisma_client is not None:
                    asyncio.create_task(
                        proxy_logging_obj.slack_alerting_instance.send_monthly_spend_report()
                    )
                    asyncio.create_task(
                        proxy_logging_obj.slack_alerting_instance.send_weekly_spend_report()
                    )

                alert_types = (
                    proxy_logging_obj.slack_alerting_instance.alert_types or []
                )
                alert_types = list(alert_types)
                return {
                    "status": "success",
                    "alert_types": alert_types,
                    "message": "Mock Slack Alert sent, verify Slack Alert Received on your channel",
                }
            else:
                raise HTTPException(
                    status_code=422,
                    detail={
                        "error": '"{}" not in proxy config: general_settings. Unable to test this.'.format(
                            service
                        )
                    },
                )
        if service == "email":
            webhook_event = WebhookEvent(
                event="key_created",
                event_group="key",
                event_message="Test Email Alert",
                token=user_api_key_dict.token or "",
                key_alias="Email Test key (This is only a test alert key. DO NOT USE THIS IN PRODUCTION.)",
                spend=0,
                max_budget=0,
                user_id=user_api_key_dict.user_id,
                user_email=os.getenv("TEST_EMAIL_ADDRESS"),
                team_id=user_api_key_dict.team_id,
            )

            # use create task - this can take 10 seconds. don't keep ui users waiting for notification to check their email
            await proxy_logging_obj.slack_alerting_instance.send_key_created_or_user_invited_email(
                webhook_event=webhook_event
            )

            return {
                "status": "success",
                "message": "Mock Email Alert sent, verify Email Alert Received",
            }

    except Exception as e:
        verbose_proxy_logger.error(
            "litellm.proxy.proxy_server.health_services_endpoint(): Exception occured - {}".format(
                str(e)
            )
        )
        verbose_proxy_logger.debug(traceback.format_exc())
        if isinstance(e, HTTPException):
            raise ProxyException(
                message=getattr(e, "detail", f"Authentication Error({str(e)})"),
                type=ProxyErrorTypes.auth_error,
                param=getattr(e, "param", "None"),
                code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
            )
        elif isinstance(e, ProxyException):
            raise e
        raise ProxyException(
            message="Authentication Error, " + str(e),
            type=ProxyErrorTypes.auth_error,
            param=getattr(e, "param", "None"),
            code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        )


@router.get("/health", tags=["health"], dependencies=[Depends(user_api_key_auth)])
async def health_endpoint(
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
    model: Optional[str] = fastapi.Query(
        None, description="Specify the model name (optional)"
    ),
):
    """
    🚨 USE `/health/liveliness` to health check the proxy 🚨

    See more 👉 https://docs.litellm.ai/docs/proxy/health


    Check the health of all the endpoints in config.yaml

    To run health checks in the background, add this to config.yaml:
    ```
    general_settings:
        # ... other settings
        background_health_checks: True
    ```
    else, the health checks will be run on models when /health is called.
    """
    from litellm.proxy.proxy_server import (
        health_check_details,
        health_check_results,
        llm_model_list,
        use_background_health_checks,
        user_model,
    )

    try:
        if llm_model_list is None:
            # if no router set, check if user set a model using litellm --model ollama/llama2
            if user_model is not None:
                healthy_endpoints, unhealthy_endpoints = await perform_health_check(
                    model_list=[], cli_model=user_model, details=health_check_details
                )
                return {
                    "healthy_endpoints": healthy_endpoints,
                    "unhealthy_endpoints": unhealthy_endpoints,
                    "healthy_count": len(healthy_endpoints),
                    "unhealthy_count": len(unhealthy_endpoints),
                }
            raise HTTPException(
                status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                detail={"error": "Model list not initialized"},
            )
        _llm_model_list = copy.deepcopy(llm_model_list)
        ### FILTER MODELS FOR ONLY THOSE USER HAS ACCESS TO ###
        if len(user_api_key_dict.models) > 0:
            pass
        else:
            pass  #
        if use_background_health_checks:
            return health_check_results
        else:
            healthy_endpoints, unhealthy_endpoints = await perform_health_check(
                _llm_model_list, model, details=health_check_details
            )

            return {
                "healthy_endpoints": healthy_endpoints,
                "unhealthy_endpoints": unhealthy_endpoints,
                "healthy_count": len(healthy_endpoints),
                "unhealthy_count": len(unhealthy_endpoints),
            }
    except Exception as e:
        verbose_proxy_logger.error(
            "litellm.proxy.proxy_server.py::health_endpoint(): Exception occured - {}".format(
                str(e)
            )
        )
        verbose_proxy_logger.debug(traceback.format_exc())
        raise e


db_health_cache = {"status": "unknown", "last_updated": datetime.now()}


async def _db_health_readiness_check():
    from litellm.proxy.proxy_server import prisma_client

    global db_health_cache

    # Note - Intentionally don't try/except this so it raises an exception when it fails

    # if timedelta is less than 2 minutes return DB Status
    time_diff = datetime.now() - db_health_cache["last_updated"]
    if db_health_cache["status"] != "unknown" and time_diff < timedelta(minutes=2):
        return db_health_cache

    if prisma_client is None:
        db_health_cache = {"status": "disconnected", "last_updated": datetime.now()}
        return db_health_cache

    await prisma_client.health_check()
    db_health_cache = {"status": "connected", "last_updated": datetime.now()}
    return db_health_cache


@router.get(
    "/settings",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
@router.get(
    "/active/callbacks",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def active_callbacks():
    """
    Returns a list of litellm level settings

    This is useful for debugging and ensuring the proxy server is configured correctly.

    Response schema:
    ```
    {
        "alerting": _alerting,
        "litellm.callbacks": litellm_callbacks,
        "litellm.input_callback": litellm_input_callbacks,
        "litellm.failure_callback": litellm_failure_callbacks,
        "litellm.success_callback": litellm_success_callbacks,
        "litellm._async_success_callback": litellm_async_success_callbacks,
        "litellm._async_failure_callback": litellm_async_failure_callbacks,
        "litellm._async_input_callback": litellm_async_input_callbacks,
        "all_litellm_callbacks": all_litellm_callbacks,
        "num_callbacks": len(all_litellm_callbacks),
        "num_alerting": _num_alerting,
        "litellm.request_timeout": litellm.request_timeout,
    }
    ```
    """

    from litellm.proxy.proxy_server import general_settings, proxy_logging_obj

    _alerting = str(general_settings.get("alerting"))
    # get success callbacks

    litellm_callbacks = [str(x) for x in litellm.callbacks]
    litellm_input_callbacks = [str(x) for x in litellm.input_callback]
    litellm_failure_callbacks = [str(x) for x in litellm.failure_callback]
    litellm_success_callbacks = [str(x) for x in litellm.success_callback]
    litellm_async_success_callbacks = [str(x) for x in litellm._async_success_callback]
    litellm_async_failure_callbacks = [str(x) for x in litellm._async_failure_callback]
    litellm_async_input_callbacks = [str(x) for x in litellm._async_input_callback]

    all_litellm_callbacks = (
        litellm_callbacks
        + litellm_input_callbacks
        + litellm_failure_callbacks
        + litellm_success_callbacks
        + litellm_async_success_callbacks
        + litellm_async_failure_callbacks
        + litellm_async_input_callbacks
    )

    alerting = proxy_logging_obj.alerting
    _num_alerting = 0
    if alerting and isinstance(alerting, list):
        _num_alerting = len(alerting)

    return {
        "alerting": _alerting,
        "litellm.callbacks": litellm_callbacks,
        "litellm.input_callback": litellm_input_callbacks,
        "litellm.failure_callback": litellm_failure_callbacks,
        "litellm.success_callback": litellm_success_callbacks,
        "litellm._async_success_callback": litellm_async_success_callbacks,
        "litellm._async_failure_callback": litellm_async_failure_callbacks,
        "litellm._async_input_callback": litellm_async_input_callbacks,
        "all_litellm_callbacks": all_litellm_callbacks,
        "num_callbacks": len(all_litellm_callbacks),
        "num_alerting": _num_alerting,
        "litellm.request_timeout": litellm.request_timeout,
    }


def callback_name(callback):
    if isinstance(callback, str):
        return callback

    try:
        return callback.__name__
    except AttributeError:
        try:
            return callback.__class__.__name__
        except AttributeError:
            return str(callback)


@router.get(
    "/health/readiness",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def health_readiness():
    """
    Unprotected endpoint for checking if worker can receive requests
    """
    from litellm.proxy.proxy_server import prisma_client, version

    try:
        # get success callback
        success_callback_names = []

        try:
            # this was returning a JSON of the values in some of the callbacks
            # all we need is the callback name, hence we do str(callback)
            success_callback_names = [
                callback_name(x) for x in litellm.success_callback
            ]
        except AttributeError:
            # don't let this block the /health/readiness response, if we can't convert to str -> return litellm.success_callback
            success_callback_names = litellm.success_callback

        # check Cache
        cache_type = None
        if litellm.cache is not None:
            from litellm.caching.caching import RedisSemanticCache

            cache_type = litellm.cache.type

            if isinstance(litellm.cache.cache, RedisSemanticCache):
                # ping the cache
                # TODO: @ishaan-jaff - we should probably not ping the cache on every /health/readiness check
                try:
                    index_info = await litellm.cache.cache._index_info()
                except Exception as e:
                    index_info = "index does not exist - error: " + str(e)
                cache_type = {"type": cache_type, "index_info": index_info}

        # check DB
        if prisma_client is not None:  # if db passed in, check if it's connected
            db_health_status = await _db_health_readiness_check()
            return {
                "status": "healthy",
                "db": "connected",
                "cache": cache_type,
                "litellm_version": version,
                "success_callbacks": success_callback_names,
                **db_health_status,
            }
        else:
            return {
                "status": "healthy",
                "db": "Not connected",
                "cache": cache_type,
                "litellm_version": version,
                "success_callbacks": success_callback_names,
            }
    except Exception as e:
        raise HTTPException(status_code=503, detail=f"Service Unhealthy ({str(e)})")


@router.get(
    "/health/liveliness",  # Historical LiteLLM name; doesn't match k8s terminology but kept for backwards compatibility
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
@router.get(
    "/health/liveness",  # Kubernetes has "liveness" probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command)
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def health_liveliness():
    """
    Unprotected endpoint for checking if worker is alive
    """
    return "I'm alive!"


@router.options(
    "/health/readiness",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def health_readiness_options():
    """
    Options endpoint for health/readiness check.
    """
    response_headers = {
        "Allow": "GET, OPTIONS",
        "Access-Control-Allow-Methods": "GET, OPTIONS",
        "Access-Control-Allow-Headers": "*",
    }
    return Response(headers=response_headers, status_code=200)


@router.options(
    "/health/liveliness",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
@router.options(
    "/health/liveness",  # Kubernetes has "liveness" probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command)
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def health_liveliness_options():
    """
    Options endpoint for health/liveliness check.
    """
    response_headers = {
        "Allow": "GET, OPTIONS",
        "Access-Control-Allow-Methods": "GET, OPTIONS",
        "Access-Control-Allow-Headers": "*",
    }
    return Response(headers=response_headers, status_code=200)


@router.post(
    "/health/test_connection",
    tags=["health"],
    dependencies=[Depends(user_api_key_auth)],
)
async def test_model_connection(
    request: Request,
    mode: Optional[
        Literal[
            "chat",
            "completion",
            "embedding",
            "audio_speech",
            "audio_transcription",
            "image_generation",
            "batch",
            "rerank",
            "realtime",
        ]
    ] = fastapi.Body("chat", description="The mode to test the model with"),
    litellm_params: Dict = fastapi.Body(
        None,
        description="Parameters for litellm.completion, litellm.embedding for the health check",
    ),
    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
):
    """
    Test a direct connection to a specific model.

    This endpoint allows you to verify if your proxy can successfully connect to a specific model.
    It's useful for troubleshooting model connectivity issues without going through the full proxy routing.

    Example:
    ```bash
    curl -X POST 'http://localhost:4000/health/test_connection' \\
    -H 'Authorization: Bearer sk-1234' \\
    -H 'Content-Type: application/json' \\
    -d '{
        "litellm_params": {
            "model": "gpt-4",
            "custom_llm_provider": "azure_ai",
            "litellm_credential_name": null,
            "api_key": "6xxxxxxx",
            "api_base": "https://litellm8397336933.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-10-21",
        },
        "mode": "chat"
    }'
    ```

    Returns:
        dict: A dictionary containing the health check result with either success information or error details.
    """
    try:
        # Include health_check_params if provided
        litellm_params = _update_litellm_params_for_health_check(
            model_info={},
            litellm_params=litellm_params,
        )
        mode = mode or litellm_params.pop("mode", None)
        result = await run_with_timeout(
            litellm.ahealth_check(
                model_params=litellm_params,
                mode=mode,
                prompt="test from litellm",
                input=["test from litellm"],
            ),
            HEALTH_CHECK_TIMEOUT_SECONDS,
        )

        # Clean the result for display
        cleaned_result = _clean_endpoint_data(
            {**litellm_params, **result}, details=True
        )

        return {
            "status": "error" if "error" in result else "success",
            "result": cleaned_result,
        }

    except Exception as e:
        verbose_proxy_logger.error(
            f"litellm.proxy.health_endpoints.test_model_connection(): Exception occurred - {str(e)}"
        )
        verbose_proxy_logger.debug(traceback.format_exc())
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail={"error": f"Failed to test connection: {str(e)}"},
        )
````