aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py179
1 files changed, 179 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py b/.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py
new file mode 100644
index 00000000..516bd4a8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/logfire_logger.py
@@ -0,0 +1,179 @@
+#### What this does ####
+# On success + failure, log events to Logfire
+
+import os
+import traceback
+import uuid
+from enum import Enum
+from typing import Any, Dict, NamedTuple
+
+from typing_extensions import LiteralString
+
+from litellm._logging import print_verbose, verbose_logger
+from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info
+
+
+class SpanConfig(NamedTuple):
+ message_template: LiteralString
+ span_data: Dict[str, Any]
+
+
+class LogfireLevel(str, Enum):
+ INFO = "info"
+ ERROR = "error"
+
+
+class LogfireLogger:
+ # Class variables or attributes
+ def __init__(self):
+ try:
+ verbose_logger.debug("in init logfire logger")
+ import logfire
+
+ # only setting up logfire if we are sending to logfire
+ # in testing, we don't want to send to logfire
+ if logfire.DEFAULT_LOGFIRE_INSTANCE.config.send_to_logfire:
+ logfire.configure(token=os.getenv("LOGFIRE_TOKEN"))
+ except Exception as e:
+ print_verbose(f"Got exception on init logfire client {str(e)}")
+ raise e
+
+ def _get_span_config(self, payload) -> SpanConfig:
+ if (
+ payload["call_type"] == "completion"
+ or payload["call_type"] == "acompletion"
+ ):
+ return SpanConfig(
+ message_template="Chat Completion with {request_data[model]!r}",
+ span_data={"request_data": payload},
+ )
+ elif (
+ payload["call_type"] == "embedding" or payload["call_type"] == "aembedding"
+ ):
+ return SpanConfig(
+ message_template="Embedding Creation with {request_data[model]!r}",
+ span_data={"request_data": payload},
+ )
+ elif (
+ payload["call_type"] == "image_generation"
+ or payload["call_type"] == "aimage_generation"
+ ):
+ return SpanConfig(
+ message_template="Image Generation with {request_data[model]!r}",
+ span_data={"request_data": payload},
+ )
+ else:
+ return SpanConfig(
+ message_template="Litellm Call with {request_data[model]!r}",
+ span_data={"request_data": payload},
+ )
+
+ async def _async_log_event(
+ self,
+ kwargs,
+ response_obj,
+ start_time,
+ end_time,
+ print_verbose,
+ level: LogfireLevel,
+ ):
+ self.log_event(
+ kwargs=kwargs,
+ response_obj=response_obj,
+ start_time=start_time,
+ end_time=end_time,
+ print_verbose=print_verbose,
+ level=level,
+ )
+
+ def log_event(
+ self,
+ kwargs,
+ start_time,
+ end_time,
+ print_verbose,
+ level: LogfireLevel,
+ response_obj,
+ ):
+ try:
+ import logfire
+
+ verbose_logger.debug(
+ f"logfire Logging - Enters logging function for model {kwargs}"
+ )
+
+ if not response_obj:
+ response_obj = {}
+ litellm_params = kwargs.get("litellm_params", {})
+ metadata = (
+ litellm_params.get("metadata", {}) or {}
+ ) # if litellm_params['metadata'] == None
+ messages = kwargs.get("messages")
+ optional_params = kwargs.get("optional_params", {})
+ call_type = kwargs.get("call_type", "completion")
+ cache_hit = kwargs.get("cache_hit", False)
+ usage = response_obj.get("usage", {})
+ id = response_obj.get("id", str(uuid.uuid4()))
+ try:
+ response_time = (end_time - start_time).total_seconds()
+ except Exception:
+ response_time = None
+
+ # Clean Metadata before logging - never log raw metadata
+ # the raw metadata can contain circular references which leads to infinite recursion
+ # we clean out all extra litellm metadata params before logging
+ clean_metadata = {}
+ if isinstance(metadata, dict):
+ for key, value in metadata.items():
+ # clean litellm metadata before logging
+ if key in [
+ "endpoint",
+ "caching_groups",
+ "previous_models",
+ ]:
+ continue
+ else:
+ clean_metadata[key] = value
+
+ clean_metadata = redact_user_api_key_info(metadata=clean_metadata)
+
+ # Build the initial payload
+ payload = {
+ "id": id,
+ "call_type": call_type,
+ "cache_hit": cache_hit,
+ "startTime": start_time,
+ "endTime": end_time,
+ "responseTime (seconds)": response_time,
+ "model": kwargs.get("model", ""),
+ "user": kwargs.get("user", ""),
+ "modelParameters": optional_params,
+ "spend": kwargs.get("response_cost", 0),
+ "messages": messages,
+ "response": response_obj,
+ "usage": usage,
+ "metadata": clean_metadata,
+ }
+ logfire_openai = logfire.with_settings(custom_scope_suffix="openai")
+ message_template, span_data = self._get_span_config(payload)
+ if level == LogfireLevel.INFO:
+ logfire_openai.info(
+ message_template,
+ **span_data,
+ )
+ elif level == LogfireLevel.ERROR:
+ logfire_openai.error(
+ message_template,
+ **span_data,
+ _exc_info=True,
+ )
+ print_verbose(f"\ndd Logger - Logging payload = {payload}")
+
+ print_verbose(
+ f"Logfire Layer Logging - final response object: {response_obj}"
+ )
+ except Exception as e:
+ verbose_logger.debug(
+ f"Logfire Layer Error - {str(e)}\n{traceback.format_exc()}"
+ )
+ pass