aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py287
1 files changed, 287 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
new file mode 100644
index 00000000..1f4ca84d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
@@ -0,0 +1,287 @@
+"""
+Call Hook for LiteLLM Proxy which allows Langfuse prompt management.
+"""
+
+import os
+from functools import lru_cache
+from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast
+
+from packaging.version import Version
+from typing_extensions import TypeAlias
+
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.integrations.prompt_management_base import PromptManagementClient
+from litellm.litellm_core_utils.asyncify import run_async_function
+from litellm.types.llms.openai import AllMessageValues, ChatCompletionSystemMessage
+from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload
+
+from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import (
+ DynamicLoggingCache,
+)
+from ..prompt_management_base import PromptManagementBase
+from .langfuse import LangFuseLogger
+from .langfuse_handler import LangFuseHandler
+
+if TYPE_CHECKING:
+ from langfuse import Langfuse
+ from langfuse.client import ChatPromptClient, TextPromptClient
+
+ LangfuseClass: TypeAlias = Langfuse
+
+ PROMPT_CLIENT = Union[TextPromptClient, ChatPromptClient]
+else:
+ PROMPT_CLIENT = Any
+ LangfuseClass = Any
+
+in_memory_dynamic_logger_cache = DynamicLoggingCache()
+
+
+@lru_cache(maxsize=10)
+def langfuse_client_init(
+ langfuse_public_key=None,
+ langfuse_secret=None,
+ langfuse_secret_key=None,
+ langfuse_host=None,
+ flush_interval=1,
+) -> LangfuseClass:
+ """
+ Initialize Langfuse client with caching to prevent multiple initializations.
+
+ Args:
+ langfuse_public_key (str, optional): Public key for Langfuse. Defaults to None.
+ langfuse_secret (str, optional): Secret key for Langfuse. Defaults to None.
+ langfuse_host (str, optional): Host URL for Langfuse. Defaults to None.
+ flush_interval (int, optional): Flush interval in seconds. Defaults to 1.
+
+ Returns:
+ Langfuse: Initialized Langfuse client instance
+
+ Raises:
+ Exception: If langfuse package is not installed
+ """
+ try:
+ import langfuse
+ from langfuse import Langfuse
+ except Exception as e:
+ raise Exception(
+ f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m"
+ )
+
+ # Instance variables
+
+ secret_key = (
+ langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY")
+ )
+ public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
+ langfuse_host = langfuse_host or os.getenv(
+ "LANGFUSE_HOST", "https://cloud.langfuse.com"
+ )
+
+ if not (
+ langfuse_host.startswith("http://") or langfuse_host.startswith("https://")
+ ):
+ # add http:// if unset, assume communicating over private network - e.g. render
+ langfuse_host = "http://" + langfuse_host
+
+ langfuse_release = os.getenv("LANGFUSE_RELEASE")
+ langfuse_debug = os.getenv("LANGFUSE_DEBUG")
+
+ parameters = {
+ "public_key": public_key,
+ "secret_key": secret_key,
+ "host": langfuse_host,
+ "release": langfuse_release,
+ "debug": langfuse_debug,
+ "flush_interval": LangFuseLogger._get_langfuse_flush_interval(
+ flush_interval
+ ), # flush interval in seconds
+ }
+
+ if Version(langfuse.version.__version__) >= Version("2.6.0"):
+ parameters["sdk_integration"] = "litellm"
+
+ client = Langfuse(**parameters)
+
+ return client
+
+
+class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogger):
+ def __init__(
+ self,
+ langfuse_public_key=None,
+ langfuse_secret=None,
+ langfuse_host=None,
+ flush_interval=1,
+ ):
+ import langfuse
+
+ self.langfuse_sdk_version = langfuse.version.__version__
+ self.Langfuse = langfuse_client_init(
+ langfuse_public_key=langfuse_public_key,
+ langfuse_secret=langfuse_secret,
+ langfuse_host=langfuse_host,
+ flush_interval=flush_interval,
+ )
+
+ @property
+ def integration_name(self):
+ return "langfuse"
+
+ def _get_prompt_from_id(
+ self, langfuse_prompt_id: str, langfuse_client: LangfuseClass
+ ) -> PROMPT_CLIENT:
+ return langfuse_client.get_prompt(langfuse_prompt_id)
+
+ def _compile_prompt(
+ self,
+ langfuse_prompt_client: PROMPT_CLIENT,
+ langfuse_prompt_variables: Optional[dict],
+ call_type: Union[Literal["completion"], Literal["text_completion"]],
+ ) -> List[AllMessageValues]:
+ compiled_prompt: Optional[Union[str, list]] = None
+
+ if langfuse_prompt_variables is None:
+ langfuse_prompt_variables = {}
+
+ compiled_prompt = langfuse_prompt_client.compile(**langfuse_prompt_variables)
+
+ if isinstance(compiled_prompt, str):
+ compiled_prompt = [
+ ChatCompletionSystemMessage(role="system", content=compiled_prompt)
+ ]
+ else:
+ compiled_prompt = cast(List[AllMessageValues], compiled_prompt)
+
+ return compiled_prompt
+
+ def _get_optional_params_from_langfuse(
+ self, langfuse_prompt_client: PROMPT_CLIENT
+ ) -> dict:
+ config = langfuse_prompt_client.config
+ optional_params = {}
+ for k, v in config.items():
+ if k != "model":
+ optional_params[k] = v
+ return optional_params
+
+ async def async_get_chat_completion_prompt(
+ self,
+ model: str,
+ messages: List[AllMessageValues],
+ non_default_params: dict,
+ prompt_id: str,
+ prompt_variables: Optional[dict],
+ dynamic_callback_params: StandardCallbackDynamicParams,
+ ) -> Tuple[
+ str,
+ List[AllMessageValues],
+ dict,
+ ]:
+ return self.get_chat_completion_prompt(
+ model,
+ messages,
+ non_default_params,
+ prompt_id,
+ prompt_variables,
+ dynamic_callback_params,
+ )
+
+ def should_run_prompt_management(
+ self,
+ prompt_id: str,
+ dynamic_callback_params: StandardCallbackDynamicParams,
+ ) -> bool:
+ langfuse_client = langfuse_client_init(
+ langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
+ langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
+ langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
+ langfuse_host=dynamic_callback_params.get("langfuse_host"),
+ )
+ langfuse_prompt_client = self._get_prompt_from_id(
+ langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
+ )
+ return langfuse_prompt_client is not None
+
+ def _compile_prompt_helper(
+ self,
+ prompt_id: str,
+ prompt_variables: Optional[dict],
+ dynamic_callback_params: StandardCallbackDynamicParams,
+ ) -> PromptManagementClient:
+ langfuse_client = langfuse_client_init(
+ langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
+ langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
+ langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
+ langfuse_host=dynamic_callback_params.get("langfuse_host"),
+ )
+ langfuse_prompt_client = self._get_prompt_from_id(
+ langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
+ )
+
+ ## SET PROMPT
+ compiled_prompt = self._compile_prompt(
+ langfuse_prompt_client=langfuse_prompt_client,
+ langfuse_prompt_variables=prompt_variables,
+ call_type="completion",
+ )
+
+ template_model = langfuse_prompt_client.config.get("model")
+
+ template_optional_params = self._get_optional_params_from_langfuse(
+ langfuse_prompt_client
+ )
+
+ return PromptManagementClient(
+ prompt_id=prompt_id,
+ prompt_template=compiled_prompt,
+ prompt_template_model=template_model,
+ prompt_template_optional_params=template_optional_params,
+ completed_messages=None,
+ )
+
+ def log_success_event(self, kwargs, response_obj, start_time, end_time):
+ return run_async_function(
+ self.async_log_success_event, kwargs, response_obj, start_time, end_time
+ )
+
+ async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+ standard_callback_dynamic_params = kwargs.get(
+ "standard_callback_dynamic_params"
+ )
+ langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
+ globalLangfuseLogger=self,
+ standard_callback_dynamic_params=standard_callback_dynamic_params,
+ in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+ )
+ langfuse_logger_to_use.log_event_on_langfuse(
+ kwargs=kwargs,
+ response_obj=response_obj,
+ start_time=start_time,
+ end_time=end_time,
+ user_id=kwargs.get("user", None),
+ )
+
+ async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+ standard_callback_dynamic_params = kwargs.get(
+ "standard_callback_dynamic_params"
+ )
+ langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
+ globalLangfuseLogger=self,
+ standard_callback_dynamic_params=standard_callback_dynamic_params,
+ in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+ )
+ standard_logging_object = cast(
+ Optional[StandardLoggingPayload],
+ kwargs.get("standard_logging_object", None),
+ )
+ if standard_logging_object is None:
+ return
+ langfuse_logger_to_use.log_event_on_langfuse(
+ start_time=start_time,
+ end_time=end_time,
+ response_obj=None,
+ user_id=kwargs.get("user", None),
+ status_message=standard_logging_object["error_str"],
+ level="ERROR",
+ kwargs=kwargs,
+ )