diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py | 500 |
1 files changed, 500 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py new file mode 100644 index 00000000..1ef90c18 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langsmith.py @@ -0,0 +1,500 @@ +#### What this does #### +# On success, logs events to Langsmith +import asyncio +import os +import random +import traceback +import types +import uuid +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional + +import httpx +from pydantic import BaseModel # type: ignore + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_batch_logger import CustomBatchLogger +from litellm.llms.custom_httpx.http_handler import ( + get_async_httpx_client, + httpxSpecialProvider, +) +from litellm.types.integrations.langsmith import * +from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload + + +def is_serializable(value): + non_serializable_types = ( + types.CoroutineType, + types.FunctionType, + types.GeneratorType, + BaseModel, + ) + return not isinstance(value, non_serializable_types) + + +class LangsmithLogger(CustomBatchLogger): + def __init__( + self, + langsmith_api_key: Optional[str] = None, + langsmith_project: Optional[str] = None, + langsmith_base_url: Optional[str] = None, + **kwargs, + ): + self.default_credentials = self.get_credentials_from_env( + langsmith_api_key=langsmith_api_key, + langsmith_project=langsmith_project, + langsmith_base_url=langsmith_base_url, + ) + self.sampling_rate: float = ( + float(os.getenv("LANGSMITH_SAMPLING_RATE")) # type: ignore + if os.getenv("LANGSMITH_SAMPLING_RATE") is not None + and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit() # type: ignore + else 1.0 + ) + self.langsmith_default_run_name = os.getenv( + "LANGSMITH_DEFAULT_RUN_NAME", "LLMRun" + ) + self.async_httpx_client = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + _batch_size = ( + os.getenv("LANGSMITH_BATCH_SIZE", None) or litellm.langsmith_batch_size + ) + if _batch_size: + self.batch_size = int(_batch_size) + self.log_queue: List[LangsmithQueueObject] = [] + asyncio.create_task(self.periodic_flush()) + self.flush_lock = asyncio.Lock() + + super().__init__(**kwargs, flush_lock=self.flush_lock) + + def get_credentials_from_env( + self, + langsmith_api_key: Optional[str] = None, + langsmith_project: Optional[str] = None, + langsmith_base_url: Optional[str] = None, + ) -> LangsmithCredentialsObject: + + _credentials_api_key = langsmith_api_key or os.getenv("LANGSMITH_API_KEY") + if _credentials_api_key is None: + raise Exception( + "Invalid Langsmith API Key given. _credentials_api_key=None." + ) + _credentials_project = ( + langsmith_project or os.getenv("LANGSMITH_PROJECT") or "litellm-completion" + ) + if _credentials_project is None: + raise Exception( + "Invalid Langsmith API Key given. _credentials_project=None." + ) + _credentials_base_url = ( + langsmith_base_url + or os.getenv("LANGSMITH_BASE_URL") + or "https://api.smith.langchain.com" + ) + if _credentials_base_url is None: + raise Exception( + "Invalid Langsmith API Key given. _credentials_base_url=None." + ) + + return LangsmithCredentialsObject( + LANGSMITH_API_KEY=_credentials_api_key, + LANGSMITH_BASE_URL=_credentials_base_url, + LANGSMITH_PROJECT=_credentials_project, + ) + + def _prepare_log_data( + self, + kwargs, + response_obj, + start_time, + end_time, + credentials: LangsmithCredentialsObject, + ): + try: + _litellm_params = kwargs.get("litellm_params", {}) or {} + metadata = _litellm_params.get("metadata", {}) or {} + project_name = metadata.get( + "project_name", credentials["LANGSMITH_PROJECT"] + ) + run_name = metadata.get("run_name", self.langsmith_default_run_name) + run_id = metadata.get("id", metadata.get("run_id", None)) + parent_run_id = metadata.get("parent_run_id", None) + trace_id = metadata.get("trace_id", None) + session_id = metadata.get("session_id", None) + dotted_order = metadata.get("dotted_order", None) + verbose_logger.debug( + f"Langsmith Logging - project_name: {project_name}, run_name {run_name}" + ) + + # Ensure everything in the payload is converted to str + payload: Optional[StandardLoggingPayload] = kwargs.get( + "standard_logging_object", None + ) + + if payload is None: + raise Exception("Error logging request payload. Payload=none.") + + metadata = payload[ + "metadata" + ] # ensure logged metadata is json serializable + + data = { + "name": run_name, + "run_type": "llm", # this should always be llm, since litellm always logs llm calls. Langsmith allow us to log "chain" + "inputs": payload, + "outputs": payload["response"], + "session_name": project_name, + "start_time": payload["startTime"], + "end_time": payload["endTime"], + "tags": payload["request_tags"], + "extra": metadata, + } + + if payload["error_str"] is not None and payload["status"] == "failure": + data["error"] = payload["error_str"] + + if run_id: + data["id"] = run_id + + if parent_run_id: + data["parent_run_id"] = parent_run_id + + if trace_id: + data["trace_id"] = trace_id + + if session_id: + data["session_id"] = session_id + + if dotted_order: + data["dotted_order"] = dotted_order + + run_id: Optional[str] = data.get("id") # type: ignore + if "id" not in data or data["id"] is None: + """ + for /batch langsmith requires id, trace_id and dotted_order passed as params + """ + run_id = str(uuid.uuid4()) + + data["id"] = run_id + + if ( + "trace_id" not in data + or data["trace_id"] is None + and (run_id is not None and isinstance(run_id, str)) + ): + data["trace_id"] = run_id + + if ( + "dotted_order" not in data + or data["dotted_order"] is None + and (run_id is not None and isinstance(run_id, str)) + ): + data["dotted_order"] = self.make_dot_order(run_id=run_id) # type: ignore + + verbose_logger.debug("Langsmith Logging data on langsmith: %s", data) + + return data + except Exception: + raise + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + sampling_rate = ( + float(os.getenv("LANGSMITH_SAMPLING_RATE")) # type: ignore + if os.getenv("LANGSMITH_SAMPLING_RATE") is not None + and os.getenv("LANGSMITH_SAMPLING_RATE").strip().isdigit() # type: ignore + else 1.0 + ) + random_sample = random.random() + if random_sample > sampling_rate: + verbose_logger.info( + "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format( + sampling_rate, random_sample + ) + ) + return # Skip logging + verbose_logger.debug( + "Langsmith Sync Layer Logging - kwargs: %s, response_obj: %s", + kwargs, + response_obj, + ) + credentials = self._get_credentials_to_use_for_request(kwargs=kwargs) + data = self._prepare_log_data( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + credentials=credentials, + ) + self.log_queue.append( + LangsmithQueueObject( + data=data, + credentials=credentials, + ) + ) + verbose_logger.debug( + f"Langsmith, event added to queue. Will flush in {self.flush_interval} seconds..." + ) + + if len(self.log_queue) >= self.batch_size: + self._send_batch() + + except Exception: + verbose_logger.exception("Langsmith Layer Error - log_success_event error") + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + sampling_rate = self.sampling_rate + random_sample = random.random() + if random_sample > sampling_rate: + verbose_logger.info( + "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format( + sampling_rate, random_sample + ) + ) + return # Skip logging + verbose_logger.debug( + "Langsmith Async Layer Logging - kwargs: %s, response_obj: %s", + kwargs, + response_obj, + ) + credentials = self._get_credentials_to_use_for_request(kwargs=kwargs) + data = self._prepare_log_data( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + credentials=credentials, + ) + self.log_queue.append( + LangsmithQueueObject( + data=data, + credentials=credentials, + ) + ) + verbose_logger.debug( + "Langsmith logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except Exception: + verbose_logger.exception( + "Langsmith Layer Error - error logging async success event." + ) + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + sampling_rate = self.sampling_rate + random_sample = random.random() + if random_sample > sampling_rate: + verbose_logger.info( + "Skipping Langsmith logging. Sampling rate={}, random_sample={}".format( + sampling_rate, random_sample + ) + ) + return # Skip logging + verbose_logger.info("Langsmith Failure Event Logging!") + try: + credentials = self._get_credentials_to_use_for_request(kwargs=kwargs) + data = self._prepare_log_data( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + credentials=credentials, + ) + self.log_queue.append( + LangsmithQueueObject( + data=data, + credentials=credentials, + ) + ) + verbose_logger.debug( + "Langsmith logging: queue length %s, batch size %s", + len(self.log_queue), + self.batch_size, + ) + if len(self.log_queue) >= self.batch_size: + await self.flush_queue() + except Exception: + verbose_logger.exception( + "Langsmith Layer Error - error logging async failure event." + ) + + async def async_send_batch(self): + """ + Handles sending batches of runs to Langsmith + + self.log_queue contains LangsmithQueueObjects + Each LangsmithQueueObject has the following: + - "credentials" - credentials to use for the request (langsmith_api_key, langsmith_project, langsmith_base_url) + - "data" - data to log on to langsmith for the request + + + This function + - groups the queue objects by credentials + - loops through each unique credentials and sends batches to Langsmith + + + This was added to support key/team based logging on langsmith + """ + if not self.log_queue: + return + + batch_groups = self._group_batches_by_credentials() + for batch_group in batch_groups.values(): + await self._log_batch_on_langsmith( + credentials=batch_group.credentials, + queue_objects=batch_group.queue_objects, + ) + + def _add_endpoint_to_url( + self, url: str, endpoint: str, api_version: str = "/api/v1" + ) -> str: + if api_version not in url: + url = f"{url.rstrip('/')}{api_version}" + + if url.endswith("/"): + return f"{url}{endpoint}" + return f"{url}/{endpoint}" + + async def _log_batch_on_langsmith( + self, + credentials: LangsmithCredentialsObject, + queue_objects: List[LangsmithQueueObject], + ): + """ + Logs a batch of runs to Langsmith + sends runs to /batch endpoint for the given credentials + + Args: + credentials: LangsmithCredentialsObject + queue_objects: List[LangsmithQueueObject] + + Returns: None + + Raises: Does not raise an exception, will only verbose_logger.exception() + """ + langsmith_api_base = credentials["LANGSMITH_BASE_URL"] + langsmith_api_key = credentials["LANGSMITH_API_KEY"] + url = self._add_endpoint_to_url(langsmith_api_base, "runs/batch") + headers = {"x-api-key": langsmith_api_key} + elements_to_log = [queue_object["data"] for queue_object in queue_objects] + + try: + verbose_logger.debug( + "Sending batch of %s runs to Langsmith", len(elements_to_log) + ) + response = await self.async_httpx_client.post( + url=url, + json={"post": elements_to_log}, + headers=headers, + ) + response.raise_for_status() + + if response.status_code >= 300: + verbose_logger.error( + f"Langsmith Error: {response.status_code} - {response.text}" + ) + else: + verbose_logger.debug( + f"Batch of {len(self.log_queue)} runs successfully created" + ) + except httpx.HTTPStatusError as e: + verbose_logger.exception( + f"Langsmith HTTP Error: {e.response.status_code} - {e.response.text}" + ) + except Exception: + verbose_logger.exception( + f"Langsmith Layer Error - {traceback.format_exc()}" + ) + + def _group_batches_by_credentials(self) -> Dict[CredentialsKey, BatchGroup]: + """Groups queue objects by credentials using a proper key structure""" + log_queue_by_credentials: Dict[CredentialsKey, BatchGroup] = {} + + for queue_object in self.log_queue: + credentials = queue_object["credentials"] + key = CredentialsKey( + api_key=credentials["LANGSMITH_API_KEY"], + project=credentials["LANGSMITH_PROJECT"], + base_url=credentials["LANGSMITH_BASE_URL"], + ) + + if key not in log_queue_by_credentials: + log_queue_by_credentials[key] = BatchGroup( + credentials=credentials, queue_objects=[] + ) + + log_queue_by_credentials[key].queue_objects.append(queue_object) + + return log_queue_by_credentials + + def _get_credentials_to_use_for_request( + self, kwargs: Dict[str, Any] + ) -> LangsmithCredentialsObject: + """ + Handles key/team based logging + + If standard_callback_dynamic_params are provided, use those credentials. + + Otherwise, use the default credentials. + """ + standard_callback_dynamic_params: Optional[StandardCallbackDynamicParams] = ( + kwargs.get("standard_callback_dynamic_params", None) + ) + if standard_callback_dynamic_params is not None: + credentials = self.get_credentials_from_env( + langsmith_api_key=standard_callback_dynamic_params.get( + "langsmith_api_key", None + ), + langsmith_project=standard_callback_dynamic_params.get( + "langsmith_project", None + ), + langsmith_base_url=standard_callback_dynamic_params.get( + "langsmith_base_url", None + ), + ) + else: + credentials = self.default_credentials + return credentials + + def _send_batch(self): + """Calls async_send_batch in an event loop""" + if not self.log_queue: + return + + try: + # Try to get the existing event loop + loop = asyncio.get_event_loop() + if loop.is_running(): + # If we're already in an event loop, create a task + asyncio.create_task(self.async_send_batch()) + else: + # If no event loop is running, run the coroutine directly + loop.run_until_complete(self.async_send_batch()) + except RuntimeError: + # If we can't get an event loop, create a new one + asyncio.run(self.async_send_batch()) + + def get_run_by_id(self, run_id): + + langsmith_api_key = self.default_credentials["LANGSMITH_API_KEY"] + + langsmith_api_base = self.default_credentials["LANGSMITH_BASE_URL"] + + url = f"{langsmith_api_base}/runs/{run_id}" + response = litellm.module_level_client.get( + url=url, + headers={"x-api-key": langsmith_api_key}, + ) + + return response.json() + + def make_dot_order(self, run_id: str): + st = datetime.now(timezone.utc) + id_ = run_id + return st.strftime("%Y%m%dT%H%M%S%fZ") + str(id_) |