Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/custom_logger.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/custom_logger.py  388
1 file changed, 388 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/custom_logger.py b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_logger.py
new file mode 100644
index 00000000..6f1ec88d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/custom_logger.py
@@ -0,0 +1,388 @@
+#### What this does ####
+#    Base class for custom callbacks - litellm calls these hooks on success, failure, and streaming events
+import traceback
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    AsyncGenerator,
+    List,
+    Literal,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from pydantic import BaseModel
+
+from litellm.caching.caching import DualCache
+from litellm.proxy._types import UserAPIKeyAuth
+from litellm.types.integrations.argilla import ArgillaItem
+from litellm.types.llms.openai import AllMessageValues, ChatCompletionRequest
+from litellm.types.utils import (
+    AdapterCompletionStreamWrapper,
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    ModelResponseStream,
+    StandardCallbackDynamicParams,
+    StandardLoggingPayload,
+)
+
+if TYPE_CHECKING:
+    from opentelemetry.trace import Span as _Span
+
+    Span = _Span
+else:
+    Span = Any
+
+
+class CustomLogger:  # https://docs.litellm.ai/docs/observability/custom_callback#callback-class
+    def __init__(self, message_logging: bool = True) -> None:
+        self.message_logging = message_logging
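+
+    # Example - a minimal sketch of subclassing and registering this class;
+    # `MyCustomHandler` is an illustrative name, not part of litellm:
+    #
+    #     import litellm
+    #
+    #     class MyCustomHandler(CustomLogger):
+    #         def log_success_event(self, kwargs, response_obj, start_time, end_time):
+    #             print(f"success: model={kwargs.get('model')}")
+    #
+    #         async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+    #             print(f"failure: {kwargs.get('exception')}")
+    #
+    #     litellm.callbacks = [MyCustomHandler()]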
+
+    def log_pre_api_call(self, model, messages, kwargs):
+        pass
+
+    def log_post_api_call(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    def log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    #### ASYNC ####
+
+    async def async_log_stream_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    async def async_log_pre_api_call(self, model, messages, kwargs):
+        pass
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        pass
+
+    #### PROMPT MANAGEMENT HOOKS ####
+
+    async def async_get_chat_completion_prompt(
+        self,
+        model: str,
+        messages: List[AllMessageValues],
+        non_default_params: dict,
+        prompt_id: str,
+        prompt_variables: Optional[dict],
+        dynamic_callback_params: StandardCallbackDynamicParams,
+    ) -> Tuple[str, List[AllMessageValues], dict]:
+        """
+        Returns:
+        - model: str - the model to use (can be pulled from prompt management tool)
+        - messages: List[AllMessageValues] - the messages to use (can be pulled from prompt management tool)
+        - non_default_params: dict - update with any optional params (e.g. temperature, max_tokens) to use (can be pulled from prompt management tool)
+        """
+        return model, messages, non_default_params
+
+    def get_chat_completion_prompt(
+        self,
+        model: str,
+        messages: List[AllMessageValues],
+        non_default_params: dict,
+        prompt_id: str,
+        prompt_variables: Optional[dict],
+        dynamic_callback_params: StandardCallbackDynamicParams,
+    ) -> Tuple[str, List[AllMessageValues], dict]:
+        """
+        Returns:
+        - model: str - the model to use (can be pulled from prompt management tool)
+        - messages: List[AllMessageValues] - the messages to use (can be pulled from prompt management tool)
+        - non_default_params: dict - update with any optional params (e.g. temperature, max_tokens) to use (can be pulled from prompt management tool)
+        """
+        return model, messages, non_default_params
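+
+    # Example - a sketch of a prompt-management override that resolves `prompt_id`
+    # against an in-memory template store; `PROMPT_STORE` and its contents are
+    # hypothetical, not part of litellm:
+    #
+    #     PROMPT_STORE = {
+    #         "welcome-v1": [{"role": "system", "content": "You are a friendly greeter."}]
+    #     }
+    #
+    #     class MyPromptManager(CustomLogger):
+    #         def get_chat_completion_prompt(
+    #             self, model, messages, non_default_params,
+    #             prompt_id, prompt_variables, dynamic_callback_params,
+    #         ):
+    #             # prepend the stored template; fall back to the caller's messages
+    #             template = PROMPT_STORE.get(prompt_id, [])
+    #             return model, template + messages, non_default_params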
+
+    #### PRE-CALL CHECKS - router/proxy only ####
+    """
+    Allows usage-based-routing-v2 to run pre-call rpm checks within the picked deployment's semaphore (concurrency-safe tpm/rpm checks).
+    """
+
+    async def async_filter_deployments(
+        self,
+        model: str,
+        healthy_deployments: List,
+        messages: Optional[List[AllMessageValues]],
+        request_kwargs: Optional[dict] = None,
+        parent_otel_span: Optional[Span] = None,
+    ) -> List[dict]:
+        return healthy_deployments
+
+    async def async_pre_call_check(
+        self, deployment: dict, parent_otel_span: Optional[Span]
+    ) -> Optional[dict]:
+        pass
+
+    def pre_call_check(self, deployment: dict) -> Optional[dict]:
+        pass
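+
+    # Example - a sketch of filtering deployments before the router picks one;
+    # the `blocked` flag in model_info is illustrative, not a litellm convention:
+    #
+    #     class MyRoutingFilter(CustomLogger):
+    #         async def async_filter_deployments(
+    #             self, model, healthy_deployments, messages,
+    #             request_kwargs=None, parent_otel_span=None,
+    #         ):
+    #             return [
+    #                 d for d in healthy_deployments
+    #                 if not d.get("model_info", {}).get("blocked", False)
+    #             ]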
+
+    #### Fallback Events - router/proxy only ####
+    async def log_model_group_rate_limit_error(
+        self, exception: Exception, original_model_group: Optional[str], kwargs: dict
+    ):
+        pass
+
+    async def log_success_fallback_event(
+        self, original_model_group: str, kwargs: dict, original_exception: Exception
+    ):
+        pass
+
+    async def log_failure_fallback_event(
+        self, original_model_group: str, kwargs: dict, original_exception: Exception
+    ):
+        pass
+
+    #### ADAPTERS #### Allow calling 100+ LLMs in custom format - https://github.com/BerriAI/litellm/pulls
+
+    def translate_completion_input_params(
+        self, kwargs
+    ) -> Optional[ChatCompletionRequest]:
+        """
+        Translates the input params from the provider's native format to the litellm.completion() format.
+        """
+        pass
+
+    def translate_completion_output_params(
+        self, response: ModelResponse
+    ) -> Optional[BaseModel]:
+        """
+        Translates the output params from the OpenAI format to the custom format.
+        """
+        pass
+
+    def translate_completion_output_params_streaming(
+        self, completion_stream: Any
+    ) -> Optional[AdapterCompletionStreamWrapper]:
+        """
+        Translates the streaming chunk from the OpenAI format to the custom format.
+        """
+        pass
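+
+    # Example - a sketch of an adapter that maps a provider-native payload onto the
+    # litellm.completion() format; the incoming `system` / `prompt` keys are assumptions
+    # about the custom format, not a litellm contract:
+    #
+    #     class MyAdapter(CustomLogger):
+    #         def translate_completion_input_params(self, kwargs):
+    #             messages = []
+    #             if kwargs.get("system"):
+    #                 messages.append({"role": "system", "content": kwargs["system"]})
+    #             messages.append({"role": "user", "content": kwargs.get("prompt", "")})
+    #             return {"model": kwargs.get("model", ""), "messages": messages}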
+
+    #### DATASET HOOKS #### - currently only used for Argilla
+
+    async def async_dataset_hook(
+        self,
+        logged_item: ArgillaItem,
+        standard_logging_payload: Optional[StandardLoggingPayload],
+    ) -> Optional[ArgillaItem]:
+        """
+        - Decide if the result should be logged to Argilla.
+        - Modify the result before logging to Argilla.
+        - Return None if the result should not be logged to Argilla.
+        """
+        raise NotImplementedError("async_dataset_hook not implemented")
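+
+    # Example - a sketch of a dataset hook that only forwards successful calls to Argilla;
+    # reading `status` off the standard logging payload is an assumption for illustration:
+    #
+    #     class MyArgillaFilter(CustomLogger):
+    #         async def async_dataset_hook(self, logged_item, standard_logging_payload):
+    #             if standard_logging_payload and standard_logging_payload.get("status") == "success":
+    #                 return logged_item
+    #             return None  # skip logging this item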
+
+    #### CALL HOOKS - proxy only ####
+    """
+    Control or modify the incoming / outgoing data before calling the model.
+    """
+
+    async def async_pre_call_hook(
+        self,
+        user_api_key_dict: UserAPIKeyAuth,
+        cache: DualCache,
+        data: dict,
+        call_type: Literal[
+            "completion",
+            "text_completion",
+            "embeddings",
+            "image_generation",
+            "moderation",
+            "audio_transcription",
+            "pass_through_endpoint",
+            "rerank",
+        ],
+    ) -> Optional[
+        Union[Exception, str, dict]
+    ]:  # raise an exception if the request is invalid, return a str for the user to receive if the request is rejected, or return a modified dict to pass into litellm
+        pass
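+
+    # Example - a sketch of a proxy pre-call hook that rejects oversized requests and
+    # otherwise forwards the (optionally modified) request body; the 50-message limit
+    # and the `guardrail_checked` metadata flag are arbitrary illustrations:
+    #
+    #     class MyGuardrail(CustomLogger):
+    #         async def async_pre_call_hook(self, user_api_key_dict, cache, data, call_type):
+    #             if len(data.get("messages", [])) > 50:
+    #                 return "Rejected: too many messages in a single request"
+    #             data.setdefault("metadata", {})["guardrail_checked"] = True
+    #             return data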
+
+    async def async_post_call_failure_hook(
+        self,
+        request_data: dict,
+        original_exception: Exception,
+        user_api_key_dict: UserAPIKeyAuth,
+    ):
+        pass
+
+    async def async_post_call_success_hook(
+        self,
+        data: dict,
+        user_api_key_dict: UserAPIKeyAuth,
+        response: Union[Any, ModelResponse, EmbeddingResponse, ImageResponse],
+    ) -> Any:
+        pass
+
+    async def async_logging_hook(
+        self, kwargs: dict, result: Any, call_type: str
+    ) -> Tuple[dict, Any]:
+        """For masking logged request/response. Return a modified version of the request/result."""
+        return kwargs, result
+
+    def logging_hook(
+        self, kwargs: dict, result: Any, call_type: str
+    ) -> Tuple[dict, Any]:
+        """For masking logged request/response. Return a modified version of the request/result."""
+        return kwargs, result
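+
+    # Example - a sketch of masking chat content before it is logged; only the logged
+    # copy of the messages is redacted (the original request is untouched), and it
+    # assumes the chat messages are available under kwargs["messages"]:
+    #
+    #     import copy
+    #
+    #     class MyRedactingLogger(CustomLogger):
+    #         def logging_hook(self, kwargs, result, call_type):
+    #             messages = copy.deepcopy(kwargs.get("messages") or [])
+    #             for message in messages:
+    #                 if isinstance(message, dict) and "content" in message:
+    #                     message["content"] = "REDACTED"
+    #             return {**kwargs, "messages": messages}, result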
+
+    async def async_moderation_hook(
+        self,
+        data: dict,
+        user_api_key_dict: UserAPIKeyAuth,
+        call_type: Literal[
+            "completion",
+            "embeddings",
+            "image_generation",
+            "moderation",
+            "audio_transcription",
+            "responses",
+        ],
+    ) -> Any:
+        pass
+
+    async def async_post_call_streaming_hook(
+        self,
+        user_api_key_dict: UserAPIKeyAuth,
+        response: str,
+    ) -> Any:
+        pass
+
+    async def async_post_call_streaming_iterator_hook(
+        self,
+        user_api_key_dict: UserAPIKeyAuth,
+        response: Any,
+        request_data: dict,
+    ) -> AsyncGenerator[ModelResponseStream, None]:
+        async for item in response:
+            yield item
+
+    #### SINGLE-USE #### - https://docs.litellm.ai/docs/observability/custom_callback#using-your-custom-callback-function
+
+    def log_input_event(self, model, messages, kwargs, print_verbose, callback_func):
+        try:
+            kwargs["model"] = model
+            kwargs["messages"] = messages
+            kwargs["log_event_type"] = "pre_api_call"
+            callback_func(
+                kwargs,
+            )
+            print_verbose(f"Custom Logger - model call details: {kwargs}")
+        except Exception:
+            print_verbose(f"Custom Logger Error - {traceback.format_exc()}")
+
+    async def async_log_input_event(
+        self, model, messages, kwargs, print_verbose, callback_func
+    ):
+        try:
+            kwargs["model"] = model
+            kwargs["messages"] = messages
+            kwargs["log_event_type"] = "pre_api_call"
+            await callback_func(
+                kwargs,
+            )
+            print_verbose(f"Custom Logger - model call details: {kwargs}")
+        except Exception:
+            print_verbose(f"Custom Logger Error - {traceback.format_exc()}")
+
+    def log_event(
+        self, kwargs, response_obj, start_time, end_time, print_verbose, callback_func
+    ):
+        # forward post-api-call details to the user-provided callback function
+        try:
+            kwargs["log_event_type"] = "post_api_call"
+            callback_func(
+                kwargs,  # kwargs to func
+                response_obj,
+                start_time,
+                end_time,
+            )
+        except Exception:
+            print_verbose(f"Custom Logger Error - {traceback.format_exc()}")
+
+    async def async_log_event(
+        self, kwargs, response_obj, start_time, end_time, print_verbose, callback_func
+    ):
+        # async variant: await the user-provided callback function
+        try:
+            kwargs["log_event_type"] = "post_api_call"
+            await callback_func(
+                kwargs,  # kwargs to func
+                response_obj,
+                start_time,
+                end_time,
+            )
+        except Exception:
+            print_verbose(f"Custom Logger Error - {traceback.format_exc()}")
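+
+    # Example - a sketch of the function-style callback these helpers forward to;
+    # registering it via `litellm.success_callback` follows the docs linked above:
+    #
+    #     def my_custom_callback(kwargs, completion_response, start_time, end_time):
+    #         print(f"{kwargs.get('log_event_type')}: {kwargs.get('model')} took {end_time - start_time}")
+    #
+    #     litellm.success_callback = [my_custom_callback]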
+
+    # Useful helpers for custom logger classes
+
+    def truncate_standard_logging_payload_content(
+        self,
+        standard_logging_object: StandardLoggingPayload,
+    ):
+        """
+        Truncate error strings and message content in the logging payload.
+
+        Some loggers (e.g. DataDog, GCS Bucket) limit the payload size to ~1MB.
+
+        This function truncates the error string and message content if they exceed a maximum length.
+        """
+        MAX_STR_LENGTH = 10_000
+
+        # Truncate fields that might exceed max length
+        fields_to_truncate = ["error_str", "messages", "response"]
+        for field in fields_to_truncate:
+            self._truncate_field(
+                standard_logging_object=standard_logging_object,
+                field_name=field,
+                max_length=MAX_STR_LENGTH,
+            )
+
+    def _truncate_field(
+        self,
+        standard_logging_object: StandardLoggingPayload,
+        field_name: str,
+        max_length: int,
+    ) -> None:
+        """
+        Helper function to truncate a field in the logging payload
+
+        This converts the field to a string and then truncates it if it exceeds the max length.
+
+        Why convert to string?
+        1. Users sometimes send a poorly formatted list for the `messages` field, so we cannot predict where the content will appear
+            - Converting to a string and then truncating the logged content handles this
+        2. We want to avoid modifying the original `messages`, `response`, and `error_str` in the logging payload, since these live in kwargs and could be returned to the user
+        """
+        field_value = standard_logging_object.get(field_name)  # type: ignore
+        if field_value:
+            str_value = str(field_value)
+            if len(str_value) > max_length:
+                standard_logging_object[field_name] = self._truncate_text(  # type: ignore
+                    text=str_value, max_length=max_length
+                )
+
+    def _truncate_text(self, text: str, max_length: int) -> str:
+        """Truncate text if it exceeds max_length"""
+        return (
+            text[:max_length]
+            + "...truncated by litellm, this logger does not support large content"
+            if len(text) > max_length
+            else text
+        )
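+
+    # Example - a sketch of calling the truncation helper from a subclass before shipping
+    # the payload to a size-limited sink; `send_to_my_backend` is hypothetical, and the
+    # payload is assumed to arrive under kwargs["standard_logging_object"]:
+    #
+    #     class MySizeLimitedLogger(CustomLogger):
+    #         async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+    #             payload = kwargs.get("standard_logging_object")
+    #             if payload:
+    #                 self.truncate_standard_logging_payload_content(payload)
+    #                 await send_to_my_backend(payload)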