about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py203
1 files changed, 203 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py
new file mode 100644
index 00000000..e4e074ba
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/datadog/datadog_llm_obs.py
@@ -0,0 +1,203 @@
+"""
+Implements logging integration with Datadog's LLM Observability Service
+
+
+API Reference: https://docs.datadoghq.com/llm_observability/setup/api/?tab=example#api-standards
+
+"""
+
+import asyncio
+import json
+import os
+import uuid
+from datetime import datetime
+from typing import Any, Dict, List, Optional, Union
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.integrations.datadog.datadog import DataDogLogger
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+from litellm.types.integrations.datadog_llm_obs import *
+from litellm.types.utils import StandardLoggingPayload
+
+
+class DataDogLLMObsLogger(DataDogLogger, CustomBatchLogger):
+    def __init__(self, **kwargs):
+        try:
+            verbose_logger.debug("DataDogLLMObs: Initializing logger")
+            if os.getenv("DD_API_KEY", None) is None:
+                raise Exception("DD_API_KEY is not set, set 'DD_API_KEY=<>'")
+            if os.getenv("DD_SITE", None) is None:
+                raise Exception(
+                    "DD_SITE is not set, set 'DD_SITE=<>', example sit = `us5.datadoghq.com`"
+                )
+
+            self.async_client = get_async_httpx_client(
+                llm_provider=httpxSpecialProvider.LoggingCallback
+            )
+            self.DD_API_KEY = os.getenv("DD_API_KEY")
+            self.DD_SITE = os.getenv("DD_SITE")
+            self.intake_url = (
+                f"https://api.{self.DD_SITE}/api/intake/llm-obs/v1/trace/spans"
+            )
+
+            # testing base url
+            dd_base_url = os.getenv("DD_BASE_URL")
+            if dd_base_url:
+                self.intake_url = f"{dd_base_url}/api/intake/llm-obs/v1/trace/spans"
+
+            asyncio.create_task(self.periodic_flush())
+            self.flush_lock = asyncio.Lock()
+            self.log_queue: List[LLMObsPayload] = []
+            CustomBatchLogger.__init__(self, **kwargs, flush_lock=self.flush_lock)
+        except Exception as e:
+            verbose_logger.exception(f"DataDogLLMObs: Error initializing - {str(e)}")
+            raise e
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            verbose_logger.debug(
+                f"DataDogLLMObs: Logging success event for model {kwargs.get('model', 'unknown')}"
+            )
+            payload = self.create_llm_obs_payload(
+                kwargs, response_obj, start_time, end_time
+            )
+            verbose_logger.debug(f"DataDogLLMObs: Payload: {payload}")
+            self.log_queue.append(payload)
+
+            if len(self.log_queue) >= self.batch_size:
+                await self.async_send_batch()
+        except Exception as e:
+            verbose_logger.exception(
+                f"DataDogLLMObs: Error logging success event - {str(e)}"
+            )
+
+    async def async_send_batch(self):
+        try:
+            if not self.log_queue:
+                return
+
+            verbose_logger.debug(
+                f"DataDogLLMObs: Flushing {len(self.log_queue)} events"
+            )
+
+            # Prepare the payload
+            payload = {
+                "data": DDIntakePayload(
+                    type="span",
+                    attributes=DDSpanAttributes(
+                        ml_app=self._get_datadog_service(),
+                        tags=[self._get_datadog_tags()],
+                        spans=self.log_queue,
+                    ),
+                ),
+            }
+            verbose_logger.debug("payload %s", json.dumps(payload, indent=4))
+            response = await self.async_client.post(
+                url=self.intake_url,
+                json=payload,
+                headers={
+                    "DD-API-KEY": self.DD_API_KEY,
+                    "Content-Type": "application/json",
+                },
+            )
+
+            response.raise_for_status()
+            if response.status_code != 202:
+                raise Exception(
+                    f"DataDogLLMObs: Unexpected response - status_code: {response.status_code}, text: {response.text}"
+                )
+
+            verbose_logger.debug(
+                f"DataDogLLMObs: Successfully sent batch - status_code: {response.status_code}"
+            )
+            self.log_queue.clear()
+        except Exception as e:
+            verbose_logger.exception(f"DataDogLLMObs: Error sending batch - {str(e)}")
+
+    def create_llm_obs_payload(
+        self, kwargs: Dict, response_obj: Any, start_time: datetime, end_time: datetime
+    ) -> LLMObsPayload:
+        standard_logging_payload: Optional[StandardLoggingPayload] = kwargs.get(
+            "standard_logging_object"
+        )
+        if standard_logging_payload is None:
+            raise Exception("DataDogLLMObs: standard_logging_object is not set")
+
+        messages = standard_logging_payload["messages"]
+        messages = self._ensure_string_content(messages=messages)
+
+        metadata = kwargs.get("litellm_params", {}).get("metadata", {})
+
+        input_meta = InputMeta(messages=messages)  # type: ignore
+        output_meta = OutputMeta(messages=self._get_response_messages(response_obj))
+
+        meta = Meta(
+            kind="llm",
+            input=input_meta,
+            output=output_meta,
+            metadata=self._get_dd_llm_obs_payload_metadata(standard_logging_payload),
+        )
+
+        # Calculate metrics (you may need to adjust these based on available data)
+        metrics = LLMMetrics(
+            input_tokens=float(standard_logging_payload.get("prompt_tokens", 0)),
+            output_tokens=float(standard_logging_payload.get("completion_tokens", 0)),
+            total_tokens=float(standard_logging_payload.get("total_tokens", 0)),
+        )
+
+        return LLMObsPayload(
+            parent_id=metadata.get("parent_id", "undefined"),
+            trace_id=metadata.get("trace_id", str(uuid.uuid4())),
+            span_id=metadata.get("span_id", str(uuid.uuid4())),
+            name=metadata.get("name", "litellm_llm_call"),
+            meta=meta,
+            start_ns=int(start_time.timestamp() * 1e9),
+            duration=int((end_time - start_time).total_seconds() * 1e9),
+            metrics=metrics,
+            tags=[
+                self._get_datadog_tags(standard_logging_object=standard_logging_payload)
+            ],
+        )
+
+    def _get_response_messages(self, response_obj: Any) -> List[Any]:
+        """
+        Get the messages from the response object
+
+        for now this handles logging /chat/completions responses
+        """
+        if isinstance(response_obj, litellm.ModelResponse):
+            return [response_obj["choices"][0]["message"].json()]
+        return []
+
+    def _ensure_string_content(
+        self, messages: Optional[Union[str, List[Any], Dict[Any, Any]]]
+    ) -> List[Any]:
+        if messages is None:
+            return []
+        if isinstance(messages, str):
+            return [messages]
+        elif isinstance(messages, list):
+            return [message for message in messages]
+        elif isinstance(messages, dict):
+            return [str(messages.get("content", ""))]
+        return []
+
+    def _get_dd_llm_obs_payload_metadata(
+        self, standard_logging_payload: StandardLoggingPayload
+    ) -> Dict:
+        _metadata = {
+            "model_name": standard_logging_payload.get("model", "unknown"),
+            "model_provider": standard_logging_payload.get(
+                "custom_llm_provider", "unknown"
+            ),
+        }
+        _standard_logging_metadata: dict = (
+            dict(standard_logging_payload.get("metadata", {})) or {}
+        )
+        _metadata.update(_standard_logging_metadata)
+        return _metadata