diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py | 287 |
1 files changed, 287 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py new file mode 100644 index 00000000..1f4ca84d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py @@ -0,0 +1,287 @@ +""" +Call Hook for LiteLLM Proxy which allows Langfuse prompt management. +""" + +import os +from functools import lru_cache +from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast + +from packaging.version import Version +from typing_extensions import TypeAlias + +from litellm.integrations.custom_logger import CustomLogger +from litellm.integrations.prompt_management_base import PromptManagementClient +from litellm.litellm_core_utils.asyncify import run_async_function +from litellm.types.llms.openai import AllMessageValues, ChatCompletionSystemMessage +from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload + +from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import ( + DynamicLoggingCache, +) +from ..prompt_management_base import PromptManagementBase +from .langfuse import LangFuseLogger +from .langfuse_handler import LangFuseHandler + +if TYPE_CHECKING: + from langfuse import Langfuse + from langfuse.client import ChatPromptClient, TextPromptClient + + LangfuseClass: TypeAlias = Langfuse + + PROMPT_CLIENT = Union[TextPromptClient, ChatPromptClient] +else: + PROMPT_CLIENT = Any + LangfuseClass = Any + +in_memory_dynamic_logger_cache = DynamicLoggingCache() + + +@lru_cache(maxsize=10) +def langfuse_client_init( + langfuse_public_key=None, + langfuse_secret=None, + langfuse_secret_key=None, + langfuse_host=None, + flush_interval=1, +) -> LangfuseClass: + """ + Initialize Langfuse client with caching to prevent multiple initializations. + + Args: + langfuse_public_key (str, optional): Public key for Langfuse. Defaults to None. + langfuse_secret (str, optional): Secret key for Langfuse. Defaults to None. + langfuse_host (str, optional): Host URL for Langfuse. Defaults to None. + flush_interval (int, optional): Flush interval in seconds. Defaults to 1. + + Returns: + Langfuse: Initialized Langfuse client instance + + Raises: + Exception: If langfuse package is not installed + """ + try: + import langfuse + from langfuse import Langfuse + except Exception as e: + raise Exception( + f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m" + ) + + # Instance variables + + secret_key = ( + langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY") + ) + public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY") + langfuse_host = langfuse_host or os.getenv( + "LANGFUSE_HOST", "https://cloud.langfuse.com" + ) + + if not ( + langfuse_host.startswith("http://") or langfuse_host.startswith("https://") + ): + # add http:// if unset, assume communicating over private network - e.g. render + langfuse_host = "http://" + langfuse_host + + langfuse_release = os.getenv("LANGFUSE_RELEASE") + langfuse_debug = os.getenv("LANGFUSE_DEBUG") + + parameters = { + "public_key": public_key, + "secret_key": secret_key, + "host": langfuse_host, + "release": langfuse_release, + "debug": langfuse_debug, + "flush_interval": LangFuseLogger._get_langfuse_flush_interval( + flush_interval + ), # flush interval in seconds + } + + if Version(langfuse.version.__version__) >= Version("2.6.0"): + parameters["sdk_integration"] = "litellm" + + client = Langfuse(**parameters) + + return client + + +class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogger): + def __init__( + self, + langfuse_public_key=None, + langfuse_secret=None, + langfuse_host=None, + flush_interval=1, + ): + import langfuse + + self.langfuse_sdk_version = langfuse.version.__version__ + self.Langfuse = langfuse_client_init( + langfuse_public_key=langfuse_public_key, + langfuse_secret=langfuse_secret, + langfuse_host=langfuse_host, + flush_interval=flush_interval, + ) + + @property + def integration_name(self): + return "langfuse" + + def _get_prompt_from_id( + self, langfuse_prompt_id: str, langfuse_client: LangfuseClass + ) -> PROMPT_CLIENT: + return langfuse_client.get_prompt(langfuse_prompt_id) + + def _compile_prompt( + self, + langfuse_prompt_client: PROMPT_CLIENT, + langfuse_prompt_variables: Optional[dict], + call_type: Union[Literal["completion"], Literal["text_completion"]], + ) -> List[AllMessageValues]: + compiled_prompt: Optional[Union[str, list]] = None + + if langfuse_prompt_variables is None: + langfuse_prompt_variables = {} + + compiled_prompt = langfuse_prompt_client.compile(**langfuse_prompt_variables) + + if isinstance(compiled_prompt, str): + compiled_prompt = [ + ChatCompletionSystemMessage(role="system", content=compiled_prompt) + ] + else: + compiled_prompt = cast(List[AllMessageValues], compiled_prompt) + + return compiled_prompt + + def _get_optional_params_from_langfuse( + self, langfuse_prompt_client: PROMPT_CLIENT + ) -> dict: + config = langfuse_prompt_client.config + optional_params = {} + for k, v in config.items(): + if k != "model": + optional_params[k] = v + return optional_params + + async def async_get_chat_completion_prompt( + self, + model: str, + messages: List[AllMessageValues], + non_default_params: dict, + prompt_id: str, + prompt_variables: Optional[dict], + dynamic_callback_params: StandardCallbackDynamicParams, + ) -> Tuple[ + str, + List[AllMessageValues], + dict, + ]: + return self.get_chat_completion_prompt( + model, + messages, + non_default_params, + prompt_id, + prompt_variables, + dynamic_callback_params, + ) + + def should_run_prompt_management( + self, + prompt_id: str, + dynamic_callback_params: StandardCallbackDynamicParams, + ) -> bool: + langfuse_client = langfuse_client_init( + langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"), + langfuse_secret=dynamic_callback_params.get("langfuse_secret"), + langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"), + langfuse_host=dynamic_callback_params.get("langfuse_host"), + ) + langfuse_prompt_client = self._get_prompt_from_id( + langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client + ) + return langfuse_prompt_client is not None + + def _compile_prompt_helper( + self, + prompt_id: str, + prompt_variables: Optional[dict], + dynamic_callback_params: StandardCallbackDynamicParams, + ) -> PromptManagementClient: + langfuse_client = langfuse_client_init( + langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"), + langfuse_secret=dynamic_callback_params.get("langfuse_secret"), + langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"), + langfuse_host=dynamic_callback_params.get("langfuse_host"), + ) + langfuse_prompt_client = self._get_prompt_from_id( + langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client + ) + + ## SET PROMPT + compiled_prompt = self._compile_prompt( + langfuse_prompt_client=langfuse_prompt_client, + langfuse_prompt_variables=prompt_variables, + call_type="completion", + ) + + template_model = langfuse_prompt_client.config.get("model") + + template_optional_params = self._get_optional_params_from_langfuse( + langfuse_prompt_client + ) + + return PromptManagementClient( + prompt_id=prompt_id, + prompt_template=compiled_prompt, + prompt_template_model=template_model, + prompt_template_optional_params=template_optional_params, + completed_messages=None, + ) + + def log_success_event(self, kwargs, response_obj, start_time, end_time): + return run_async_function( + self.async_log_success_event, kwargs, response_obj, start_time, end_time + ) + + async def async_log_success_event(self, kwargs, response_obj, start_time, end_time): + standard_callback_dynamic_params = kwargs.get( + "standard_callback_dynamic_params" + ) + langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( + globalLangfuseLogger=self, + standard_callback_dynamic_params=standard_callback_dynamic_params, + in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, + ) + langfuse_logger_to_use.log_event_on_langfuse( + kwargs=kwargs, + response_obj=response_obj, + start_time=start_time, + end_time=end_time, + user_id=kwargs.get("user", None), + ) + + async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time): + standard_callback_dynamic_params = kwargs.get( + "standard_callback_dynamic_params" + ) + langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request( + globalLangfuseLogger=self, + standard_callback_dynamic_params=standard_callback_dynamic_params, + in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache, + ) + standard_logging_object = cast( + Optional[StandardLoggingPayload], + kwargs.get("standard_logging_object", None), + ) + if standard_logging_object is None: + return + langfuse_logger_to_use.log_event_on_langfuse( + start_time=start_time, + end_time=end_time, + response_obj=None, + user_id=kwargs.get("user", None), + status_message=standard_logging_object["error_str"], + level="ERROR", + kwargs=kwargs, + ) |