about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/lago.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/lago.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py b/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py
new file mode 100644
index 00000000..5dfb1ce0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/lago.py
@@ -0,0 +1,202 @@
+# What is this?
+## On Success events log cost to Lago - https://github.com/BerriAI/litellm/issues/3639
+
+import json
+import os
+import uuid
+from typing import Literal, Optional
+
+import httpx
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.llms.custom_httpx.http_handler import (
+    HTTPHandler,
+    get_async_httpx_client,
+    httpxSpecialProvider,
+)
+
+
+def get_utc_datetime():
+    import datetime as dt
+    from datetime import datetime
+
+    if hasattr(dt, "UTC"):
+        return datetime.now(dt.UTC)  # type: ignore
+    else:
+        return datetime.utcnow()  # type: ignore
+
+
+class LagoLogger(CustomLogger):
+    def __init__(self) -> None:
+        super().__init__()
+        self.validate_environment()
+        self.async_http_handler = get_async_httpx_client(
+            llm_provider=httpxSpecialProvider.LoggingCallback
+        )
+        self.sync_http_handler = HTTPHandler()
+
+    def validate_environment(self):
+        """
+        Expects
+        LAGO_API_BASE,
+        LAGO_API_KEY,
+        LAGO_API_EVENT_CODE,
+
+        Optional:
+        LAGO_API_CHARGE_BY
+
+        in the environment
+        """
+        missing_keys = []
+        if os.getenv("LAGO_API_KEY", None) is None:
+            missing_keys.append("LAGO_API_KEY")
+
+        if os.getenv("LAGO_API_BASE", None) is None:
+            missing_keys.append("LAGO_API_BASE")
+
+        if os.getenv("LAGO_API_EVENT_CODE", None) is None:
+            missing_keys.append("LAGO_API_EVENT_CODE")
+
+        if len(missing_keys) > 0:
+            raise Exception("Missing keys={} in environment.".format(missing_keys))
+
+    def _common_logic(self, kwargs: dict, response_obj) -> dict:
+        response_obj.get("id", kwargs.get("litellm_call_id"))
+        get_utc_datetime().isoformat()
+        cost = kwargs.get("response_cost", None)
+        model = kwargs.get("model")
+        usage = {}
+
+        if (
+            isinstance(response_obj, litellm.ModelResponse)
+            or isinstance(response_obj, litellm.EmbeddingResponse)
+        ) and hasattr(response_obj, "usage"):
+            usage = {
+                "prompt_tokens": response_obj["usage"].get("prompt_tokens", 0),
+                "completion_tokens": response_obj["usage"].get("completion_tokens", 0),
+                "total_tokens": response_obj["usage"].get("total_tokens"),
+            }
+
+        litellm_params = kwargs.get("litellm_params", {}) or {}
+        proxy_server_request = litellm_params.get("proxy_server_request") or {}
+        end_user_id = proxy_server_request.get("body", {}).get("user", None)
+        user_id = litellm_params["metadata"].get("user_api_key_user_id", None)
+        team_id = litellm_params["metadata"].get("user_api_key_team_id", None)
+        litellm_params["metadata"].get("user_api_key_org_id", None)
+
+        charge_by: Literal["end_user_id", "team_id", "user_id"] = "end_user_id"
+        external_customer_id: Optional[str] = None
+
+        if os.getenv("LAGO_API_CHARGE_BY", None) is not None and isinstance(
+            os.environ["LAGO_API_CHARGE_BY"], str
+        ):
+            if os.environ["LAGO_API_CHARGE_BY"] in [
+                "end_user_id",
+                "user_id",
+                "team_id",
+            ]:
+                charge_by = os.environ["LAGO_API_CHARGE_BY"]  # type: ignore
+            else:
+                raise Exception("invalid LAGO_API_CHARGE_BY set")
+
+        if charge_by == "end_user_id":
+            external_customer_id = end_user_id
+        elif charge_by == "team_id":
+            external_customer_id = team_id
+        elif charge_by == "user_id":
+            external_customer_id = user_id
+
+        if external_customer_id is None:
+            raise Exception(
+                "External Customer ID is not set. Charge_by={}. User_id={}. End_user_id={}. Team_id={}".format(
+                    charge_by, user_id, end_user_id, team_id
+                )
+            )
+
+        returned_val = {
+            "event": {
+                "transaction_id": str(uuid.uuid4()),
+                "external_subscription_id": external_customer_id,
+                "code": os.getenv("LAGO_API_EVENT_CODE"),
+                "properties": {"model": model, "response_cost": cost, **usage},
+            }
+        }
+
+        verbose_logger.debug(
+            "\033[91mLogged Lago Object:\n{}\033[0m\n".format(returned_val)
+        )
+        return returned_val
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        _url = os.getenv("LAGO_API_BASE")
+        assert _url is not None and isinstance(
+            _url, str
+        ), "LAGO_API_BASE missing or not set correctly. LAGO_API_BASE={}".format(_url)
+        if _url.endswith("/"):
+            _url += "api/v1/events"
+        else:
+            _url += "/api/v1/events"
+
+        api_key = os.getenv("LAGO_API_KEY")
+
+        _data = self._common_logic(kwargs=kwargs, response_obj=response_obj)
+        _headers = {
+            "Content-Type": "application/json",
+            "Authorization": "Bearer {}".format(api_key),
+        }
+
+        try:
+            response = self.sync_http_handler.post(
+                url=_url,
+                data=json.dumps(_data),
+                headers=_headers,
+            )
+
+            response.raise_for_status()
+        except Exception as e:
+            error_response = getattr(e, "response", None)
+            if error_response is not None and hasattr(error_response, "text"):
+                verbose_logger.debug(f"\nError Message: {error_response.text}")
+            raise e
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        try:
+            verbose_logger.debug("ENTERS LAGO CALLBACK")
+            _url = os.getenv("LAGO_API_BASE")
+            assert _url is not None and isinstance(
+                _url, str
+            ), "LAGO_API_BASE missing or not set correctly. LAGO_API_BASE={}".format(
+                _url
+            )
+            if _url.endswith("/"):
+                _url += "api/v1/events"
+            else:
+                _url += "/api/v1/events"
+
+            api_key = os.getenv("LAGO_API_KEY")
+
+            _data = self._common_logic(kwargs=kwargs, response_obj=response_obj)
+            _headers = {
+                "Content-Type": "application/json",
+                "Authorization": "Bearer {}".format(api_key),
+            }
+        except Exception as e:
+            raise e
+
+        response: Optional[httpx.Response] = None
+        try:
+            response = await self.async_http_handler.post(
+                url=_url,
+                data=json.dumps(_data),
+                headers=_headers,
+            )
+
+            response.raise_for_status()
+
+            verbose_logger.debug(f"Logged Lago Object: {response.text}")
+        except Exception as e:
+            if response is not None and hasattr(response, "text"):
+                verbose_logger.debug(f"\nError Message: {response.text}")
+            raise e