diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/lago.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/lago.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/lago.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py b/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py new file mode 100644 index 00000000..5dfb1ce0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py @@ -0,0 +1,202 @@ +# What is this? +## On Success events log cost to Lago - https://github.com/BerriAI/litellm/issues/3639 + +import json +import os +import uuid +from typing import Literal, Optional + +import httpx + +import litellm +from litellm._logging import verbose_logger +from litellm.integrations.custom_logger import CustomLogger +from litellm.llms.custom_httpx.http_handler import ( + HTTPHandler, + get_async_httpx_client, + httpxSpecialProvider, +) + + +def get_utc_datetime(): + import datetime as dt + from datetime import datetime + + if hasattr(dt, "UTC"): + return datetime.now(dt.UTC) # type: ignore + else: + return datetime.utcnow() # type: ignore + + +class LagoLogger(CustomLogger): + def __init__(self) -> None: + super().__init__() + self.validate_environment() + self.async_http_handler = get_async_httpx_client( + llm_provider=httpxSpecialProvider.LoggingCallback + ) + self.sync_http_handler = HTTPHandler() + + def validate_environment(self): + """ + Expects + LAGO_API_BASE, + LAGO_API_KEY, + LAGO_API_EVENT_CODE, + + Optional: + LAGO_API_CHARGE_BY + + in the environment + """ + missing_keys = [] + if os.getenv("LAGO_API_KEY", None) is None: + missing_keys.append("LAGO_API_KEY") + + if os.getenv("LAGO_API_BASE", None) is None: + missing_keys.append("LAGO_API_BASE") + + if os.getenv("LAGO_API_EVENT_CODE", None) is None: + missing_keys.append("LAGO_API_EVENT_CODE") + + if len(missing_keys) > 0: + raise Exception("Missing keys={} in environment.".format(missing_keys)) + + def _common_logic(self, kwargs: dict, response_obj) -> dict: + response_obj.get("id", kwargs.get("litellm_call_id")) + get_utc_datetime().isoformat() + cost = kwargs.get("response_cost", None) + model = kwargs.get("model") + usage = {} + + if ( + isinstance(response_obj, litellm.ModelResponse) + or isinstance(response_obj, litellm.EmbeddingResponse) + ) and hasattr(response_obj, "usage"): + usage = { + "prompt_tokens": response_obj["usage"].get("prompt_tokens", 0), + "completion_tokens": response_obj["usage"].get("completion_tokens", 0), + "total_tokens": response_obj["usage"].get("total_tokens"), + } + + litellm_params = kwargs.get("litellm_params", {}) or {} + proxy_server_request = litellm_params.get("proxy_server_request") or {} + end_user_id = proxy_server_request.get("body", {}).get("user", None) + user_id = litellm_params["metadata"].get("user_api_key_user_id", None) + team_id = litellm_params["metadata"].get("user_api_key_team_id", None) + litellm_params["metadata"].get("user_api_key_org_id", None) + + charge_by: Literal["end_user_id", "team_id", "user_id"] = "end_user_id" + external_customer_id: Optional[str] = None + + if os.getenv("LAGO_API_CHARGE_BY", None) is not None and isinstance( + os.environ["LAGO_API_CHARGE_BY"], str + ): + if os.environ["LAGO_API_CHARGE_BY"] in [ + "end_user_id", + "user_id", + "team_id", + ]: + charge_by = os.environ["LAGO_API_CHARGE_BY"] # type: ignore + else: + raise Exception("invalid LAGO_API_CHARGE_BY set") + + if charge_by == "end_user_id": + external_customer_id = end_user_id + elif charge_by == "team_id": + external_customer_id = team_id + elif charge_by == "user_id": + external_customer_id = user_id + + if external_customer_id is None: + raise Exception( + "External Customer ID is not set. Charge_by={}. User_id={}. End_user_id={}. Team_id={}".format( + charge_by, user_id, end_user_id, team_id + ) + ) + + returned_val = { + "event": { + "transaction_id": str(uuid.uuid4()), + "external_subscription_id": external_customer_id, + "code": os.getenv("LAGO_API_EVENT_CODE"), + "properties": {"model": model, "response_cost": cost, **usage}, + } + } + + verbose_logger.debug( + "\033[91mLogged Lago Object:\n{}\033[0m\n".format(returned_val) + ) + return returned_val + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + _url = os.getenv("LAGO_API_BASE") + assert _url is not None and isinstance( + _url, str + ), "LAGO_API_BASE missing or not set correctly. LAGO_API_BASE={}".format(_url) + if _url.endswith("/"): + _url += "api/v1/events" + else: + _url += "/api/v1/events" + + api_key = os.getenv("LAGO_API_KEY") + + _data = self._common_logic(kwargs=kwargs, response_obj=response_obj) + _headers = { + "Content-Type": "application/json", + "Authorization": "Bearer {}".format(api_key), + } + + try: + response = self.sync_http_handler.post( + url=_url, + data=json.dumps(_data), + headers=_headers, + ) + + response.raise_for_status() + except Exception as e: + error_response = getattr(e, "response", None) + if error_response is not None and hasattr(error_response, "text"): + verbose_logger.debug(f"\nError Message: {error_response.text}") + raise e + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + try: + verbose_logger.debug("ENTERS LAGO CALLBACK") + _url = os.getenv("LAGO_API_BASE") + assert _url is not None and isinstance( + _url, str + ), "LAGO_API_BASE missing or not set correctly. LAGO_API_BASE={}".format( + _url + ) + if _url.endswith("/"): + _url += "api/v1/events" + else: + _url += "/api/v1/events" + + api_key = os.getenv("LAGO_API_KEY") + + _data = self._common_logic(kwargs=kwargs, response_obj=response_obj) + _headers = { + "Content-Type": "application/json", + "Authorization": "Bearer {}".format(api_key), + } + except Exception as e: + raise e + + response: Optional[httpx.Response] = None + try: + response = await self.async_http_handler.post( + url=_url, + data=json.dumps(_data), + headers=_headers, + ) + + response.raise_for_status() + + verbose_logger.debug(f"Logged Lago Object: {response.text}") + except Exception as e: + if response is not None and hasattr(response, "text"): + verbose_logger.debug(f"\nError Message: {response.text}") + raise e |