about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py500
1 files changed, 500 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py
new file mode 100644
index 00000000..1ef90c18
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py
@@ -0,0 +1,500 @@
+#### What this does ####
+#    On success, logs events to Langsmith
+import asyncio
+import os
+import random
+import traceback
+import types
+import uuid
+from datetime import datetime, timezone
+from typing import Any, Dict, List, Optional
+
+import httpx
+from pydantic import BaseModel  # type: ignore
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_batch_logger import CustomBatchLogger
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+from litellm.types.integrations.langsmith import *
+from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload
+
+
+def is_serializable(value):
+    non_serializable_types = (
+        types.CoroutineType,
+        types.FunctionType,
+        types.GeneratorType,
+        BaseModel,
+    )
+    return not isinstance(value, non_serializable_types)
+
+
+class LangsmithLogger(CustomBatchLogger):
+    def __init__(
+        self,
+        langsmith_api_key: Optional[str] = None,
+        langsmith_project: Optional[str] = None,
+        langsmith_base_url: Optional[str] = None,
+        **kwargs,
+    ):
+        self.default_credentials = self.get_credentials_from_env(
+            langsmith_api_key=langsmith_api_key,
+            langsmith_project=langsmith_project,
+            langsmith_base_url=langsmith_base_url,
+        )
+        self.sampling_rate: float = (
+            float(os.getenv("LANGSMITH_SAMPLING_RATE"))  # type: ignore
+            if os.getenv("LANGSMITH_SAMPLING_RATE") is not None
+            and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit()  # type: ignore
+            else 1.0
+        )
+        self.langsmith_default_run_name = os.getenv(
+            "LANGSMITH_DEFAULT_RUN_NAME", "LLMRun"
+        )
+        self.async_httpx_client = get_async_httpx_client(
+            llm_provider=httpxSpecialProvider.LoggingCallback
+        )
+        _batch_size = (
+            os.getenv("LANGSMITH_BATCH_SIZE", None) or litellm.langsmith_batch_size
+        )
+        if _batch_size:
+            self.batch_size = int(_batch_size)
+        self.log_queue: List[LangsmithQueueObject] = []
+        asyncio.create_task(self.periodic_flush())
+        self.flush_lock = asyncio.Lock()
+
+        super().__init__(**kwargs, flush_lock=self.flush_lock)
+
+    def get_credentials_from_env(
+        self,
+        langsmith_api_key: Optional[str] = None,
+        langsmith_project: Optional[str] = None,
+        langsmith_base_url: Optional[str] = None,
+    ) -> LangsmithCredentialsObject:
+
+        _credentials_api_key = langsmith_api_key or os.getenv("LANGSMITH_API_KEY")
+        if _credentials_api_key is None:
+            raise Exception(
+                "Invalid Langsmith API Key given. _credentials_api_key=None."
+            )
+        _credentials_project = (
+            langsmith_project or os.getenv("LANGSMITH_PROJECT") or "litellm-completion"
+        )
+        if _credentials_project is None:
+            raise Exception(
+                "Invalid Langsmith API Key given. _credentials_project=None."
+            )
+        _credentials_base_url = (
+            langsmith_base_url
+            or os.getenv("LANGSMITH_BASE_URL")
+            or "https://api.smith.langchain.com"
+        )
+        if _credentials_base_url is None:
+            raise Exception(
+                "Invalid Langsmith API Key given. _credentials_base_url=None."
+            )
+
+        return LangsmithCredentialsObject(
+            LANGSMITH_API_KEY=_credentials_api_key,
+            LANGSMITH_BASE_URL=_credentials_base_url,
+            LANGSMITH_PROJECT=_credentials_project,
+        )
+
+    def _prepare_log_data(
+        self,
+        kwargs,
+        response_obj,
+        start_time,
+        end_time,
+        credentials: LangsmithCredentialsObject,
+    ):
+        try:
+            _litellm_params = kwargs.get("litellm_params", {}) or {}
+            metadata = _litellm_params.get("metadata", {}) or {}
+            project_name = metadata.get(
+                "project_name", credentials["LANGSMITH_PROJECT"]
+            )
+            run_name = metadata.get("run_name", self.langsmith_default_run_name)
+            run_id = metadata.get("id", metadata.get("run_id", None))
+            parent_run_id = metadata.get("parent_run_id", None)
+            trace_id = metadata.get("trace_id", None)
+            session_id = metadata.get("session_id", None)
+            dotted_order = metadata.get("dotted_order", None)
+            verbose_logger.debug(
+                f"Langsmith Logging - project_name: {project_name}, run_name {run_name}"
+            )
+
+            # Ensure everything in the payload is converted to str
+            payload: Optional[StandardLoggingPayload] = kwargs.get(
+                "standard_logging_object", None
+            )
+
+            if payload is None:
+                raise Exception("Error logging request payload. Payload=none.")
+
+            metadata = payload[
+                "metadata"
+            ]  # ensure logged metadata is json serializable
+
+            data = {
+                "name": run_name,
+                "run_type": "llm",  # this should always be llm, since litellm always logs llm calls. Langsmith allow us to log "chain"
+                "inputs": payload,
+                "outputs": payload["response"],
+                "session_name": project_name,
+                "start_time": payload["startTime"],
+                "end_time": payload["endTime"],
+                "tags": payload["request_tags"],
+                "extra": metadata,
+            }
+
+            if payload["error_str"] is not None and payload["status"] == "failure":
+                data["error"] = payload["error_str"]
+
+            if run_id:
+                data["id"] = run_id
+
+            if parent_run_id:
+                data["parent_run_id"] = parent_run_id
+
+            if trace_id:
+                data["trace_id"] = trace_id
+
+            if session_id:
+                data["session_id"] = session_id
+
+            if dotted_order:
+                data["dotted_order"] = dotted_order
+
+            run_id: Optional[str] = data.get("id")  # type: ignore
+            if "id" not in data or data["id"] is None:
+                """
+                for /batch langsmith requires id, trace_id and dotted_order passed as params
+                """
+                run_id = str(uuid.uuid4())
+
+                data["id"] = run_id
+
+            if (
+                "trace_id" not in data
+                or data["trace_id"] is None
+                and (run_id is not None and isinstance(run_id, str))
+            ):
+                data["trace_id"] = run_id
+
+            if (
+                "dotted_order" not in data
+                or data["dotted_order"] is None
+                and (run_id is not None and isinstance(run_id, str))
+            ):
+                data["dotted_order"] = self.make_dot_order(run_id=run_id)  # type: ignore
+
+            verbose_logger.debug("Langsmith Logging data on langsmith: %s", data)
+
+            return data
+        except Exception:
+            raise
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            sampling_rate = (
+                float(os.getenv("LANGSMITH_SAMPLING_RATE"))  # type: ignore
+                if os.getenv("LANGSMITH_SAMPLING_RATE") is not None
+                and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit()  # type: ignore
+                else 1.0
+            )
+            random_sample = random.random()
+            if random_sample > sampling_rate:
+                verbose_logger.info(
+                    "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format(
+                        sampling_rate, random_sample
+                    )
+                )
+                return  # Skip logging
+            verbose_logger.debug(
+                "Langsmith Sync Layer Logging - kwargs: %s, response_obj: %s",
+                kwargs,
+                response_obj,
+            )
+            credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
+            data = self._prepare_log_data(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+                credentials=credentials,
+            )
+            self.log_queue.append(
+                LangsmithQueueObject(
+                    data=data,
+                    credentials=credentials,
+                )
+            )
+            verbose_logger.debug(
+                f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..."
+            )
+
+            if len(self.log_queue) >= self.batch_size:
+                self._send_batch()
+
+        except Exception:
+            verbose_logger.exception("Langsmith Layer Error - log_success_event error")
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            sampling_rate = self.sampling_rate
+            random_sample = random.random()
+            if random_sample > sampling_rate:
+                verbose_logger.info(
+                    "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format(
+                        sampling_rate, random_sample
+                    )
+                )
+                return  # Skip logging
+            verbose_logger.debug(
+                "Langsmith Async Layer Logging - kwargs: %s, response_obj: %s",
+                kwargs,
+                response_obj,
+            )
+            credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
+            data = self._prepare_log_data(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+                credentials=credentials,
+            )
+            self.log_queue.append(
+                LangsmithQueueObject(
+                    data=data,
+                    credentials=credentials,
+                )
+            )
+            verbose_logger.debug(
+                "Langsmith logging: queue length %s, batch size %s",
+                len(self.log_queue),
+                self.batch_size,
+            )
+            if len(self.log_queue) >= self.batch_size:
+                await self.flush_queue()
+        except Exception:
+            verbose_logger.exception(
+                "Langsmith Layer Error - error logging async success event."
+            )
+
+    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        sampling_rate = self.sampling_rate
+        random_sample = random.random()
+        if random_sample > sampling_rate:
+            verbose_logger.info(
+                "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format(
+                    sampling_rate, random_sample
+                )
+            )
+            return  # Skip logging
+        verbose_logger.info("Langsmith Failure Event Logging!")
+        try:
+            credentials = self._get_credentials_to_use_for_request(kwargs=kwargs)
+            data = self._prepare_log_data(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                start_time=start_time,
+                end_time=end_time,
+                credentials=credentials,
+            )
+            self.log_queue.append(
+                LangsmithQueueObject(
+                    data=data,
+                    credentials=credentials,
+                )
+            )
+            verbose_logger.debug(
+                "Langsmith logging: queue length %s, batch size %s",
+                len(self.log_queue),
+                self.batch_size,
+            )
+            if len(self.log_queue) >= self.batch_size:
+                await self.flush_queue()
+        except Exception:
+            verbose_logger.exception(
+                "Langsmith Layer Error - error logging async failure event."
+            )
+
+    async def async_send_batch(self):
+        """
+        Handles sending batches of runs to Langsmith
+
+        self.log_queue contains LangsmithQueueObjects
+            Each LangsmithQueueObject has the following:
+                - "credentials" - credentials to use for the request (langsmith_api_key, langsmith_project, langsmith_base_url)
+                - "data" - data to log on to langsmith for the request
+
+
+        This function
+         - groups the queue objects by credentials
+         - loops through each unique credentials and sends batches to Langsmith
+
+
+        This was added to support key/team based logging on langsmith
+        """
+        if not self.log_queue:
+            return
+
+        batch_groups = self._group_batches_by_credentials()
+        for batch_group in batch_groups.values():
+            await self._log_batch_on_langsmith(
+                credentials=batch_group.credentials,
+                queue_objects=batch_group.queue_objects,
+            )
+
+    def _add_endpoint_to_url(
+        self, url: str, endpoint: str, api_version: str = "/api/v1"
+    ) -> str:
+        if api_version not in url:
+            url = f"{url.rstrip('/')}{api_version}"
+
+        if url.endswith("/"):
+            return f"{url}{endpoint}"
+        return f"{url}/{endpoint}"
+
+    async def _log_batch_on_langsmith(
+        self,
+        credentials: LangsmithCredentialsObject,
+        queue_objects: List[LangsmithQueueObject],
+    ):
+        """
+        Logs a batch of runs to Langsmith
+        sends runs to /batch endpoint for the given credentials
+
+        Args:
+            credentials: LangsmithCredentialsObject
+            queue_objects: List[LangsmithQueueObject]
+
+        Returns: None
+
+        Raises: Does not raise an exception, will only verbose_logger.exception()
+        """
+        langsmith_api_base = credentials["LANGSMITH_BASE_URL"]
+        langsmith_api_key = credentials["LANGSMITH_API_KEY"]
+        url = self._add_endpoint_to_url(langsmith_api_base, "runs/batch")
+        headers = {"x-api-key": langsmith_api_key}
+        elements_to_log = [queue_object["data"] for queue_object in queue_objects]
+
+        try:
+            verbose_logger.debug(
+                "Sending batch of %s runs to Langsmith", len(elements_to_log)
+            )
+            response = await self.async_httpx_client.post(
+                url=url,
+                json={"post": elements_to_log},
+                headers=headers,
+            )
+            response.raise_for_status()
+
+            if response.status_code >= 300:
+                verbose_logger.error(
+                    f"Langsmith Error: {response.status_code} - {response.text}"
+                )
+            else:
+                verbose_logger.debug(
+                    f"Batch of {len(self.log_queue)} runs successfully created"
+                )
+        except httpx.HTTPStatusError as e:
+            verbose_logger.exception(
+                f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}"
+            )
+        except Exception:
+            verbose_logger.exception(
+                f"Langsmith Layer Error - {traceback.format_exc()}"
+            )
+
+    def _group_batches_by_credentials(self) -> Dict[CredentialsKey, BatchGroup]:
+        """Groups queue objects by credentials using a proper key structure"""
+        log_queue_by_credentials: Dict[CredentialsKey, BatchGroup] = {}
+
+        for queue_object in self.log_queue:
+            credentials = queue_object["credentials"]
+            key = CredentialsKey(
+                api_key=credentials["LANGSMITH_API_KEY"],
+                project=credentials["LANGSMITH_PROJECT"],
+                base_url=credentials["LANGSMITH_BASE_URL"],
+            )
+
+            if key not in log_queue_by_credentials:
+                log_queue_by_credentials[key] = BatchGroup(
+                    credentials=credentials, queue_objects=[]
+                )
+
+            log_queue_by_credentials[key].queue_objects.append(queue_object)
+
+        return log_queue_by_credentials
+
+    def _get_credentials_to_use_for_request(
+        self, kwargs: Dict[str, Any]
+    ) -> LangsmithCredentialsObject:
+        """
+        Handles key/team based logging
+
+        If standard_callback_dynamic_params are provided, use those credentials.
+
+        Otherwise, use the default credentials.
+        """
+        standard_callback_dynamic_params: Optional[StandardCallbackDynamicParams] = (
+            kwargs.get("standard_callback_dynamic_params", None)
+        )
+        if standard_callback_dynamic_params is not None:
+            credentials = self.get_credentials_from_env(
+                langsmith_api_key=standard_callback_dynamic_params.get(
+                    "langsmith_api_key", None
+                ),
+                langsmith_project=standard_callback_dynamic_params.get(
+                    "langsmith_project", None
+                ),
+                langsmith_base_url=standard_callback_dynamic_params.get(
+                    "langsmith_base_url", None
+                ),
+            )
+        else:
+            credentials = self.default_credentials
+        return credentials
+
+    def _send_batch(self):
+        """Calls async_send_batch in an event loop"""
+        if not self.log_queue:
+            return
+
+        try:
+            # Try to get the existing event loop
+            loop = asyncio.get_event_loop()
+            if loop.is_running():
+                # If we're already in an event loop, create a task
+                asyncio.create_task(self.async_send_batch())
+            else:
+                # If no event loop is running, run the coroutine directly
+                loop.run_until_complete(self.async_send_batch())
+        except RuntimeError:
+            # If we can't get an event loop, create a new one
+            asyncio.run(self.async_send_batch())
+
+    def get_run_by_id(self, run_id):
+
+        langsmith_api_key = self.default_credentials["LANGSMITH_API_KEY"]
+
+        langsmith_api_base = self.default_credentials["LANGSMITH_BASE_URL"]
+
+        url = f"{langsmith_api_base}/runs/{run_id}"
+        response = litellm.module_level_client.get(
+            url=url,
+            headers={"x-api-key": langsmith_api_key},
+        )
+
+        return response.json()
+
+    def make_dot_order(self, run_id: str):
+        st = datetime.now(timezone.utc)
+        id_ = run_id
+        return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_)