diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/opik/opik.py | 326 |
1 file changed, 326 insertions, 0 deletions
"""
Opik Logger that logs LLM events to an Opik server
"""

import asyncio
import json
import traceback
from typing import Dict, List

from litellm._logging import verbose_logger
from litellm.integrations.custom_batch_logger import CustomBatchLogger
from litellm.llms.custom_httpx.http_handler import (
    _get_httpx_client,
    get_async_httpx_client,
    httpxSpecialProvider,
)

from .utils import (
    create_usage_object,
    create_uuid7,
    get_opik_config_variable,
    get_traces_and_spans_from_payload,
)


class OpikLogger(CustomBatchLogger):
    """
    Opik Logger for logging events to an Opik Server
    """

    def __init__(self, **kwargs):
        # Opik configuration is resolved per-value via get_opik_config_variable,
        # which (per its call signature here) arbitrates between an explicit
        # kwarg, whatever other sources the helper consults, and the default.
        #
        # Recognized kwargs: project_name, url, api_key, workspace; everything
        # else is forwarded to CustomBatchLogger via super().__init__.
        self.async_httpx_client = get_async_httpx_client(
            llm_provider=httpxSpecialProvider.LoggingCallback
        )
        self.sync_httpx_client = _get_httpx_client()

        self.opik_project_name = get_opik_config_variable(
            "project_name",
            user_value=kwargs.get("project_name", None),
            default_value="Default Project",
        )

        opik_base_url = get_opik_config_variable(
            "url_override",
            user_value=kwargs.get("url", None),
            default_value="https://www.comet.com/opik/api",
        )
        opik_api_key = get_opik_config_variable(
            "api_key", user_value=kwargs.get("api_key", None), default_value=None
        )
        opik_workspace = get_opik_config_variable(
            "workspace", user_value=kwargs.get("workspace", None), default_value=None
        )

        # Batch endpoints for the two Opik entity kinds this logger emits.
        self.trace_url = f"{opik_base_url}/v1/private/traces/batch"
        self.span_url = f"{opik_base_url}/v1/private/spans/batch"

        # Headers are only attached when configured, so anonymous/local
        # deployments send no auth/workspace headers at all.
        self.headers = {}
        if opik_workspace:
            self.headers["Comet-Workspace"] = opik_workspace

        if opik_api_key:
            self.headers["authorization"] = opik_api_key

        self.opik_workspace = opik_workspace
        self.opik_api_key = opik_api_key
        try:
            # Only possible when constructed inside a running event loop;
            # create_task raises otherwise and we fall back to sync-only mode
            # (flush_lock=None), in which log_success_event sends inline.
            # NOTE(review): create_task runs before super().__init__, so
            # periodic_flush presumably tolerates partially-initialized state
            # until the parent constructor completes — confirm upstream.
            asyncio.create_task(self.periodic_flush())
            self.flush_lock = asyncio.Lock()
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger - Asynchronous processing not initialized as we are not running in an async context {str(e)}"
            )
            self.flush_lock = None

        super().__init__(**kwargs, flush_lock=self.flush_lock)

    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Async success hook: queue the trace/span payload and flush the
        queue once it reaches batch_size (periodic_flush handles the rest).

        Never raises — all failures are logged and swallowed so logging
        can't break the LLM call path.
        """
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time,
            )

            # _create_opik_payload returns a list (possibly a trace dict plus
            # a span dict), hence extend rather than append.
            self.log_queue.extend(opik_payload)
            verbose_logger.debug(
                f"OpikLogger added event to log_queue - Will flush in {self.flush_interval} seconds..."
            )

            if len(self.log_queue) >= self.batch_size:
                verbose_logger.debug("OpikLogger - Flushing batch")
                await self.flush_queue()
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    def _sync_send(self, url: str, headers: Dict[str, str], batch: Dict):
        """POST one batch synchronously; log (never propagate) any failure.

        raise_for_status covers 4xx/5xx; the explicit != 204 check also
        rejects any non-204 success code, since the batch endpoints are
        expected to answer 204 No Content. The raised Exception is caught
        by this same function's handler and logged.
        """
        try:
            response = self.sync_httpx_client.post(
                url=url, headers=headers, json=batch  # type: ignore
            )
            response.raise_for_status()
            if response.status_code != 204:
                raise Exception(
                    f"Response from opik API status_code: {response.status_code}, text: {response.text}"
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to send batch - {str(e)}\n{traceback.format_exc()}"
            )

    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        """Sync success hook: build the payload and send it immediately
        (no queueing/batching on the synchronous path). Never raises.
        """
        try:
            opik_payload = self._create_opik_payload(
                kwargs=kwargs,
                response_obj=response_obj,
                start_time=start_time,
                end_time=end_time,
            )

            # Traces and spans go to different endpoints, so split the
            # combined payload before sending.
            traces, spans = get_traces_and_spans_from_payload(opik_payload)
            if len(traces) > 0:
                self._sync_send(
                    url=self.trace_url, headers=self.headers, batch={"traces": traces}
                )
            if len(spans) > 0:
                self._sync_send(
                    url=self.span_url, headers=self.headers, batch={"spans": spans}
                )
        except Exception as e:
            verbose_logger.exception(
                f"OpikLogger failed to log success event - {str(e)}\n{traceback.format_exc()}"
            )

    async def _submit_batch(self, url: str, headers: Dict[str, str], batch: Dict):
        """POST one batch asynchronously; log (never propagate) any failure."""
        try:
            response = await self.async_httpx_client.post(
                url=url, headers=headers, json=batch  # type: ignore
            )
            response.raise_for_status()

            # NOTE(review): raise_for_status already raises for >= 400, and
            # httpx raises on 3xx here only if the response is an unfollowed
            # redirect — so this error branch is mostly unreachable; the
            # success branch is the expected path.
            if response.status_code >= 300:
                verbose_logger.error(
                    f"OpikLogger - Error: {response.status_code} - {response.text}"
                )
            else:
                verbose_logger.info(
                    f"OpikLogger - {len(self.log_queue)} Opik events submitted"
                )
        except Exception as e:
            verbose_logger.exception(f"OpikLogger failed to send batch - {str(e)}")

    def _create_opik_headers(self):
        """Build auth/workspace headers from instance state.

        NOTE(review): duplicates the header construction done in __init__
        (which stores the result in self.headers) and is not called anywhere
        in this file — possibly kept for external callers or legacy reasons.
        """
        headers = {}
        if self.opik_workspace:
            headers["Comet-Workspace"] = self.opik_workspace

        if self.opik_api_key:
            headers["authorization"] = self.opik_api_key
        return headers

    async def async_send_batch(self):
        """Flush hook invoked by the batching machinery: split the queued
        payloads into traces and spans and submit each batch to its endpoint.
        No-op when the queue is empty.
        """
        verbose_logger.info("Calling async_send_batch")
        if not self.log_queue:
            return

        # Split the log_queue into traces and spans
        traces, spans = get_traces_and_spans_from_payload(self.log_queue)

        # Send trace batch
        if len(traces) > 0:
            await self._submit_batch(
                url=self.trace_url, headers=self.headers, batch={"traces": traces}
            )
            verbose_logger.info(f"Sent {len(traces)} traces")
        if len(spans) > 0:
            await self._submit_batch(
                url=self.span_url, headers=self.headers, batch={"spans": spans}
            )
            verbose_logger.info(f"Sent {len(spans)} spans")

    def _create_opik_payload(  # noqa: PLR0915
        self, kwargs, response_obj, start_time, end_time
    ) -> List[Dict]:
        """Translate one litellm success event into Opik batch entries.

        Returns a list containing an "llm" span dict, preceded by a trace
        dict when no trace_id was supplied via metadata (i.e. this call
        starts a new trace). Returns [] when kwargs carries no
        standard_logging_object, since all input/output/metadata is
        derived from it.

        start_time/end_time: datetime objects; serialized with
        isoformat() + "Z", which assumes they are naive UTC — TODO confirm
        against the caller.
        """

        # Get metadata
        _litellm_params = kwargs.get("litellm_params", {}) or {}
        litellm_params_metadata = _litellm_params.get("metadata", {}) or {}

        # Extract opik metadata
        litellm_opik_metadata = litellm_params_metadata.get("opik", {})
        verbose_logger.debug(
            f"litellm_opik_metadata - {json.dumps(litellm_opik_metadata, default=str)}"
        )
        project_name = litellm_opik_metadata.get("project_name", self.opik_project_name)

        # Extract trace_id and parent_span_id.
        # current_span_data may be a plain dict or an Opik span object
        # exposing .trace_id/.id attributes — both shapes are accepted.
        current_span_data = litellm_opik_metadata.get("current_span_data", None)
        if isinstance(current_span_data, dict):
            trace_id = current_span_data.get("trace_id", None)
            parent_span_id = current_span_data.get("id", None)
        elif current_span_data:
            trace_id = current_span_data.trace_id
            parent_span_id = current_span_data.id
        else:
            trace_id = None
            parent_span_id = None
        # Create Opik tags
        # NOTE(review): this aliases (and appends to) the caller's tags list
        # rather than copying it — mutation is visible to the caller.
        opik_tags = litellm_opik_metadata.get("tags", [])
        if kwargs.get("custom_llm_provider"):
            opik_tags.append(kwargs["custom_llm_provider"])

        # Use standard_logging_object to create metadata and input/output data
        standard_logging_object = kwargs.get("standard_logging_object", None)
        if standard_logging_object is None:
            verbose_logger.debug(
                "OpikLogger skipping event; no standard_logging_object found"
            )
            return []

        # Create input and output data
        input_data = standard_logging_object.get("messages", {})
        output_data = standard_logging_object.get("response", {})

        # Create usage object
        usage = create_usage_object(response_obj["usage"])

        # Define span and trace names
        span_name = "%s_%s_%s" % (
            response_obj.get("model", "unknown-model"),
            response_obj.get("object", "unknown-object"),
            response_obj.get("created", 0),
        )
        trace_name = response_obj.get("object", "unknown type")

        # Create metadata object, we add the opik metadata first and then
        # update it with the standard_logging_object metadata
        # NOTE(review): `metadata` aliases litellm_opik_metadata, so the
        # del/updates below mutate the caller's metadata dict in place.
        metadata = litellm_opik_metadata
        if "current_span_data" in metadata:
            del metadata["current_span_data"]
        metadata["created_from"] = "litellm"

        metadata.update(standard_logging_object.get("metadata", {}))
        if "call_type" in standard_logging_object:
            metadata["type"] = standard_logging_object["call_type"]
        if "status" in standard_logging_object:
            metadata["status"] = standard_logging_object["status"]
        if "response_cost" in kwargs:
            # NOTE(review): the cost (USD) is stored under the key
            # "total_tokens" — looks like a misnomer carried from upstream.
            metadata["cost"] = {
                "total_tokens": kwargs["response_cost"],
                "currency": "USD",
            }
        if "response_cost_failure_debug_info" in kwargs:
            metadata["response_cost_failure_debug_info"] = kwargs[
                "response_cost_failure_debug_info"
            ]
        if "model_map_information" in standard_logging_object:
            metadata["model_map_information"] = standard_logging_object[
                "model_map_information"
            ]
        if "model" in standard_logging_object:
            metadata["model"] = standard_logging_object["model"]
        if "model_id" in standard_logging_object:
            metadata["model_id"] = standard_logging_object["model_id"]
        if "model_group" in standard_logging_object:
            metadata["model_group"] = standard_logging_object["model_group"]
        if "api_base" in standard_logging_object:
            metadata["api_base"] = standard_logging_object["api_base"]
        if "cache_hit" in standard_logging_object:
            metadata["cache_hit"] = standard_logging_object["cache_hit"]
        if "saved_cache_cost" in standard_logging_object:
            metadata["saved_cache_cost"] = standard_logging_object["saved_cache_cost"]
        if "error_str" in standard_logging_object:
            metadata["error_str"] = standard_logging_object["error_str"]
        if "model_parameters" in standard_logging_object:
            metadata["model_parameters"] = standard_logging_object["model_parameters"]
        if "hidden_params" in standard_logging_object:
            metadata["hidden_params"] = standard_logging_object["hidden_params"]

        payload = []
        # No incoming trace: mint a new trace id and emit a trace entry that
        # the span below will attach to.
        if trace_id is None:
            trace_id = create_uuid7()
            verbose_logger.debug(
                f"OpikLogger creating payload for trace with id {trace_id}"
            )

            payload.append(
                {
                    "project_name": project_name,
                    "id": trace_id,
                    "name": trace_name,
                    "start_time": start_time.isoformat() + "Z",
                    "end_time": end_time.isoformat() + "Z",
                    "input": input_data,
                    "output": output_data,
                    "metadata": metadata,
                    "tags": opik_tags,
                }
            )

        # Always emit the LLM span, parented to the incoming span (if any)
        # and attached to either the incoming or freshly-minted trace.
        span_id = create_uuid7()
        verbose_logger.debug(
            f"OpikLogger creating payload for trace with id {trace_id} and span with id {span_id}"
        )
        payload.append(
            {
                "id": span_id,
                "project_name": project_name,
                "trace_id": trace_id,
                "parent_span_id": parent_span_id,
                "name": span_name,
                "type": "llm",
                "start_time": start_time.isoformat() + "Z",
                "end_time": end_time.isoformat() + "Z",
                "input": input_data,
                "output": output_data,
                "metadata": metadata,
                "tags": opik_tags,
                "usage": usage,
            }
        )
        verbose_logger.debug(f"Payload: {payload}")
        return payload