aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py1023
1 files changed, 1023 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py b/.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py
new file mode 100644
index 00000000..1572eb81
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/opentelemetry.py
@@ -0,0 +1,1023 @@
+import os
+from dataclasses import dataclass
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.types.services import ServiceLoggerPayload
+from litellm.types.utils import (
+ ChatCompletionMessageToolCall,
+ Function,
+ StandardCallbackDynamicParams,
+ StandardLoggingPayload,
+)
+
+if TYPE_CHECKING:
+ from opentelemetry.sdk.trace.export import SpanExporter as _SpanExporter
+ from opentelemetry.trace import Span as _Span
+
+ from litellm.proxy._types import (
+ ManagementEndpointLoggingPayload as _ManagementEndpointLoggingPayload,
+ )
+ from litellm.proxy.proxy_server import UserAPIKeyAuth as _UserAPIKeyAuth
+
+ Span = _Span
+ SpanExporter = _SpanExporter
+ UserAPIKeyAuth = _UserAPIKeyAuth
+ ManagementEndpointLoggingPayload = _ManagementEndpointLoggingPayload
+else:
+ Span = Any
+ SpanExporter = Any
+ UserAPIKeyAuth = Any
+ ManagementEndpointLoggingPayload = Any
+
+
+LITELLM_TRACER_NAME = os.getenv("OTEL_TRACER_NAME", "litellm")
+LITELLM_RESOURCE: Dict[Any, Any] = {
+ "service.name": os.getenv("OTEL_SERVICE_NAME", "litellm"),
+ "deployment.environment": os.getenv("OTEL_ENVIRONMENT_NAME", "production"),
+ "model_id": os.getenv("OTEL_SERVICE_NAME", "litellm"),
+}
+RAW_REQUEST_SPAN_NAME = "raw_gen_ai_request"
+LITELLM_REQUEST_SPAN_NAME = "litellm_request"
+
+
+@dataclass
+class OpenTelemetryConfig:
+
+ exporter: Union[str, SpanExporter] = "console"
+ endpoint: Optional[str] = None
+ headers: Optional[str] = None
+
+ @classmethod
+ def from_env(cls):
+ """
+ OTEL_HEADERS=x-honeycomb-team=B85YgLm9****
+ OTEL_EXPORTER="otlp_http"
+ OTEL_ENDPOINT="https://api.honeycomb.io/v1/traces"
+
+ OTEL_HEADERS gets sent as headers = {"x-honeycomb-team": "B85YgLm96******"}
+ """
+ from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
+ InMemorySpanExporter,
+ )
+
+ if os.getenv("OTEL_EXPORTER") == "in_memory":
+ return cls(exporter=InMemorySpanExporter())
+ return cls(
+ exporter=os.getenv("OTEL_EXPORTER", "console"),
+ endpoint=os.getenv("OTEL_ENDPOINT"),
+ headers=os.getenv(
+ "OTEL_HEADERS"
+ ), # example: OTEL_HEADERS=x-honeycomb-team=B85YgLm96***"
+ )
+
+
+class OpenTelemetry(CustomLogger):
+ def __init__(
+ self,
+ config: Optional[OpenTelemetryConfig] = None,
+ callback_name: Optional[str] = None,
+ **kwargs,
+ ):
+ from opentelemetry import trace
+ from opentelemetry.sdk.resources import Resource
+ from opentelemetry.sdk.trace import TracerProvider
+ from opentelemetry.trace import SpanKind
+
+ if config is None:
+ config = OpenTelemetryConfig.from_env()
+
+ self.config = config
+ self.OTEL_EXPORTER = self.config.exporter
+ self.OTEL_ENDPOINT = self.config.endpoint
+ self.OTEL_HEADERS = self.config.headers
+ provider = TracerProvider(resource=Resource(attributes=LITELLM_RESOURCE))
+ provider.add_span_processor(self._get_span_processor())
+ self.callback_name = callback_name
+
+ trace.set_tracer_provider(provider)
+ self.tracer = trace.get_tracer(LITELLM_TRACER_NAME)
+
+ self.span_kind = SpanKind
+
+ _debug_otel = str(os.getenv("DEBUG_OTEL", "False")).lower()
+
+ if _debug_otel == "true":
+ # Set up logging
+ import logging
+
+ logging.basicConfig(level=logging.DEBUG)
+ logging.getLogger(__name__)
+
+ # Enable OpenTelemetry logging
+ otel_exporter_logger = logging.getLogger("opentelemetry.sdk.trace.export")
+ otel_exporter_logger.setLevel(logging.DEBUG)
+
+ # init CustomLogger params
+ super().__init__(**kwargs)
+ self._init_otel_logger_on_litellm_proxy()
+
+ def _init_otel_logger_on_litellm_proxy(self):
+ """
+ Initializes OpenTelemetry for litellm proxy server
+
+ - Adds Otel as a service callback
+ - Sets `proxy_server.open_telemetry_logger` to self
+ """
+ from litellm.proxy import proxy_server
+
+ # Add Otel as a service callback
+ if "otel" not in litellm.service_callback:
+ litellm.service_callback.append("otel")
+ setattr(proxy_server, "open_telemetry_logger", self)
+
+ def log_success_event(self, kwargs, response_obj, start_time, end_time):
+ self._handle_sucess(kwargs, response_obj, start_time, end_time)
+
+ def log_failure_event(self, kwargs, response_obj, start_time, end_time):
+ self._handle_failure(kwargs, response_obj, start_time, end_time)
+
+ async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+ self._handle_sucess(kwargs, response_obj, start_time, end_time)
+
+ async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+ self._handle_failure(kwargs, response_obj, start_time, end_time)
+
+ async def async_service_success_hook(
+ self,
+ payload: ServiceLoggerPayload,
+ parent_otel_span: Optional[Span] = None,
+ start_time: Optional[Union[datetime, float]] = None,
+ end_time: Optional[Union[datetime, float]] = None,
+ event_metadata: Optional[dict] = None,
+ ):
+
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ _start_time_ns = 0
+ _end_time_ns = 0
+
+ if isinstance(start_time, float):
+ _start_time_ns = int(start_time * 1e9)
+ else:
+ _start_time_ns = self._to_ns(start_time)
+
+ if isinstance(end_time, float):
+ _end_time_ns = int(end_time * 1e9)
+ else:
+ _end_time_ns = self._to_ns(end_time)
+
+ if parent_otel_span is not None:
+ _span_name = payload.service
+ service_logging_span = self.tracer.start_span(
+ name=_span_name,
+ context=trace.set_span_in_context(parent_otel_span),
+ start_time=_start_time_ns,
+ )
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key="call_type",
+ value=payload.call_type,
+ )
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key="service",
+ value=payload.service.value,
+ )
+
+ if event_metadata:
+ for key, value in event_metadata.items():
+ if value is None:
+ value = "None"
+ if isinstance(value, dict):
+ try:
+ value = str(value)
+ except Exception:
+ value = "litellm logging error - could_not_json_serialize"
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key=key,
+ value=value,
+ )
+ service_logging_span.set_status(Status(StatusCode.OK))
+ service_logging_span.end(end_time=_end_time_ns)
+
+ async def async_service_failure_hook(
+ self,
+ payload: ServiceLoggerPayload,
+ error: Optional[str] = "",
+ parent_otel_span: Optional[Span] = None,
+ start_time: Optional[Union[datetime, float]] = None,
+ end_time: Optional[Union[float, datetime]] = None,
+ event_metadata: Optional[dict] = None,
+ ):
+
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ _start_time_ns = 0
+ _end_time_ns = 0
+
+ if isinstance(start_time, float):
+ _start_time_ns = int(int(start_time) * 1e9)
+ else:
+ _start_time_ns = self._to_ns(start_time)
+
+ if isinstance(end_time, float):
+ _end_time_ns = int(int(end_time) * 1e9)
+ else:
+ _end_time_ns = self._to_ns(end_time)
+
+ if parent_otel_span is not None:
+ _span_name = payload.service
+ service_logging_span = self.tracer.start_span(
+ name=_span_name,
+ context=trace.set_span_in_context(parent_otel_span),
+ start_time=_start_time_ns,
+ )
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key="call_type",
+ value=payload.call_type,
+ )
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key="service",
+ value=payload.service.value,
+ )
+ if error:
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key="error",
+ value=error,
+ )
+ if event_metadata:
+ for key, value in event_metadata.items():
+ if isinstance(value, dict):
+ try:
+ value = str(value)
+ except Exception:
+ value = "litllm logging error - could_not_json_serialize"
+ self.safe_set_attribute(
+ span=service_logging_span,
+ key=key,
+ value=value,
+ )
+
+ service_logging_span.set_status(Status(StatusCode.ERROR))
+ service_logging_span.end(end_time=_end_time_ns)
+
+ async def async_post_call_failure_hook(
+ self,
+ request_data: dict,
+ original_exception: Exception,
+ user_api_key_dict: UserAPIKeyAuth,
+ ):
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ parent_otel_span = user_api_key_dict.parent_otel_span
+ if parent_otel_span is not None:
+ parent_otel_span.set_status(Status(StatusCode.ERROR))
+ _span_name = "Failed Proxy Server Request"
+
+ # Exception Logging Child Span
+ exception_logging_span = self.tracer.start_span(
+ name=_span_name,
+ context=trace.set_span_in_context(parent_otel_span),
+ )
+ self.safe_set_attribute(
+ span=exception_logging_span,
+ key="exception",
+ value=str(original_exception),
+ )
+ exception_logging_span.set_status(Status(StatusCode.ERROR))
+ exception_logging_span.end(end_time=self._to_ns(datetime.now()))
+
+ # End Parent OTEL Sspan
+ parent_otel_span.end(end_time=self._to_ns(datetime.now()))
+
+ def _handle_sucess(self, kwargs, response_obj, start_time, end_time):
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ verbose_logger.debug(
+ "OpenTelemetry Logger: Logging kwargs: %s, OTEL config settings=%s",
+ kwargs,
+ self.config,
+ )
+ _parent_context, parent_otel_span = self._get_span_context(kwargs)
+
+ self._add_dynamic_span_processor_if_needed(kwargs)
+
+ # Span 1: Requst sent to litellm SDK
+ span = self.tracer.start_span(
+ name=self._get_span_name(kwargs),
+ start_time=self._to_ns(start_time),
+ context=_parent_context,
+ )
+ span.set_status(Status(StatusCode.OK))
+ self.set_attributes(span, kwargs, response_obj)
+
+ if litellm.turn_off_message_logging is True:
+ pass
+ elif self.message_logging is not True:
+ pass
+ else:
+ # Span 2: Raw Request / Response to LLM
+ raw_request_span = self.tracer.start_span(
+ name=RAW_REQUEST_SPAN_NAME,
+ start_time=self._to_ns(start_time),
+ context=trace.set_span_in_context(span),
+ )
+
+ raw_request_span.set_status(Status(StatusCode.OK))
+ self.set_raw_request_attributes(raw_request_span, kwargs, response_obj)
+ raw_request_span.end(end_time=self._to_ns(end_time))
+
+ span.end(end_time=self._to_ns(end_time))
+
+ if parent_otel_span is not None:
+ parent_otel_span.end(end_time=self._to_ns(datetime.now()))
+
+ def _add_dynamic_span_processor_if_needed(self, kwargs):
+ """
+ Helper method to add a span processor with dynamic headers if needed.
+
+ This allows for per-request configuration of telemetry exporters by
+ extracting headers from standard_callback_dynamic_params.
+ """
+ from opentelemetry import trace
+
+ standard_callback_dynamic_params: Optional[StandardCallbackDynamicParams] = (
+ kwargs.get("standard_callback_dynamic_params")
+ )
+ if not standard_callback_dynamic_params:
+ return
+
+ # Extract headers from dynamic params
+ dynamic_headers = {}
+
+ # Handle Arize headers
+ if standard_callback_dynamic_params.get("arize_space_key"):
+ dynamic_headers["space_key"] = standard_callback_dynamic_params.get(
+ "arize_space_key"
+ )
+ if standard_callback_dynamic_params.get("arize_api_key"):
+ dynamic_headers["api_key"] = standard_callback_dynamic_params.get(
+ "arize_api_key"
+ )
+
+ # Only create a span processor if we have headers to use
+ if len(dynamic_headers) > 0:
+ from opentelemetry.sdk.trace import TracerProvider
+
+ provider = trace.get_tracer_provider()
+ if isinstance(provider, TracerProvider):
+ span_processor = self._get_span_processor(
+ dynamic_headers=dynamic_headers
+ )
+ provider.add_span_processor(span_processor)
+
+ def _handle_failure(self, kwargs, response_obj, start_time, end_time):
+ from opentelemetry.trace import Status, StatusCode
+
+ verbose_logger.debug(
+ "OpenTelemetry Logger: Failure HandlerLogging kwargs: %s, OTEL config settings=%s",
+ kwargs,
+ self.config,
+ )
+ _parent_context, parent_otel_span = self._get_span_context(kwargs)
+
+ # Span 1: Requst sent to litellm SDK
+ span = self.tracer.start_span(
+ name=self._get_span_name(kwargs),
+ start_time=self._to_ns(start_time),
+ context=_parent_context,
+ )
+ span.set_status(Status(StatusCode.ERROR))
+ self.set_attributes(span, kwargs, response_obj)
+ span.end(end_time=self._to_ns(end_time))
+
+ if parent_otel_span is not None:
+ parent_otel_span.end(end_time=self._to_ns(datetime.now()))
+
+ def set_tools_attributes(self, span: Span, tools):
+ import json
+
+ from litellm.proxy._types import SpanAttributes
+
+ if not tools:
+ return
+
+ try:
+ for i, tool in enumerate(tools):
+ function = tool.get("function")
+ if not function:
+ continue
+
+ prefix = f"{SpanAttributes.LLM_REQUEST_FUNCTIONS}.{i}"
+ self.safe_set_attribute(
+ span=span,
+ key=f"{prefix}.name",
+ value=function.get("name"),
+ )
+ self.safe_set_attribute(
+ span=span,
+ key=f"{prefix}.description",
+ value=function.get("description"),
+ )
+ self.safe_set_attribute(
+ span=span,
+ key=f"{prefix}.parameters",
+ value=json.dumps(function.get("parameters")),
+ )
+ except Exception as e:
+ verbose_logger.error(
+ "OpenTelemetry: Error setting tools attributes: %s", str(e)
+ )
+ pass
+
+ def cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]:
+ """
+ Casts the value to a primitive OTEL type if it is not already a primitive type.
+
+ OTEL supports - str, bool, int, float
+
+ If it's not a primitive type, then it's converted to a string
+ """
+ if value is None:
+ return ""
+ if isinstance(value, (str, bool, int, float)):
+ return value
+ try:
+ return str(value)
+ except Exception:
+ return ""
+
+ @staticmethod
+ def _tool_calls_kv_pair(
+ tool_calls: List[ChatCompletionMessageToolCall],
+ ) -> Dict[str, Any]:
+ from litellm.proxy._types import SpanAttributes
+
+ kv_pairs: Dict[str, Any] = {}
+ for idx, tool_call in enumerate(tool_calls):
+ _function = tool_call.get("function")
+ if not _function:
+ continue
+
+ keys = Function.__annotations__.keys()
+ for key in keys:
+ _value = _function.get(key)
+ if _value:
+ kv_pairs[
+ f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.function_call.{key}"
+ ] = _value
+
+ return kv_pairs
+
+ def set_attributes( # noqa: PLR0915
+ self, span: Span, kwargs, response_obj: Optional[Any]
+ ):
+ try:
+ if self.callback_name == "arize_phoenix":
+ from litellm.integrations.arize.arize_phoenix import ArizePhoenixLogger
+
+ ArizePhoenixLogger.set_arize_phoenix_attributes(
+ span, kwargs, response_obj
+ )
+ return
+ elif self.callback_name == "langtrace":
+ from litellm.integrations.langtrace import LangtraceAttributes
+
+ LangtraceAttributes().set_langtrace_attributes(
+ span, kwargs, response_obj
+ )
+ return
+ from litellm.proxy._types import SpanAttributes
+
+ optional_params = kwargs.get("optional_params", {})
+ litellm_params = kwargs.get("litellm_params", {}) or {}
+ standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
+ "standard_logging_object"
+ )
+ if standard_logging_payload is None:
+ raise ValueError("standard_logging_object not found in kwargs")
+
+ # https://github.com/open-telemetry/semantic-conventions/blob/main/model/registry/gen-ai.yaml
+ # Following Conventions here: https://github.com/open-telemetry/semantic-conventions/blob/main/docs/gen-ai/llm-spans.md
+ #############################################
+ ############ LLM CALL METADATA ##############
+ #############################################
+ metadata = standard_logging_payload["metadata"]
+ for key, value in metadata.items():
+ self.safe_set_attribute(
+ span=span, key="metadata.{}".format(key), value=value
+ )
+
+ #############################################
+ ########## LLM Request Attributes ###########
+ #############################################
+
+ # The name of the LLM a request is being made to
+ if kwargs.get("model"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_REQUEST_MODEL,
+ value=kwargs.get("model"),
+ )
+
+ # The LLM request type
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_REQUEST_TYPE,
+ value=standard_logging_payload["call_type"],
+ )
+
+ # The Generative AI Provider: Azure, OpenAI, etc.
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_SYSTEM,
+ value=litellm_params.get("custom_llm_provider", "Unknown"),
+ )
+
+ # The maximum number of tokens the LLM generates for a request.
+ if optional_params.get("max_tokens"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_REQUEST_MAX_TOKENS,
+ value=optional_params.get("max_tokens"),
+ )
+
+ # The temperature setting for the LLM request.
+ if optional_params.get("temperature"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_REQUEST_TEMPERATURE,
+ value=optional_params.get("temperature"),
+ )
+
+ # The top_p sampling setting for the LLM request.
+ if optional_params.get("top_p"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_REQUEST_TOP_P,
+ value=optional_params.get("top_p"),
+ )
+
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_IS_STREAMING,
+ value=str(optional_params.get("stream", False)),
+ )
+
+ if optional_params.get("user"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_USER,
+ value=optional_params.get("user"),
+ )
+
+ # The unique identifier for the completion.
+ if response_obj and response_obj.get("id"):
+ self.safe_set_attribute(
+ span=span, key="gen_ai.response.id", value=response_obj.get("id")
+ )
+
+ # The model used to generate the response.
+ if response_obj and response_obj.get("model"):
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_RESPONSE_MODEL,
+ value=response_obj.get("model"),
+ )
+
+ usage = response_obj and response_obj.get("usage")
+ if usage:
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_USAGE_TOTAL_TOKENS,
+ value=usage.get("total_tokens"),
+ )
+
+ # The number of tokens used in the LLM response (completion).
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_USAGE_COMPLETION_TOKENS,
+ value=usage.get("completion_tokens"),
+ )
+
+ # The number of tokens used in the LLM prompt.
+ self.safe_set_attribute(
+ span=span,
+ key=SpanAttributes.LLM_USAGE_PROMPT_TOKENS,
+ value=usage.get("prompt_tokens"),
+ )
+
+ ########################################################################
+ ########## LLM Request Medssages / tools / content Attributes ###########
+ #########################################################################
+
+ if litellm.turn_off_message_logging is True:
+ return
+ if self.message_logging is not True:
+ return
+
+ if optional_params.get("tools"):
+ tools = optional_params["tools"]
+ self.set_tools_attributes(span, tools)
+
+ if kwargs.get("messages"):
+ for idx, prompt in enumerate(kwargs.get("messages")):
+ if prompt.get("role"):
+ self.safe_set_attribute(
+ span=span,
+ key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.role",
+ value=prompt.get("role"),
+ )
+
+ if prompt.get("content"):
+ if not isinstance(prompt.get("content"), str):
+ prompt["content"] = str(prompt.get("content"))
+ self.safe_set_attribute(
+ span=span,
+ key=f"{SpanAttributes.LLM_PROMPTS}.{idx}.content",
+ value=prompt.get("content"),
+ )
+ #############################################
+ ########## LLM Response Attributes ##########
+ #############################################
+ if response_obj is not None:
+ if response_obj.get("choices"):
+ for idx, choice in enumerate(response_obj.get("choices")):
+ if choice.get("finish_reason"):
+ self.safe_set_attribute(
+ span=span,
+ key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.finish_reason",
+ value=choice.get("finish_reason"),
+ )
+ if choice.get("message"):
+ if choice.get("message").get("role"):
+ self.safe_set_attribute(
+ span=span,
+ key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.role",
+ value=choice.get("message").get("role"),
+ )
+ if choice.get("message").get("content"):
+ if not isinstance(
+ choice.get("message").get("content"), str
+ ):
+ choice["message"]["content"] = str(
+ choice.get("message").get("content")
+ )
+ self.safe_set_attribute(
+ span=span,
+ key=f"{SpanAttributes.LLM_COMPLETIONS}.{idx}.content",
+ value=choice.get("message").get("content"),
+ )
+
+ message = choice.get("message")
+ tool_calls = message.get("tool_calls")
+ if tool_calls:
+ kv_pairs = OpenTelemetry._tool_calls_kv_pair(tool_calls) # type: ignore
+ for key, value in kv_pairs.items():
+ self.safe_set_attribute(
+ span=span,
+ key=key,
+ value=value,
+ )
+
+ except Exception as e:
+ verbose_logger.exception(
+ "OpenTelemetry logging error in set_attributes %s", str(e)
+ )
+
+ def _cast_as_primitive_value_type(self, value) -> Union[str, bool, int, float]:
+ """
+ Casts the value to a primitive OTEL type if it is not already a primitive type.
+
+ OTEL supports - str, bool, int, float
+
+ If it's not a primitive type, then it's converted to a string
+ """
+ if value is None:
+ return ""
+ if isinstance(value, (str, bool, int, float)):
+ return value
+ try:
+ return str(value)
+ except Exception:
+ return ""
+
+ def safe_set_attribute(self, span: Span, key: str, value: Any):
+ """
+ Safely sets an attribute on the span, ensuring the value is a primitive type.
+ """
+ primitive_value = self._cast_as_primitive_value_type(value)
+ span.set_attribute(key, primitive_value)
+
+ def set_raw_request_attributes(self, span: Span, kwargs, response_obj):
+
+ kwargs.get("optional_params", {})
+ litellm_params = kwargs.get("litellm_params", {}) or {}
+ custom_llm_provider = litellm_params.get("custom_llm_provider", "Unknown")
+
+ _raw_response = kwargs.get("original_response")
+ _additional_args = kwargs.get("additional_args", {}) or {}
+ complete_input_dict = _additional_args.get("complete_input_dict")
+ #############################################
+ ########## LLM Request Attributes ###########
+ #############################################
+
+ # OTEL Attributes for the RAW Request to https://docs.anthropic.com/en/api/messages
+ if complete_input_dict and isinstance(complete_input_dict, dict):
+ for param, val in complete_input_dict.items():
+ self.safe_set_attribute(
+ span=span, key=f"llm.{custom_llm_provider}.{param}", value=val
+ )
+
+ #############################################
+ ########## LLM Response Attributes ##########
+ #############################################
+ if _raw_response and isinstance(_raw_response, str):
+ # cast sr -> dict
+ import json
+
+ try:
+ _raw_response = json.loads(_raw_response)
+ for param, val in _raw_response.items():
+ self.safe_set_attribute(
+ span=span,
+ key=f"llm.{custom_llm_provider}.{param}",
+ value=val,
+ )
+ except json.JSONDecodeError:
+ verbose_logger.debug(
+ "litellm.integrations.opentelemetry.py::set_raw_request_attributes() - raw_response not json string - {}".format(
+ _raw_response
+ )
+ )
+
+ self.safe_set_attribute(
+ span=span,
+ key=f"llm.{custom_llm_provider}.stringified_raw_response",
+ value=_raw_response,
+ )
+
+ def _to_ns(self, dt):
+ return int(dt.timestamp() * 1e9)
+
+ def _get_span_name(self, kwargs):
+ return LITELLM_REQUEST_SPAN_NAME
+
+ def get_traceparent_from_header(self, headers):
+ if headers is None:
+ return None
+ _traceparent = headers.get("traceparent", None)
+ if _traceparent is None:
+ return None
+
+ from opentelemetry.trace.propagation.tracecontext import (
+ TraceContextTextMapPropagator,
+ )
+
+ propagator = TraceContextTextMapPropagator()
+ carrier = {"traceparent": _traceparent}
+ _parent_context = propagator.extract(carrier=carrier)
+
+ return _parent_context
+
+ def _get_span_context(self, kwargs):
+ from opentelemetry import trace
+ from opentelemetry.trace.propagation.tracecontext import (
+ TraceContextTextMapPropagator,
+ )
+
+ litellm_params = kwargs.get("litellm_params", {}) or {}
+ proxy_server_request = litellm_params.get("proxy_server_request", {}) or {}
+ headers = proxy_server_request.get("headers", {}) or {}
+ traceparent = headers.get("traceparent", None)
+ _metadata = litellm_params.get("metadata", {}) or {}
+ parent_otel_span = _metadata.get("litellm_parent_otel_span", None)
+
+ """
+ Two way to use parents in opentelemetry
+ - using the traceparent header
+ - using the parent_otel_span in the [metadata][parent_otel_span]
+ """
+ if parent_otel_span is not None:
+ return trace.set_span_in_context(parent_otel_span), parent_otel_span
+
+ if traceparent is None:
+ return None, None
+ else:
+ carrier = {"traceparent": traceparent}
+ return TraceContextTextMapPropagator().extract(carrier=carrier), None
+
+ def _get_span_processor(self, dynamic_headers: Optional[dict] = None):
+ from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
+ OTLPSpanExporter as OTLPSpanExporterGRPC,
+ )
+ from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
+ OTLPSpanExporter as OTLPSpanExporterHTTP,
+ )
+ from opentelemetry.sdk.trace.export import (
+ BatchSpanProcessor,
+ ConsoleSpanExporter,
+ SimpleSpanProcessor,
+ SpanExporter,
+ )
+
+ verbose_logger.debug(
+ "OpenTelemetry Logger, initializing span processor \nself.OTEL_EXPORTER: %s\nself.OTEL_ENDPOINT: %s\nself.OTEL_HEADERS: %s",
+ self.OTEL_EXPORTER,
+ self.OTEL_ENDPOINT,
+ self.OTEL_HEADERS,
+ )
+ _split_otel_headers = OpenTelemetry._get_headers_dictionary(
+ headers=dynamic_headers or self.OTEL_HEADERS
+ )
+
+ if isinstance(self.OTEL_EXPORTER, SpanExporter):
+ verbose_logger.debug(
+ "OpenTelemetry: intiializing SpanExporter. Value of OTEL_EXPORTER: %s",
+ self.OTEL_EXPORTER,
+ )
+ return SimpleSpanProcessor(self.OTEL_EXPORTER)
+
+ if self.OTEL_EXPORTER == "console":
+ verbose_logger.debug(
+ "OpenTelemetry: intiializing console exporter. Value of OTEL_EXPORTER: %s",
+ self.OTEL_EXPORTER,
+ )
+ return BatchSpanProcessor(ConsoleSpanExporter())
+ elif self.OTEL_EXPORTER == "otlp_http":
+ verbose_logger.debug(
+ "OpenTelemetry: intiializing http exporter. Value of OTEL_EXPORTER: %s",
+ self.OTEL_EXPORTER,
+ )
+ return BatchSpanProcessor(
+ OTLPSpanExporterHTTP(
+ endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
+ ),
+ )
+ elif self.OTEL_EXPORTER == "otlp_grpc":
+ verbose_logger.debug(
+ "OpenTelemetry: intiializing grpc exporter. Value of OTEL_EXPORTER: %s",
+ self.OTEL_EXPORTER,
+ )
+ return BatchSpanProcessor(
+ OTLPSpanExporterGRPC(
+ endpoint=self.OTEL_ENDPOINT, headers=_split_otel_headers
+ ),
+ )
+ else:
+ verbose_logger.debug(
+ "OpenTelemetry: intiializing console exporter. Value of OTEL_EXPORTER: %s",
+ self.OTEL_EXPORTER,
+ )
+ return BatchSpanProcessor(ConsoleSpanExporter())
+
+ @staticmethod
+ def _get_headers_dictionary(headers: Optional[Union[str, dict]]) -> Dict[str, str]:
+ """
+ Convert a string or dictionary of headers into a dictionary of headers.
+ """
+ _split_otel_headers: Dict[str, str] = {}
+ if headers:
+ if isinstance(headers, str):
+ # when passed HEADERS="x-honeycomb-team=B85YgLm96******"
+ # Split only on first '=' occurrence
+ parts = headers.split("=", 1)
+ if len(parts) == 2:
+ _split_otel_headers = {parts[0]: parts[1]}
+ else:
+ _split_otel_headers = {}
+ elif isinstance(headers, dict):
+ _split_otel_headers = headers
+ return _split_otel_headers
+
+ async def async_management_endpoint_success_hook(
+ self,
+ logging_payload: ManagementEndpointLoggingPayload,
+ parent_otel_span: Optional[Span] = None,
+ ):
+
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ _start_time_ns = 0
+ _end_time_ns = 0
+
+ start_time = logging_payload.start_time
+ end_time = logging_payload.end_time
+
+ if isinstance(start_time, float):
+ _start_time_ns = int(start_time * 1e9)
+ else:
+ _start_time_ns = self._to_ns(start_time)
+
+ if isinstance(end_time, float):
+ _end_time_ns = int(end_time * 1e9)
+ else:
+ _end_time_ns = self._to_ns(end_time)
+
+ if parent_otel_span is not None:
+ _span_name = logging_payload.route
+ management_endpoint_span = self.tracer.start_span(
+ name=_span_name,
+ context=trace.set_span_in_context(parent_otel_span),
+ start_time=_start_time_ns,
+ )
+
+ _request_data = logging_payload.request_data
+ if _request_data is not None:
+ for key, value in _request_data.items():
+ self.safe_set_attribute(
+ span=management_endpoint_span,
+ key=f"request.{key}",
+ value=value,
+ )
+
+ _response = logging_payload.response
+ if _response is not None:
+ for key, value in _response.items():
+ self.safe_set_attribute(
+ span=management_endpoint_span,
+ key=f"response.{key}",
+ value=value,
+ )
+
+ management_endpoint_span.set_status(Status(StatusCode.OK))
+ management_endpoint_span.end(end_time=_end_time_ns)
+
+ async def async_management_endpoint_failure_hook(
+ self,
+ logging_payload: ManagementEndpointLoggingPayload,
+ parent_otel_span: Optional[Span] = None,
+ ):
+
+ from opentelemetry import trace
+ from opentelemetry.trace import Status, StatusCode
+
+ _start_time_ns = 0
+ _end_time_ns = 0
+
+ start_time = logging_payload.start_time
+ end_time = logging_payload.end_time
+
+ if isinstance(start_time, float):
+ _start_time_ns = int(int(start_time) * 1e9)
+ else:
+ _start_time_ns = self._to_ns(start_time)
+
+ if isinstance(end_time, float):
+ _end_time_ns = int(int(end_time) * 1e9)
+ else:
+ _end_time_ns = self._to_ns(end_time)
+
+ if parent_otel_span is not None:
+ _span_name = logging_payload.route
+ management_endpoint_span = self.tracer.start_span(
+ name=_span_name,
+ context=trace.set_span_in_context(parent_otel_span),
+ start_time=_start_time_ns,
+ )
+
+ _request_data = logging_payload.request_data
+ if _request_data is not None:
+ for key, value in _request_data.items():
+ self.safe_set_attribute(
+ span=management_endpoint_span,
+ key=f"request.{key}",
+ value=value,
+ )
+
+ _exception = logging_payload.exception
+ self.safe_set_attribute(
+ span=management_endpoint_span,
+ key="exception",
+ value=str(_exception),
+ )
+ management_endpoint_span.set_status(Status(StatusCode.ERROR))
+ management_endpoint_span.end(end_time=_end_time_ns)
+
+ def create_litellm_proxy_request_started_span(
+ self,
+ start_time: datetime,
+ headers: dict,
+ ) -> Optional[Span]:
+ """
+ Create a span for the received proxy server request.
+ """
+ return self.tracer.start_span(
+ name="Received Proxy Server Request",
+ start_time=self._to_ns(start_time),
+ context=self.get_traceparent_from_header(headers=headers),
+ kind=self.span_kind.SERVER,
+ )