about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/opik
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/opik')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py   326
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/opik/utils.py  110
2 files changed, 436 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py b/.venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py
new file mode 100644
index 00000000..1f7f18f3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py
@@ -0,0 +1,326 @@
+"""
+Opik Logger that logs LLM events to an Opik server
+"""
+
+import asyncio
+import json
+import traceback
+from typing import Dict, List
+
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.llms.custom_httpx.http_handler import (
+    _get_httpx_client,
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+
+from .utils import (
+    create_usage_object,
+    create_uuid7,
+    get_opik_config_variable,
+    get_traces_and_spans_from_payload,
+)
+
+
class OpikLogger(CustomBatchLogger):
    """
    Opik Logger for logging events to an Opik Server.

    Converts litellm success events into Opik trace/span payloads and sends
    them to the Opik batch REST endpoints — asynchronously (batched via
    CustomBatchLogger) when an event loop is available, synchronously
    otherwise.
    """

    def __init__(self, **kwargs):
        """Set up HTTP clients, resolve Opik configuration and auth headers.

        Recognized kwargs (all optional): ``project_name``, ``url``,
        ``api_key``, ``workspace``. Each is resolved through
        get_opik_config_variable (user value -> env var -> ~/.opik.config
        file -> default).
        """
        self.async_httpx_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_httpx_client = _get_httpx_client()

        self.opik_project_name = get_opik_config_variable(
            "project_name",
            user_value=kwargs.get("project_name", None),
            default_value="Default Project",
        )

        opik_base_url = get_opik_config_variable(
            "url_override",
            user_value=kwargs.get("url", None),
            default_value="https://www.comet.com/opik/api",
        )
        opik_api_key = get_opik_config_variable(
            "api_key", user_value=kwargs.get("api_key", None), default_value=None
        )
        opik_workspace = get_opik_config_variable(
            "workspace", user_value=kwargs.get("workspace", None), default_value=None
        )

        self.trace_url = f"{opik_base_url}/v1/private/traces/batch"
        self.span_url = f"{opik_base_url}/v1/private/spans/batch"

        self.headers = {}
        if opik_workspace:
            self.headers["Comet-Workspace"] = opik_workspace

        if opik_api_key:
            self.headers["authorization"] = opik_api_key

        self.opik_workspace = opik_workspace
        self.opik_api_key = opik_api_key
        try:
            # Periodic flushing requires a running event loop; when there is
            # none (sync context), fall back to fully synchronous logging.
            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger - Asynchronous processing not initialized as we are not running in an async context {str(e)}"
            )
            self.flush_lock = None

        super().__init__(**kwargs, flush_lock=self.flush_lock)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Queue the event's Opik payload; flush when the batch is full.

        Errors are logged and swallowed so logging never breaks the caller.
        """
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time,
            )

            self.log_queue.extend(opik_payload)
            verbose_logger.debug(
                f"OpikLogger added event to log_queue - Will flush in {self.flush_interval} seconds..."
            )

            if len(self.log_queue) >= self.batch_size:
                verbose_logger.debug("OpikLogger - Flushing batch")
                await self.flush_queue()
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    def _sync_send(self, url: str, headers: Dict[str, str], batch: Dict):
        """POST one batch synchronously; errors are logged, never raised.

        The Opik batch endpoints reply 204 on success, so any other 2xx is
        treated as a failure too.
        """
        try:
            response = self.sync_httpx_client.post(
                url=url, headers=headers, json=batch  # type: ignore
            )
            response.raise_for_status()
            if response.status_code != 204:
                raise Exception(
                    f"Response from opik API status_code: {response.status_code}, text: {response.text}"
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}"
            )

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Synchronous variant of async_log_success_event: build and send now."""
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time,
            )

            traces, spans = get_traces_and_spans_from_payload(opik_payload)
            if len(traces) > 0:
                self._sync_send(
                    url=self.trace_url, headers=self.headers, batch={"traces": traces}
                )
            if len(spans) > 0:
                self._sync_send(
                    url=self.span_url, headers=self.headers, batch={"spans": spans}
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    async def _submit_batch(self, url: str, headers: Dict[str, str], batch: Dict):
        """POST one batch asynchronously; errors are logged, never raised.

        raise_for_status() turns 4xx/5xx into the except branch; the explicit
        >= 300 check therefore only covers 3xx responses.
        """
        try:
            response = await self.async_httpx_client.post(
                url=url, headers=headers, json=batch  # type: ignore
            )
            response.raise_for_status()

            if response.status_code >= 300:
                verbose_logger.error(
                    f"OpikLogger - Error: {response.status_code} - {response.text}"
                )
            else:
                verbose_logger.info(
                    f"OpikLogger - {len(self.log_queue)} Opik events submitted"
                )
        except Exception as e:
            verbose_logger.exception(f"OpikLogger failed to send batch - {str(e)}")

    def _create_opik_headers(self):
        """Rebuild the auth headers from the stored workspace/api_key.

        NOTE(review): __init__ already builds self.headers the same way; this
        helper is kept for backward compatibility.
        """
        headers = {}
        if self.opik_workspace:
            headers["Comet-Workspace"] = self.opik_workspace

        if self.opik_api_key:
            headers["authorization"] = self.opik_api_key
        return headers

    async def async_send_batch(self):
        """Flush the queue: split into traces/spans and POST each batch."""
        verbose_logger.info("Calling async_send_batch")
        if not self.log_queue:
            return

        # Split the log_queue into traces and spans
        traces, spans = get_traces_and_spans_from_payload(self.log_queue)

        # Send trace batch
        if len(traces) > 0:
            await self._submit_batch(
                url=self.trace_url, headers=self.headers, batch={"traces": traces}
            )
            verbose_logger.info(f"Sent {len(traces)} traces")
        if len(spans) > 0:
            await self._submit_batch(
                url=self.span_url, headers=self.headers, batch={"spans": spans}
            )
            verbose_logger.info(f"Sent {len(spans)} spans")

    def _create_opik_payload(  # noqa: PLR0915
        self, kwargs, response_obj, start_time, end_time
    ) -> List[Dict]:
        """Build the list of Opik trace/span dicts for one LLM call.

        Returns an empty list when kwargs carries no standard_logging_object.
        A trace dict is only emitted when the caller did not already supply a
        trace_id via metadata["opik"]["current_span_data"]; a span dict is
        always emitted.

        BUGFIX: the opik metadata dict and its "tags" list are copied before
        mutation — they are references into the caller's
        litellm_params["metadata"], which previously got corrupted
        (current_span_data deleted, litellm keys injected, tags appended)
        on every logged call.
        """

        # Get metadata
        _litellm_params = kwargs.get("litellm_params", {}) or {}
        litellm_params_metadata = _litellm_params.get("metadata", {}) or {}

        # Extract opik metadata
        litellm_opik_metadata = litellm_params_metadata.get("opik", {})
        verbose_logger.debug(
            f"litellm_opik_metadata - {json.dumps(litellm_opik_metadata, default=str)}"
        )
        project_name = litellm_opik_metadata.get("project_name", self.opik_project_name)

        # Extract trace_id and parent_span_id (dict from the proxy, object
        # from the opik SDK)
        current_span_data = litellm_opik_metadata.get("current_span_data", None)
        if isinstance(current_span_data, dict):
            trace_id = current_span_data.get("trace_id", None)
            parent_span_id = current_span_data.get("id", None)
        elif current_span_data:
            trace_id = current_span_data.trace_id
            parent_span_id = current_span_data.id
        else:
            trace_id = None
            parent_span_id = None
        # Create Opik tags — copy so we never mutate the caller's list
        opik_tags = list(litellm_opik_metadata.get("tags", []))
        if kwargs.get("custom_llm_provider"):
            opik_tags.append(kwargs["custom_llm_provider"])

        # Use standard_logging_object to create metadata and input/output data
        standard_logging_object = kwargs.get("standard_logging_object", None)
        if standard_logging_object is None:
            verbose_logger.debug(
                "OpikLogger skipping event; no standard_logging_object found"
            )
            return []

        # Create input and output data
        input_data = standard_logging_object.get("messages", {})
        output_data = standard_logging_object.get("response", {})

        # Create usage object
        usage = create_usage_object(response_obj["usage"])

        # Define span and trace names
        span_name = "%s_%s_%s" % (
            response_obj.get("model", "unknown-model"),
            response_obj.get("object", "unknown-object"),
            response_obj.get("created", 0),
        )
        trace_name = response_obj.get("object", "unknown type")

        # Create metadata object: start from a shallow COPY of the opik
        # metadata (the original dict belongs to the caller and must not be
        # mutated), then fold in the standard_logging_object metadata.
        metadata = dict(litellm_opik_metadata)
        metadata.pop("current_span_data", None)
        metadata["created_from"] = "litellm"

        metadata.update(standard_logging_object.get("metadata", {}))
        if "call_type" in standard_logging_object:
            metadata["type"] = standard_logging_object["call_type"]
        if "status" in standard_logging_object:
            metadata["status"] = standard_logging_object["status"]
        if "response_cost" in kwargs:
            # NOTE(review): cost is stored under the key "total_tokens" —
            # matches the original payload shape; verify against the Opik API.
            metadata["cost"] = {
                "total_tokens": kwargs["response_cost"],
                "currency": "USD",
            }
        if "response_cost_failure_debug_info" in kwargs:
            metadata["response_cost_failure_debug_info"] = kwargs[
                "response_cost_failure_debug_info"
            ]
        if "model_map_information" in standard_logging_object:
            metadata["model_map_information"] = standard_logging_object[
                "model_map_information"
            ]
        if "model" in standard_logging_object:
            metadata["model"] = standard_logging_object["model"]
        if "model_id" in standard_logging_object:
            metadata["model_id"] = standard_logging_object["model_id"]
        if "model_group" in standard_logging_object:
            metadata["model_group"] = standard_logging_object["model_group"]
        if "api_base" in standard_logging_object:
            metadata["api_base"] = standard_logging_object["api_base"]
        if "cache_hit" in standard_logging_object:
            metadata["cache_hit"] = standard_logging_object["cache_hit"]
        if "saved_cache_cost" in standard_logging_object:
            metadata["saved_cache_cost"] = standard_logging_object["saved_cache_cost"]
        if "error_str" in standard_logging_object:
            metadata["error_str"] = standard_logging_object["error_str"]
        if "model_parameters" in standard_logging_object:
            metadata["model_parameters"] = standard_logging_object["model_parameters"]
        if "hidden_params" in standard_logging_object:
            metadata["hidden_params"] = standard_logging_object["hidden_params"]

        payload = []
        if trace_id is None:
            trace_id = create_uuid7()
            verbose_logger.debug(
                f"OpikLogger creating payload for trace with id {trace_id}"
            )

            payload.append(
                {
                    "project_name": project_name,
                    "id": trace_id,
                    "name": trace_name,
                    "start_time": start_time.isoformat() + "Z",
                    "end_time": end_time.isoformat() + "Z",
                    "input": input_data,
                    "output": output_data,
                    "metadata": metadata,
                    "tags": opik_tags,
                }
            )

        span_id = create_uuid7()
        verbose_logger.debug(
            f"OpikLogger creating payload for trace with id {trace_id} and span with id {span_id}"
        )
        payload.append(
            {
                "id": span_id,
                "project_name": project_name,
                "trace_id": trace_id,
                "parent_span_id": parent_span_id,
                "name": span_name,
                "type": "llm",
                "start_time": start_time.isoformat() + "Z",
                "end_time": end_time.isoformat() + "Z",
                "input": input_data,
                "output": output_data,
                "metadata": metadata,
                "tags": opik_tags,
                "usage": usage,
            }
        )
        verbose_logger.debug(f"Payload: {payload}")
        return payload
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/opik/utils.py b/.venv/lib/python3.12/site-packages/litellm/integrations/opik/utils.py
new file mode 100644
index 00000000..7b3b64dc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/opik/utils.py
@@ -0,0 +1,110 @@
+import configparser
+import os
+import time
+from typing import Dict, Final, List, Optional
+
+CONFIG_FILE_PATH_DEFAULT: Final[str] = "~/.opik.config"
+
+
# Cross-call state for the uuid7 sequence counter: the last (t1, t2, t3)
# timestamp pieces plus the 14-bit counter used when the timestamp repeats.
# BUGFIX: this was a function-local list before, which was re-created as
# zeros on every call — the "increment when time is unchanged" logic was
# dead code and same-timestamp uuids were not monotonically ordered.
_uuid7_last = [0, 0, 0, 0]


def create_uuid7():
    """Return a uuid7-style string (time-ordered, version 7, variant 0b10)."""
    ns = time.time_ns()
    last = _uuid7_last

    # Simple uuid7 implementation
    sixteen_secs = 16_000_000_000
    t1, rest1 = divmod(ns, sixteen_secs)
    t2, rest2 = divmod(rest1 << 16, sixteen_secs)
    t3, _ = divmod(rest2 << 12, sixteen_secs)
    t3 |= 7 << 12  # Put uuid version in top 4 bits, which are 0 in t3

    # The next two bytes are an int (t4) with two bits for
    # the variant 2 and a 14 bit sequence counter which increments
    # if the time is unchanged.
    if t1 == last[0] and t2 == last[1] and t3 == last[2]:
        # Stop the seq counter wrapping past 0x3FFF.
        # This won't happen in practice, but if it does,
        # uuids after the 16383rd with that same timestamp
        # will not longer be correctly ordered but
        # are still unique due to the 6 random bytes.
        if last[3] < 0x3FFF:
            last[3] += 1
    else:
        last[:] = (t1, t2, t3, 0)
    t4 = (2 << 14) | last[3]  # Put variant 0b10 in top two bits

    # Six random bytes for the lower part of the uuid
    rand = os.urandom(6)
    return f"{t1:>08x}-{t2:>04x}-{t3:>04x}-{t4:>04x}-{rand.hex()}"
+
+
def _read_opik_config_file() -> Dict[str, str]:
    """Load the [opik] section of the user's config file as a plain dict.

    Returns an empty dict when the file or the section is missing.
    """
    parser = configparser.ConfigParser()
    parser.read(os.path.expanduser(CONFIG_FILE_PATH_DEFAULT))
    if parser.has_section("opik"):
        return dict(parser.items("opik"))
    return {}


def _get_env_variable(key: str) -> Optional[str]:
    """Look up *key* as an ``OPIK_``-prefixed, upper-cased env variable."""
    return os.getenv(f"opik_{key}".upper(), None)


def get_opik_config_variable(
    key: str, user_value: Optional[str] = None, default_value: Optional[str] = None
) -> Optional[str]:
    """
    Get the configuration value of a variable, order priority is:
    1. user provided value
    2. environment variable
    3. Opik configuration file
    4. default value
    """
    # Explicit user value always wins.
    if user_value is not None:
        return user_value

    # Next best: the environment.
    env_value = _get_env_variable(key)
    if env_value is not None:
        return env_value

    # Then the Opik configuration file, finally the caller's default.
    return _read_opik_config_file().get(key, default_value)
+
+
def create_usage_object(usage):
    """Convert a litellm usage object into the dict shape Opik expects.

    Only token counts that are present (not None) are included.
    """
    token_fields = ("completion_tokens", "prompt_tokens", "total_tokens")
    return {
        name: getattr(usage, name)
        for name in token_fields
        if getattr(usage, name) is not None
    }
+
+
def _remove_nulls(x):
    """Return a copy of mapping *x* without its None-valued entries."""
    return {key: value for key, value in x.items() if value is not None}


def get_traces_and_spans_from_payload(payload: List):
    """Split a mixed payload into (traces, spans).

    Span dicts carry a "type" key; trace dicts do not. None-valued fields
    are stripped from every item before it is bucketed.
    """
    traces: List = []
    spans: List = []
    for item in payload:
        bucket = spans if "type" in item else traces
        bucket.append(_remove_nulls(item))
    return traces, spans