author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/_service_logger.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/_service_logger.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/_service_logger.py  309
1 file changed, 309 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/_service_logger.py b/.venv/lib/python3.12/site-packages/litellm/_service_logger.py
new file mode 100644
index 00000000..0b4f22e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/_service_logger.py
@@ -0,0 +1,309 @@
+import asyncio
+from datetime import datetime, timedelta
+from typing import TYPE_CHECKING, Any, Optional, Union
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.proxy._types import UserAPIKeyAuth
+
+from .integrations.custom_logger import CustomLogger
+from .integrations.datadog.datadog import DataDogLogger
+from .integrations.opentelemetry import OpenTelemetry
+from .integrations.prometheus_services import PrometheusServicesLogger
+from .types.services import ServiceLoggerPayload, ServiceTypes
+
+if TYPE_CHECKING:
+ from opentelemetry.trace import Span as _Span
+
+ Span = _Span
+ OTELClass = OpenTelemetry
+else:
+ Span = Any
+ OTELClass = Any
+
+
+class ServiceLogging(CustomLogger):
+ """
+ Separate class used for monitoring health of litellm-adjacent services (redis/postgres).
+ """
+
+ def __init__(self, mock_testing: bool = False) -> None:
+ self.mock_testing = mock_testing
+ self.mock_testing_sync_success_hook = 0
+ self.mock_testing_async_success_hook = 0
+ self.mock_testing_sync_failure_hook = 0
+ self.mock_testing_async_failure_hook = 0
+ if "prometheus_system" in litellm.service_callback:
+ self.prometheusServicesLogger = PrometheusServicesLogger()
+
+ def service_success_hook(
+ self,
+ service: ServiceTypes,
+ duration: float,
+ call_type: str,
+ parent_otel_span: Optional[Span] = None,
+ start_time: Optional[Union[datetime, float]] = None,
+ end_time: Optional[Union[float, datetime]] = None,
+ ):
+ """
+        Handles both sync and async monitoring by checking for an existing event loop.
+ """
+
+ if self.mock_testing:
+ self.mock_testing_sync_success_hook += 1
+
+ try:
+ # Try to get the current event loop
+ loop = asyncio.get_event_loop()
+ # Check if the loop is running
+ if loop.is_running():
+ # If we're in a running loop, create a task
+ loop.create_task(
+ self.async_service_success_hook(
+ service=service,
+ duration=duration,
+ call_type=call_type,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ )
+ )
+ else:
+ # Loop exists but not running, we can use run_until_complete
+ loop.run_until_complete(
+ self.async_service_success_hook(
+ service=service,
+ duration=duration,
+ call_type=call_type,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ )
+ )
+ except RuntimeError:
+ # No event loop exists, create a new one and run
+ asyncio.run(
+ self.async_service_success_hook(
+ service=service,
+ duration=duration,
+ call_type=call_type,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ )
+ )
+
+ def service_failure_hook(
+ self, service: ServiceTypes, duration: float, error: Exception, call_type: str
+ ):
+ """
+ [TODO] Not implemented for sync calls yet. V0 is focused on async monitoring (used by proxy).
+ """
+ if self.mock_testing:
+ self.mock_testing_sync_failure_hook += 1
+
+ async def async_service_success_hook(
+ self,
+ service: ServiceTypes,
+ call_type: str,
+ duration: float,
+ parent_otel_span: Optional[Span] = None,
+ start_time: Optional[Union[datetime, float]] = None,
+ end_time: Optional[Union[datetime, float]] = None,
+ event_metadata: Optional[dict] = None,
+ ):
+ """
+        - Counts whether the redis/postgres call succeeded
+ """
+ if self.mock_testing:
+ self.mock_testing_async_success_hook += 1
+
+ payload = ServiceLoggerPayload(
+ is_error=False,
+ error=None,
+ service=service,
+ duration=duration,
+ call_type=call_type,
+ )
+
+ for callback in litellm.service_callback:
+ if callback == "prometheus_system":
+ await self.init_prometheus_services_logger_if_none()
+ await self.prometheusServicesLogger.async_service_success_hook(
+ payload=payload
+ )
+ elif callback == "datadog" or isinstance(callback, DataDogLogger):
+ await self.init_datadog_logger_if_none()
+ await self.dd_logger.async_service_success_hook(
+ payload=payload,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ event_metadata=event_metadata,
+ )
+ elif callback == "otel" or isinstance(callback, OpenTelemetry):
+ from litellm.proxy.proxy_server import open_telemetry_logger
+
+ await self.init_otel_logger_if_none()
+
+ if (
+ parent_otel_span is not None
+ and open_telemetry_logger is not None
+ and isinstance(open_telemetry_logger, OpenTelemetry)
+ ):
+ await self.otel_logger.async_service_success_hook(
+ payload=payload,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ event_metadata=event_metadata,
+ )
+
+ async def init_prometheus_services_logger_if_none(self):
+ """
+        Initializes prometheusServicesLogger if it is None or the attribute does not exist on this ServiceLogging object.
+
+ """
+ if not hasattr(self, "prometheusServicesLogger"):
+ self.prometheusServicesLogger = PrometheusServicesLogger()
+ elif self.prometheusServicesLogger is None:
+            self.prometheusServicesLogger = PrometheusServicesLogger()
+ return
+
+ async def init_datadog_logger_if_none(self):
+ """
+        Initializes dd_logger if it is None or the attribute does not exist on this ServiceLogging object.
+
+ """
+ from litellm.integrations.datadog.datadog import DataDogLogger
+
+ if not hasattr(self, "dd_logger"):
+ self.dd_logger: DataDogLogger = DataDogLogger()
+
+ return
+
+ async def init_otel_logger_if_none(self):
+ """
+        Initializes otel_logger if it is None or the attribute does not exist on this ServiceLogging object.
+
+ """
+ from litellm.proxy.proxy_server import open_telemetry_logger
+
+ if not hasattr(self, "otel_logger"):
+ if open_telemetry_logger is not None and isinstance(
+ open_telemetry_logger, OpenTelemetry
+ ):
+ self.otel_logger: OpenTelemetry = open_telemetry_logger
+ else:
+ verbose_logger.warning(
+ "ServiceLogger: open_telemetry_logger is None or not an instance of OpenTelemetry"
+ )
+ return
+
+ async def async_service_failure_hook(
+ self,
+ service: ServiceTypes,
+ duration: float,
+ error: Union[str, Exception],
+ call_type: str,
+ parent_otel_span: Optional[Span] = None,
+ start_time: Optional[Union[datetime, float]] = None,
+ end_time: Optional[Union[float, datetime]] = None,
+ event_metadata: Optional[dict] = None,
+ ):
+ """
+        - Counts whether the redis/postgres call failed
+ """
+ if self.mock_testing:
+ self.mock_testing_async_failure_hook += 1
+
+ error_message = ""
+ if isinstance(error, Exception):
+ error_message = str(error)
+ elif isinstance(error, str):
+ error_message = error
+
+ payload = ServiceLoggerPayload(
+ is_error=True,
+ error=error_message,
+ service=service,
+ duration=duration,
+ call_type=call_type,
+ )
+
+ for callback in litellm.service_callback:
+ if callback == "prometheus_system":
+ await self.init_prometheus_services_logger_if_none()
+ await self.prometheusServicesLogger.async_service_failure_hook(
+ payload=payload,
+ error=error,
+ )
+ elif callback == "datadog" or isinstance(callback, DataDogLogger):
+ await self.init_datadog_logger_if_none()
+ await self.dd_logger.async_service_failure_hook(
+ payload=payload,
+ error=error_message,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ event_metadata=event_metadata,
+ )
+ elif callback == "otel" or isinstance(callback, OpenTelemetry):
+ from litellm.proxy.proxy_server import open_telemetry_logger
+
+ await self.init_otel_logger_if_none()
+
+ if not isinstance(error, str):
+ error = str(error)
+
+ if (
+ parent_otel_span is not None
+ and open_telemetry_logger is not None
+ and isinstance(open_telemetry_logger, OpenTelemetry)
+ ):
+                    await self.otel_logger.async_service_failure_hook(
+ payload=payload,
+ parent_otel_span=parent_otel_span,
+ start_time=start_time,
+ end_time=end_time,
+ event_metadata=event_metadata,
+ )
+
+ async def async_post_call_failure_hook(
+ self,
+ request_data: dict,
+ original_exception: Exception,
+ user_api_key_dict: UserAPIKeyAuth,
+ ):
+ """
+ Hook to track failed litellm-service calls
+ """
+ return await super().async_post_call_failure_hook(
+ request_data,
+ original_exception,
+ user_api_key_dict,
+ )
+
+ async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+ """
+ Hook to track latency for litellm proxy llm api calls
+ """
+ try:
+ _duration = end_time - start_time
+ if isinstance(_duration, timedelta):
+ _duration = _duration.total_seconds()
+ elif isinstance(_duration, float):
+ pass
+ else:
+ raise Exception(
+ "Duration={} is not a float or timedelta object. type={}".format(
+ _duration, type(_duration)
+ )
+ ) # invalid _duration value
+ await self.async_service_success_hook(
+ service=ServiceTypes.LITELLM,
+ duration=_duration,
+ call_type=kwargs["call_type"],
+ )
+ except Exception as e:
+ raise e
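
For context, a minimal sketch of how the ServiceLogging hooks added above might be driven. It assumes litellm is importable (and, for the "prometheus_system" callback, that its prometheus client dependency is installed); the ServiceTypes.REDIS member, the call_type strings, and the timing values are illustrative assumptions, not taken from the commit itself.

# Illustrative sketch only; not part of the vendored file or the commit above.
import asyncio
from datetime import datetime, timedelta

import litellm
from litellm._service_logger import ServiceLogging
from litellm.types.services import ServiceTypes


async def main() -> None:
    # "prometheus_system" mirrors the check in ServiceLogging.__init__ above;
    # it requires the prometheus client dependency to be installed.
    litellm.service_callback = ["prometheus_system"]

    # mock_testing=True only increments the mock_testing_* counters.
    service_logger = ServiceLogging(mock_testing=True)

    start = datetime.now()
    end = start + timedelta(milliseconds=42)

    # Record a successful call to a litellm-adjacent service.
    # ServiceTypes.REDIS is assumed here; the diff itself only references
    # ServiceTypes.LITELLM. The call_type label is illustrative.
    await service_logger.async_service_success_hook(
        service=ServiceTypes.REDIS,
        call_type="async_set_cache",
        duration=(end - start).total_seconds(),
        start_time=start,
        end_time=end,
    )

    # Record a failed call, passing the captured exception as `error`.
    await service_logger.async_service_failure_hook(
        service=ServiceTypes.REDIS,
        call_type="async_set_cache",
        duration=0.0,
        error=TimeoutError("redis call timed out"),
    )

    # The mock counters confirm both async hooks ran once.
    assert service_logger.mock_testing_async_success_hook == 1
    assert service_logger.mock_testing_async_failure_hook == 1


if __name__ == "__main__":
    asyncio.run(main())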