path: root/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py  698
1 file changed, 698 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py
new file mode 100644
index 00000000..34e7d34b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/health_endpoints/_health_endpoints.py
@@ -0,0 +1,698 @@
+import asyncio
+import copy
+import os
+import traceback
+from datetime import datetime, timedelta
+from typing import Dict, Literal, Optional, Union
+
+import fastapi
+from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
+
+import litellm
+from litellm._logging import verbose_proxy_logger
+from litellm.constants import HEALTH_CHECK_TIMEOUT_SECONDS
+from litellm.proxy._types import (
+ AlertType,
+ CallInfo,
+ ProxyErrorTypes,
+ ProxyException,
+ UserAPIKeyAuth,
+ WebhookEvent,
+)
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.health_check import (
+ _clean_endpoint_data,
+ _update_litellm_params_for_health_check,
+ perform_health_check,
+ run_with_timeout,
+)
+
+#### HEALTH ENDPOINTS ####
+
+router = APIRouter()
+
+
+@router.get(
+ "/test",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def test_endpoint(request: Request):
+ """
+ [DEPRECATED] use `/health/liveliness` instead.
+
+ A test endpoint that pings the proxy server to check if it's healthy.
+
+ Parameters:
+ request (Request): The incoming request.
+
+ Returns:
+ dict: A dictionary containing the route of the request URL.
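+
+    Example (illustrative; assumes a local proxy on port 4000 and a valid key):
+    ```
+    curl -L -X GET 'http://0.0.0.0:4000/test' \
+    -H 'Authorization: Bearer sk-1234'
+    ```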
+ """
+    # ping the proxy server to check if it's healthy
+ return {"route": request.url.path}
+
+
+@router.get(
+ "/health/services",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def health_services_endpoint( # noqa: PLR0915
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+ service: Union[
+ Literal[
+ "slack_budget_alerts",
+ "langfuse",
+ "slack",
+ "openmeter",
+ "webhook",
+ "email",
+ "braintrust",
+ "datadog",
+ ],
+ str,
+ ] = fastapi.Query(description="Specify the service being hit."),
+):
+ """
+ Use this admin-only endpoint to check if the service is healthy.
+
+ Example:
+ ```
+ curl -L -X GET 'http://0.0.0.0:4000/health/services?service=datadog' \
+ -H 'Authorization: Bearer sk-1234'
+ ```
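+
+    Example response (illustrative; the exact message depends on the service being tested):
+    ```
+    {"status": "success", "message": "Mock LLM request made - check openmeter."}
+    ```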
+ """
+ try:
+ from litellm.proxy.proxy_server import (
+ general_settings,
+ prisma_client,
+ proxy_logging_obj,
+ )
+
+ if service is None:
+ raise HTTPException(
+ status_code=400, detail={"error": "Service must be specified."}
+ )
+
+        allowed_services = [
+            "slack_budget_alerts",
+            "email",
+            "langfuse",
+            "slack",
+            "openmeter",
+            "webhook",
+            "braintrust",
+            "otel",
+            "custom_callback_api",
+            "langsmith",
+            "datadog",
+        ]
+        if service not in allowed_services:
+            raise HTTPException(
+                status_code=400,
+                detail={
+                    "error": f"Service must be in list. Service={service}. List={allowed_services}"
+                },
+            )
+
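+        # For logging-style integrations (openmeter, braintrust, or any configured
+        # success callback other than langfuse), send a mock completion so the
+        # callback receives a test event.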
+ if (
+ service == "openmeter"
+ or service == "braintrust"
+ or (service in litellm.success_callback and service != "langfuse")
+ ):
+ _ = await litellm.acompletion(
+ model="openai/litellm-mock-response-model",
+ messages=[{"role": "user", "content": "Hey, how's it going?"}],
+ user="litellm:/health/services",
+ mock_response="This is a mock response",
+ )
+ return {
+ "status": "success",
+ "message": "Mock LLM request made - check {}.".format(service),
+ }
+ elif service == "datadog":
+ from litellm.integrations.datadog.datadog import DataDogLogger
+
+ datadog_logger = DataDogLogger()
+ response = await datadog_logger.async_health_check()
+ return {
+ "status": response["status"],
+ "message": (
+ response["error_message"]
+ if response["status"] == "unhealthy"
+ else "Datadog is healthy"
+ ),
+ }
+ elif service == "langfuse":
+ from litellm.integrations.langfuse.langfuse import LangFuseLogger
+
+ langfuse_logger = LangFuseLogger()
+ langfuse_logger.Langfuse.auth_check()
+ _ = litellm.completion(
+ model="openai/litellm-mock-response-model",
+ messages=[{"role": "user", "content": "Hey, how's it going?"}],
+ user="litellm:/health/services",
+ mock_response="This is a mock response",
+ )
+ return {
+ "status": "success",
+ "message": "Mock LLM request made - check langfuse.",
+ }
+
+ if service == "webhook":
+ user_info = CallInfo(
+ token=user_api_key_dict.token or "",
+ spend=1,
+ max_budget=0,
+ user_id=user_api_key_dict.user_id,
+ key_alias=user_api_key_dict.key_alias,
+ team_id=user_api_key_dict.team_id,
+ )
+ await proxy_logging_obj.budget_alerts(
+ type="user_budget",
+ user_info=user_info,
+ )
+
+ if service == "slack" or service == "slack_budget_alerts":
+ if "slack" in general_settings.get("alerting", []):
+ # test_message = f"""\n🚨 `ProjectedLimitExceededError` šŸ’ø\n\n`Key Alias:` litellm-ui-test-alert \n`Expected Day of Error`: 28th March \n`Current Spend`: $100.00 \n`Projected Spend at end of month`: $1000.00 \n`Soft Limit`: $700"""
+ # check if user has opted into unique_alert_webhooks
+ if (
+ proxy_logging_obj.slack_alerting_instance.alert_to_webhook_url
+ is not None
+ ):
+ for (
+ alert_type
+ ) in proxy_logging_obj.slack_alerting_instance.alert_to_webhook_url:
+ # only test alert if it's in active alert types
+ if (
+ proxy_logging_obj.slack_alerting_instance.alert_types
+ is not None
+ and alert_type
+ not in proxy_logging_obj.slack_alerting_instance.alert_types
+ ):
+ continue
+
+ test_message = "default test message"
+ if alert_type == AlertType.llm_exceptions:
+ test_message = "LLM Exception test alert"
+ elif alert_type == AlertType.llm_too_slow:
+ test_message = "LLM Too Slow test alert"
+ elif alert_type == AlertType.llm_requests_hanging:
+ test_message = "LLM Requests Hanging test alert"
+ elif alert_type == AlertType.budget_alerts:
+ test_message = "Budget Alert test alert"
+ elif alert_type == AlertType.db_exceptions:
+ test_message = "DB Exception test alert"
+ elif alert_type == AlertType.outage_alerts:
+ test_message = "Outage Alert Exception test alert"
+ elif alert_type == AlertType.daily_reports:
+ test_message = "Daily Reports test alert"
+ else:
+ test_message = "Budget Alert test alert"
+
+ await proxy_logging_obj.alerting_handler(
+ message=test_message, level="Low", alert_type=alert_type
+ )
+ else:
+ await proxy_logging_obj.alerting_handler(
+ message="This is a test slack alert message",
+ level="Low",
+ alert_type=AlertType.budget_alerts,
+ )
+
+ if prisma_client is not None:
+ asyncio.create_task(
+ proxy_logging_obj.slack_alerting_instance.send_monthly_spend_report()
+ )
+ asyncio.create_task(
+ proxy_logging_obj.slack_alerting_instance.send_weekly_spend_report()
+ )
+
+ alert_types = (
+ proxy_logging_obj.slack_alerting_instance.alert_types or []
+ )
+ alert_types = list(alert_types)
+ return {
+ "status": "success",
+ "alert_types": alert_types,
+ "message": "Mock Slack Alert sent, verify Slack Alert Received on your channel",
+ }
+ else:
+ raise HTTPException(
+ status_code=422,
+ detail={
+ "error": '"{}" not in proxy config: general_settings. Unable to test this.'.format(
+ service
+ )
+ },
+ )
+ if service == "email":
+ webhook_event = WebhookEvent(
+ event="key_created",
+ event_group="key",
+ event_message="Test Email Alert",
+ token=user_api_key_dict.token or "",
+ key_alias="Email Test key (This is only a test alert key. DO NOT USE THIS IN PRODUCTION.)",
+ spend=0,
+ max_budget=0,
+ user_id=user_api_key_dict.user_id,
+ user_email=os.getenv("TEST_EMAIL_ADDRESS"),
+ team_id=user_api_key_dict.team_id,
+ )
+
+        # NOTE: this awaits the email send directly - it can take ~10 seconds, so the caller waits for it
+ await proxy_logging_obj.slack_alerting_instance.send_key_created_or_user_invited_email(
+ webhook_event=webhook_event
+ )
+
+ return {
+ "status": "success",
+ "message": "Mock Email Alert sent, verify Email Alert Received",
+ }
+
+ except Exception as e:
+ verbose_proxy_logger.error(
+ "litellm.proxy.proxy_server.health_services_endpoint(): Exception occured - {}".format(
+ str(e)
+ )
+ )
+ verbose_proxy_logger.debug(traceback.format_exc())
+ if isinstance(e, HTTPException):
+ raise ProxyException(
+ message=getattr(e, "detail", f"Authentication Error({str(e)})"),
+ type=ProxyErrorTypes.auth_error,
+ param=getattr(e, "param", "None"),
+ code=getattr(e, "status_code", status.HTTP_500_INTERNAL_SERVER_ERROR),
+ )
+ elif isinstance(e, ProxyException):
+ raise e
+ raise ProxyException(
+ message="Authentication Error, " + str(e),
+ type=ProxyErrorTypes.auth_error,
+ param=getattr(e, "param", "None"),
+ code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ )
+
+
+@router.get("/health", tags=["health"], dependencies=[Depends(user_api_key_auth)])
+async def health_endpoint(
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+ model: Optional[str] = fastapi.Query(
+ None, description="Specify the model name (optional)"
+ ),
+):
+ """
+ 🚨 USE `/health/liveliness` to health check the proxy 🚨
+
+ See more šŸ‘‰ https://docs.litellm.ai/docs/proxy/health
+
+
+ Check the health of all the endpoints in config.yaml
+
+ To run health checks in the background, add this to config.yaml:
+ ```
+ general_settings:
+ # ... other settings
+ background_health_checks: True
+ ```
+    Otherwise, health checks are run against the configured models each time `/health` is called.
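+
+    Example request (illustrative; assumes a local proxy on port 4000):
+    ```
+    curl -L -X GET 'http://0.0.0.0:4000/health?model=gpt-4' \
+    -H 'Authorization: Bearer sk-1234'
+    ```
+    The response lists `healthy_endpoints`, `unhealthy_endpoints`, and their counts.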
+ """
+ from litellm.proxy.proxy_server import (
+ health_check_details,
+ health_check_results,
+ llm_model_list,
+ use_background_health_checks,
+ user_model,
+ )
+
+ try:
+ if llm_model_list is None:
+ # if no router set, check if user set a model using litellm --model ollama/llama2
+ if user_model is not None:
+ healthy_endpoints, unhealthy_endpoints = await perform_health_check(
+ model_list=[], cli_model=user_model, details=health_check_details
+ )
+ return {
+ "healthy_endpoints": healthy_endpoints,
+ "unhealthy_endpoints": unhealthy_endpoints,
+ "healthy_count": len(healthy_endpoints),
+ "unhealthy_count": len(unhealthy_endpoints),
+ }
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={"error": "Model list not initialized"},
+ )
+ _llm_model_list = copy.deepcopy(llm_model_list)
+        ### FILTER MODELS FOR ONLY THOSE USER HAS ACCESS TO ###
+        # NOTE: filtering by user_api_key_dict.models is not implemented yet -
+        # all models in the config are currently health checked.
+ if use_background_health_checks:
+ return health_check_results
+ else:
+ healthy_endpoints, unhealthy_endpoints = await perform_health_check(
+ _llm_model_list, model, details=health_check_details
+ )
+
+ return {
+ "healthy_endpoints": healthy_endpoints,
+ "unhealthy_endpoints": unhealthy_endpoints,
+ "healthy_count": len(healthy_endpoints),
+ "unhealthy_count": len(unhealthy_endpoints),
+ }
+ except Exception as e:
+ verbose_proxy_logger.error(
+ "litellm.proxy.proxy_server.py::health_endpoint(): Exception occured - {}".format(
+ str(e)
+ )
+ )
+ verbose_proxy_logger.debug(traceback.format_exc())
+ raise e
+
+
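+# In-process cache for the DB health status so /health/readiness does not hit the
+# database on every call; refreshed at most every 2 minutes (see _db_health_readiness_check).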
+db_health_cache = {"status": "unknown", "last_updated": datetime.now()}
+
+
+async def _db_health_readiness_check():
+ from litellm.proxy.proxy_server import prisma_client
+
+ global db_health_cache
+
+ # Note - Intentionally don't try/except this so it raises an exception when it fails
+
+    # if the cached status is fresher than 2 minutes, return it without re-checking the DB
+ time_diff = datetime.now() - db_health_cache["last_updated"]
+ if db_health_cache["status"] != "unknown" and time_diff < timedelta(minutes=2):
+ return db_health_cache
+
+ if prisma_client is None:
+ db_health_cache = {"status": "disconnected", "last_updated": datetime.now()}
+ return db_health_cache
+
+ await prisma_client.health_check()
+ db_health_cache = {"status": "connected", "last_updated": datetime.now()}
+ return db_health_cache
+
+
+@router.get(
+ "/settings",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+@router.get(
+ "/active/callbacks",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def active_callbacks():
+ """
+    Returns a list of litellm-level settings and the currently active callbacks.
+
+ This is useful for debugging and ensuring the proxy server is configured correctly.
+
+ Response schema:
+ ```
+ {
+ "alerting": _alerting,
+ "litellm.callbacks": litellm_callbacks,
+ "litellm.input_callback": litellm_input_callbacks,
+ "litellm.failure_callback": litellm_failure_callbacks,
+ "litellm.success_callback": litellm_success_callbacks,
+ "litellm._async_success_callback": litellm_async_success_callbacks,
+ "litellm._async_failure_callback": litellm_async_failure_callbacks,
+ "litellm._async_input_callback": litellm_async_input_callbacks,
+ "all_litellm_callbacks": all_litellm_callbacks,
+ "num_callbacks": len(all_litellm_callbacks),
+ "num_alerting": _num_alerting,
+ "litellm.request_timeout": litellm.request_timeout,
+ }
+ ```
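+
+    Example (illustrative; assumes a local proxy on port 4000):
+    ```
+    curl -L -X GET 'http://0.0.0.0:4000/active/callbacks' \
+    -H 'Authorization: Bearer sk-1234'
+    ```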
+ """
+
+ from litellm.proxy.proxy_server import general_settings, proxy_logging_obj
+
+ _alerting = str(general_settings.get("alerting"))
+ # get success callbacks
+
+ litellm_callbacks = [str(x) for x in litellm.callbacks]
+ litellm_input_callbacks = [str(x) for x in litellm.input_callback]
+ litellm_failure_callbacks = [str(x) for x in litellm.failure_callback]
+ litellm_success_callbacks = [str(x) for x in litellm.success_callback]
+ litellm_async_success_callbacks = [str(x) for x in litellm._async_success_callback]
+ litellm_async_failure_callbacks = [str(x) for x in litellm._async_failure_callback]
+ litellm_async_input_callbacks = [str(x) for x in litellm._async_input_callback]
+
+ all_litellm_callbacks = (
+ litellm_callbacks
+ + litellm_input_callbacks
+ + litellm_failure_callbacks
+ + litellm_success_callbacks
+ + litellm_async_success_callbacks
+ + litellm_async_failure_callbacks
+ + litellm_async_input_callbacks
+ )
+
+ alerting = proxy_logging_obj.alerting
+ _num_alerting = 0
+ if alerting and isinstance(alerting, list):
+ _num_alerting = len(alerting)
+
+ return {
+ "alerting": _alerting,
+ "litellm.callbacks": litellm_callbacks,
+ "litellm.input_callback": litellm_input_callbacks,
+ "litellm.failure_callback": litellm_failure_callbacks,
+ "litellm.success_callback": litellm_success_callbacks,
+ "litellm._async_success_callback": litellm_async_success_callbacks,
+ "litellm._async_failure_callback": litellm_async_failure_callbacks,
+ "litellm._async_input_callback": litellm_async_input_callbacks,
+ "all_litellm_callbacks": all_litellm_callbacks,
+ "num_callbacks": len(all_litellm_callbacks),
+ "num_alerting": _num_alerting,
+ "litellm.request_timeout": litellm.request_timeout,
+ }
+
+
+def callback_name(callback):
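+    """Return a readable name for a callback: the string itself, the function's
+    __name__, the class name, or str(callback) as a last resort."""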
+ if isinstance(callback, str):
+ return callback
+
+ try:
+ return callback.__name__
+ except AttributeError:
+ try:
+ return callback.__class__.__name__
+ except AttributeError:
+ return str(callback)
+
+
+@router.get(
+ "/health/readiness",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def health_readiness():
+ """
+ Unprotected endpoint for checking if worker can receive requests
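+
+    Example response (illustrative; fields depend on whether a DB and cache are configured):
+    ```
+    {"status": "healthy", "db": "connected", "cache": null, "litellm_version": "...", "success_callbacks": []}
+    ```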
+ """
+ from litellm.proxy.proxy_server import prisma_client, version
+
+ try:
+ # get success callback
+ success_callback_names = []
+
+ try:
+            # some callbacks are objects whose str() dumps their full config;
+            # all we need is a readable name, hence callback_name()
+ success_callback_names = [
+ callback_name(x) for x in litellm.success_callback
+ ]
+ except AttributeError:
+            # don't let this block the /health/readiness response; if names can't be resolved, return litellm.success_callback as-is
+ success_callback_names = litellm.success_callback
+
+ # check Cache
+ cache_type = None
+ if litellm.cache is not None:
+ from litellm.caching.caching import RedisSemanticCache
+
+ cache_type = litellm.cache.type
+
+ if isinstance(litellm.cache.cache, RedisSemanticCache):
+ # ping the cache
+ # TODO: @ishaan-jaff - we should probably not ping the cache on every /health/readiness check
+ try:
+ index_info = await litellm.cache.cache._index_info()
+ except Exception as e:
+ index_info = "index does not exist - error: " + str(e)
+ cache_type = {"type": cache_type, "index_info": index_info}
+
+ # check DB
+ if prisma_client is not None: # if db passed in, check if it's connected
+ db_health_status = await _db_health_readiness_check()
+ return {
+ "status": "healthy",
+ "db": "connected",
+ "cache": cache_type,
+ "litellm_version": version,
+ "success_callbacks": success_callback_names,
+ **db_health_status,
+ }
+ else:
+ return {
+ "status": "healthy",
+ "db": "Not connected",
+ "cache": cache_type,
+ "litellm_version": version,
+ "success_callbacks": success_callback_names,
+ }
+ except Exception as e:
+ raise HTTPException(status_code=503, detail=f"Service Unhealthy ({str(e)})")
+
+
+@router.get(
+ "/health/liveliness", # Historical LiteLLM name; doesn't match k8s terminology but kept for backwards compatibility
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+@router.get(
+ "/health/liveness", # Kubernetes has "liveness" probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command)
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def health_liveliness():
+ """
+ Unprotected endpoint for checking if worker is alive
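+
+    Example (illustrative): `curl -H 'Authorization: Bearer sk-1234' http://0.0.0.0:4000/health/liveliness` returns "I'm alive!".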
+ """
+ return "I'm alive!"
+
+
+@router.options(
+ "/health/readiness",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def health_readiness_options():
+ """
+ Options endpoint for health/readiness check.
+ """
+ response_headers = {
+ "Allow": "GET, OPTIONS",
+ "Access-Control-Allow-Methods": "GET, OPTIONS",
+ "Access-Control-Allow-Headers": "*",
+ }
+ return Response(headers=response_headers, status_code=200)
+
+
+@router.options(
+ "/health/liveliness",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+@router.options(
+ "/health/liveness", # Kubernetes has "liveness" probes (https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/#define-a-liveness-command)
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def health_liveliness_options():
+ """
+ Options endpoint for health/liveliness check.
+ """
+ response_headers = {
+ "Allow": "GET, OPTIONS",
+ "Access-Control-Allow-Methods": "GET, OPTIONS",
+ "Access-Control-Allow-Headers": "*",
+ }
+ return Response(headers=response_headers, status_code=200)
+
+
+@router.post(
+ "/health/test_connection",
+ tags=["health"],
+ dependencies=[Depends(user_api_key_auth)],
+)
+async def test_model_connection(
+ request: Request,
+ mode: Optional[
+ Literal[
+ "chat",
+ "completion",
+ "embedding",
+ "audio_speech",
+ "audio_transcription",
+ "image_generation",
+ "batch",
+ "rerank",
+ "realtime",
+ ]
+ ] = fastapi.Body("chat", description="The mode to test the model with"),
+ litellm_params: Dict = fastapi.Body(
+ None,
+ description="Parameters for litellm.completion, litellm.embedding for the health check",
+ ),
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ Test a direct connection to a specific model.
+
+ This endpoint allows you to verify if your proxy can successfully connect to a specific model.
+ It's useful for troubleshooting model connectivity issues without going through the full proxy routing.
+
+ Example:
+ ```bash
+ curl -X POST 'http://localhost:4000/health/test_connection' \\
+ -H 'Authorization: Bearer sk-1234' \\
+ -H 'Content-Type: application/json' \\
+ -d '{
+ "litellm_params": {
+ "model": "gpt-4",
+ "custom_llm_provider": "azure_ai",
+ "litellm_credential_name": null,
+ "api_key": "6xxxxxxx",
+ "api_base": "https://litellm8397336933.openai.azure.com/openai/deployments/gpt-4o/chat/completions?api-version=2024-10-21",
+ },
+ "mode": "chat"
+ }'
+ ```
+
+ Returns:
+ dict: A dictionary containing the health check result with either success information or error details.
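+
+    Example response (illustrative; `result` contains the cleaned litellm_params plus the health check output):
+    ```
+    {"status": "success", "result": {"model": "gpt-4", ...}}
+    ```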
+ """
+ try:
+ # Include health_check_params if provided
+ litellm_params = _update_litellm_params_for_health_check(
+ model_info={},
+ litellm_params=litellm_params,
+ )
+ mode = mode or litellm_params.pop("mode", None)
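+        # Bound the provider call with HEALTH_CHECK_TIMEOUT_SECONDS so a hanging
+        # endpoint fails fast instead of blocking the request indefinitely.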
+ result = await run_with_timeout(
+ litellm.ahealth_check(
+ model_params=litellm_params,
+ mode=mode,
+ prompt="test from litellm",
+ input=["test from litellm"],
+ ),
+ HEALTH_CHECK_TIMEOUT_SECONDS,
+ )
+
+ # Clean the result for display
+ cleaned_result = _clean_endpoint_data(
+ {**litellm_params, **result}, details=True
+ )
+
+ return {
+ "status": "error" if "error" in result else "success",
+ "result": cleaned_result,
+ }
+
+ except Exception as e:
+ verbose_proxy_logger.error(
+ f"litellm.proxy.health_endpoints.test_model_connection(): Exception occurred - {str(e)}"
+ )
+ verbose_proxy_logger.debug(traceback.format_exc())
+ raise HTTPException(
+ status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
+ detail={"error": f"Failed to test connection: {str(e)}"},
+ )