path: root/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse
author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/integrations/langfuse
parent    cc961e04ba734dd72309fb548a2f97d67d578813

    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/integrations/langfuse')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse.py                    | 955
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_handler.py            | 169
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py  | 287
3 files changed, 1411 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse.py
new file mode 100644
index 00000000..f990a316
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse.py
@@ -0,0 +1,955 @@
+#### What this does ####
+#    On success (and on error), logs events to Langfuse
+import copy
+import os
+import traceback
+from datetime import datetime
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
+
+from packaging.version import Version
+
+import litellm
+from litellm._logging import verbose_logger
+from litellm.litellm_core_utils.redact_messages import redact_user_api_key_info
+from litellm.llms.custom_httpx.http_handler import _get_httpx_client
+from litellm.secret_managers.main import str_to_bool
+from litellm.types.integrations.langfuse import *
+from litellm.types.llms.openai import HttpxBinaryResponseContent
+from litellm.types.utils import (
+    EmbeddingResponse,
+    ImageResponse,
+    ModelResponse,
+    RerankResponse,
+    StandardLoggingPayload,
+    StandardLoggingPromptManagementMetadata,
+    TextCompletionResponse,
+    TranscriptionResponse,
+)
+
+if TYPE_CHECKING:
+    from litellm.litellm_core_utils.litellm_logging import DynamicLoggingCache
+else:
+    DynamicLoggingCache = Any
+
+
+class LangFuseLogger:
+    # Class variables or attributes
+    def __init__(
+        self,
+        langfuse_public_key=None,
+        langfuse_secret=None,
+        langfuse_host=None,
+        flush_interval=1,
+    ):
+        try:
+            import langfuse
+            from langfuse import Langfuse
+        except Exception as e:
+            raise Exception(
+                f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n{traceback.format_exc()}\033[0m"
+            )
+        # Instance variables
+        self.secret_key = langfuse_secret or os.getenv("LANGFUSE_SECRET_KEY")
+        self.public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
+        self.langfuse_host = langfuse_host or os.getenv(
+            "LANGFUSE_HOST", "https://cloud.langfuse.com"
+        )
+        if not (
+            self.langfuse_host.startswith("http://")
+            or self.langfuse_host.startswith("https://")
+        ):
+            # add http:// if unset, assume communicating over private network - e.g. render
+            self.langfuse_host = "http://" + self.langfuse_host
+        self.langfuse_release = os.getenv("LANGFUSE_RELEASE")
+        self.langfuse_debug = os.getenv("LANGFUSE_DEBUG")
+        self.langfuse_flush_interval = LangFuseLogger._get_langfuse_flush_interval(
+            flush_interval
+        )
+        http_client = _get_httpx_client()
+        self.langfuse_client = http_client.client
+
+        parameters = {
+            "public_key": self.public_key,
+            "secret_key": self.secret_key,
+            "host": self.langfuse_host,
+            "release": self.langfuse_release,
+            "debug": self.langfuse_debug,
+            "flush_interval": self.langfuse_flush_interval,  # flush interval in seconds
+            "httpx_client": self.langfuse_client,
+        }
+        self.langfuse_sdk_version: str = langfuse.version.__version__
+
+        if Version(self.langfuse_sdk_version) >= Version("2.6.0"):
+            parameters["sdk_integration"] = "litellm"
+
+        self.Langfuse = Langfuse(**parameters)
+
+        # set the current langfuse project id in the environ
+        # this is used by Alerting to link to the correct project
+        try:
+            project_id = self.Langfuse.client.projects.get().data[0].id
+            os.environ["LANGFUSE_PROJECT_ID"] = project_id
+        except Exception:
+            project_id = None
+
+        if os.getenv("UPSTREAM_LANGFUSE_SECRET_KEY") is not None:
+            self.upstream_langfuse_secret_key = os.getenv(
+                "UPSTREAM_LANGFUSE_SECRET_KEY"
+            )
+            self.upstream_langfuse_public_key = os.getenv(
+                "UPSTREAM_LANGFUSE_PUBLIC_KEY"
+            )
+            self.upstream_langfuse_host = os.getenv("UPSTREAM_LANGFUSE_HOST")
+            self.upstream_langfuse_release = os.getenv("UPSTREAM_LANGFUSE_RELEASE")
+            self.upstream_langfuse_debug = os.getenv("UPSTREAM_LANGFUSE_DEBUG")
+            # read the env vars first - the debug flag must exist before it is converted
+            upstream_langfuse_debug = (
+                str_to_bool(self.upstream_langfuse_debug)
+                if self.upstream_langfuse_debug is not None
+                else None
+            )
+            self.upstream_langfuse = Langfuse(
+                public_key=self.upstream_langfuse_public_key,
+                secret_key=self.upstream_langfuse_secret_key,
+                host=self.upstream_langfuse_host,
+                release=self.upstream_langfuse_release,
+                debug=(
+                    upstream_langfuse_debug
+                    if upstream_langfuse_debug is not None
+                    else False
+                ),
+            )
+        else:
+            self.upstream_langfuse = None
+
+    @staticmethod
+    def add_metadata_from_header(litellm_params: dict, metadata: dict) -> dict:
+        """
+        Adds metadata from proxy request headers to Langfuse logging when header
+        keys start with "langfuse_", overwriting matching keys already present in
+        litellm_params.metadata.
+
+        For example if you want to append your trace to an existing `trace_id` via header, send
+        `headers: { ..., langfuse_existing_trace_id: your-existing-trace-id }` via proxy request.
+        """
+        if litellm_params is None:
+            return metadata
+
+        if litellm_params.get("proxy_server_request") is None:
+            return metadata
+
+        if metadata is None:
+            metadata = {}
+
+        proxy_headers = (
+            litellm_params.get("proxy_server_request", {}).get("headers", {}) or {}
+        )
+
+        for metadata_param_key in proxy_headers:
+            if metadata_param_key.startswith("langfuse_"):
+                trace_param_key = metadata_param_key.replace("langfuse_", "", 1)
+                if trace_param_key in metadata:
+                    verbose_logger.warning(
+                        f"Overwriting Langfuse `{trace_param_key}` from request header"
+                    )
+                else:
+                    verbose_logger.debug(
+                        f"Found Langfuse `{trace_param_key}` in request header"
+                    )
+                metadata[trace_param_key] = proxy_headers.get(metadata_param_key)
+
+        return metadata
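+
+    # Usage sketch (illustrative, not exercised by this module): pulling
+    # Langfuse fields out of "langfuse_"-prefixed proxy headers. The header
+    # name and trace id below are assumptions for the example.
+    #
+    #     litellm_params = {
+    #         "proxy_server_request": {
+    #             "headers": {"langfuse_existing_trace_id": "trace-123"}
+    #         }
+    #     }
+    #     metadata = LangFuseLogger.add_metadata_from_header(litellm_params, {})
+    #     assert metadata == {"existing_trace_id": "trace-123"}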
+
+    def log_event_on_langfuse(
+        self,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        start_time: Optional[datetime] = None,
+        end_time: Optional[datetime] = None,
+        user_id: Optional[str] = None,
+        level: str = "DEFAULT",
+        status_message: Optional[str] = None,
+    ) -> dict:
+        """
+        Logs a success or error event on Langfuse
+        """
+        try:
+            verbose_logger.debug(
+                f"Langfuse Logging - entering logging function with kwargs: {kwargs}"
+            )
+
+            # set default values for input/output for langfuse logging
+            input = None
+            output = None
+
+            litellm_params = kwargs.get("litellm_params", {})
+            litellm_call_id = kwargs.get("litellm_call_id", None)
+            metadata = (
+                litellm_params.get("metadata", {}) or {}
+            )  # default to {} if litellm_params['metadata'] is None
+            metadata = self.add_metadata_from_header(litellm_params, metadata)
+            optional_params = copy.deepcopy(kwargs.get("optional_params", {}))
+
+            prompt = {"messages": kwargs.get("messages")}
+
+            functions = optional_params.pop("functions", None)
+            tools = optional_params.pop("tools", None)
+            if functions is not None:
+                prompt["functions"] = functions
+            if tools is not None:
+                prompt["tools"] = tools
+
+            # langfuse only accepts str, int, bool, float for logging
+            for param, value in optional_params.items():
+                if not isinstance(value, (str, int, bool, float)):
+                    try:
+                        optional_params[param] = str(value)
+                    except Exception:
+                        # if casting value to str fails don't block logging
+                        pass
+
+            input, output = self._get_langfuse_input_output_content(
+                kwargs=kwargs,
+                response_obj=response_obj,
+                prompt=prompt,
+                level=level,
+                status_message=status_message,
+            )
+            verbose_logger.debug(
+                f"OUTPUT IN LANGFUSE: {output}; original: {response_obj}"
+            )
+            trace_id = None
+            generation_id = None
+            if self._is_langfuse_v2():
+                trace_id, generation_id = self._log_langfuse_v2(
+                    user_id=user_id,
+                    metadata=metadata,
+                    litellm_params=litellm_params,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
+                    level=level,
+                    litellm_call_id=litellm_call_id,
+                )
+            elif response_obj is not None:
+                self._log_langfuse_v1(
+                    user_id=user_id,
+                    metadata=metadata,
+                    output=output,
+                    start_time=start_time,
+                    end_time=end_time,
+                    kwargs=kwargs,
+                    optional_params=optional_params,
+                    input=input,
+                    response_obj=response_obj,
+                )
+            verbose_logger.debug(
+                f"Langfuse Layer Logging - final response object: {response_obj}"
+            )
+            verbose_logger.info("Langfuse Layer Logging - logging success")
+
+            return {"trace_id": trace_id, "generation_id": generation_id}
+        except Exception as e:
+            verbose_logger.exception(
+                "Langfuse Layer Error(): Exception occurred - {}".format(str(e))
+            )
+            return {"trace_id": None, "generation_id": None}
+
+    def _get_langfuse_input_output_content(
+        self,
+        kwargs: dict,
+        response_obj: Union[
+            None,
+            dict,
+            EmbeddingResponse,
+            ModelResponse,
+            TextCompletionResponse,
+            ImageResponse,
+            TranscriptionResponse,
+            RerankResponse,
+            HttpxBinaryResponseContent,
+        ],
+        prompt: dict,
+        level: str,
+        status_message: Optional[str],
+    ) -> Tuple[Optional[dict], Optional[Union[str, dict, list]]]:
+        """
+        Get the input and output content for Langfuse logging
+
+        Args:
+            kwargs: The keyword arguments passed to the function
+            response_obj: The response object returned by the function
+            prompt: The prompt used to generate the response
+            level: The level of the log message
+            status_message: The status message of the log message
+
+        Returns:
+            input: The input content for Langfuse logging
+            output: The output content for Langfuse logging
+        """
+        input = None
+        output: Optional[Union[str, dict, List[Any]]] = None
+        if (
+            level == "ERROR"
+            and status_message is not None
+            and isinstance(status_message, str)
+        ):
+            input = prompt
+            output = status_message
+        elif response_obj is not None and (
+            kwargs.get("call_type", None) == "embedding"
+            or isinstance(response_obj, litellm.EmbeddingResponse)
+        ):
+            input = prompt
+            output = None
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ModelResponse
+        ):
+            input = prompt
+            output = self._get_chat_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.HttpxBinaryResponseContent
+        ):
+            input = prompt
+            output = "speech-output"
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TextCompletionResponse
+        ):
+            input = prompt
+            output = self._get_text_completion_content_for_langfuse(response_obj)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.ImageResponse
+        ):
+            input = prompt
+            output = response_obj.get("data", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.TranscriptionResponse
+        ):
+            input = prompt
+            output = response_obj.get("text", None)
+        elif response_obj is not None and isinstance(
+            response_obj, litellm.RerankResponse
+        ):
+            input = prompt
+            output = response_obj.results
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "_arealtime"
+            and response_obj is not None
+            and isinstance(response_obj, list)
+        ):
+            input = kwargs.get("input")
+            output = response_obj
+        elif (
+            kwargs.get("call_type") is not None
+            and kwargs.get("call_type") == "pass_through_endpoint"
+            and response_obj is not None
+            and isinstance(response_obj, dict)
+        ):
+            input = prompt
+            output = response_obj.get("response", "")
+        return input, output
+
+    async def _async_log_event(
+        self, kwargs, response_obj, start_time, end_time, user_id
+    ):
+        """
+        Langfuse SDK uses a background thread to log events
+
+        This approach does not impact latency and runs in the background
+        """
+
+    def _is_langfuse_v2(self):
+        import langfuse
+
+        return Version(langfuse.version.__version__) >= Version("2.0.0")
+
+    def _log_langfuse_v1(
+        self,
+        user_id,
+        metadata,
+        output,
+        start_time,
+        end_time,
+        kwargs,
+        optional_params,
+        input,
+        response_obj,
+    ):
+        from langfuse.model import CreateGeneration, CreateTrace  # type: ignore
+
+        verbose_logger.warning(
+            "Please upgrade langfuse to v2.0.0 or higher: https://github.com/langfuse/langfuse-python/releases/tag/v2.0.1"
+        )
+
+        trace = self.Langfuse.trace(  # type: ignore
+            CreateTrace(  # type: ignore
+                name=metadata.get("generation_name", "litellm-completion"),
+                input=input,
+                output=output,
+                userId=user_id,
+            )
+        )
+
+        trace.generation(
+            CreateGeneration(
+                name=metadata.get("generation_name", "litellm-completion"),
+                startTime=start_time,
+                endTime=end_time,
+                model=kwargs["model"],
+                modelParameters=optional_params,
+                prompt=input,
+                completion=output,
+                usage={
+                    "prompt_tokens": response_obj.usage.prompt_tokens,
+                    "completion_tokens": response_obj.usage.completion_tokens,
+                },
+                metadata=metadata,
+            )
+        )
+
+    def _log_langfuse_v2(  # noqa: PLR0915
+        self,
+        user_id: Optional[str],
+        metadata: dict,
+        litellm_params: dict,
+        output: Optional[Union[str, dict, list]],
+        start_time: Optional[datetime],
+        end_time: Optional[datetime],
+        kwargs: dict,
+        optional_params: dict,
+        input: Optional[dict],
+        response_obj,
+        level: str,
+        litellm_call_id: Optional[str],
+    ) -> tuple:
+        verbose_logger.debug("Langfuse Layer Logging - logging to langfuse v2")
+
+        try:
+            metadata = metadata or {}
+            standard_logging_object: Optional[StandardLoggingPayload] = cast(
+                Optional[StandardLoggingPayload],
+                kwargs.get("standard_logging_object", None),
+            )
+            tags = (
+                self._get_langfuse_tags(standard_logging_object=standard_logging_object)
+                if self._supports_tags()
+                else []
+            )
+
+            if standard_logging_object is None:
+                end_user_id = None
+                prompt_management_metadata: Optional[
+                    StandardLoggingPromptManagementMetadata
+                ] = None
+            else:
+                end_user_id = standard_logging_object["metadata"].get(
+                    "user_api_key_end_user_id", None
+                )
+
+                prompt_management_metadata = cast(
+                    Optional[StandardLoggingPromptManagementMetadata],
+                    standard_logging_object["metadata"].get(
+                        "prompt_management_metadata", None
+                    ),
+                )
+
+            # Clean Metadata before logging - never log raw metadata
+            # the raw metadata can contain circular references which leads to infinite recursion
+            # we clean out all extra litellm metadata params before logging
+            clean_metadata: Dict[str, Any] = {}
+            if prompt_management_metadata is not None:
+                clean_metadata["prompt_management_metadata"] = (
+                    prompt_management_metadata
+                )
+            if isinstance(metadata, dict):
+                for key, value in metadata.items():
+                    # generate langfuse tags - Default Tags sent to Langfuse from LiteLLM Proxy
+                    if (
+                        litellm.langfuse_default_tags is not None
+                        and isinstance(litellm.langfuse_default_tags, list)
+                        and key in litellm.langfuse_default_tags
+                    ):
+                        tags.append(f"{key}:{value}")
+
+                    # clean litellm metadata before logging
+                    if key in [
+                        "headers",
+                        "endpoint",
+                        "caching_groups",
+                        "previous_models",
+                    ]:
+                        continue
+                    else:
+                        clean_metadata[key] = value
+
+            # Add default langfuse tags
+            tags = self.add_default_langfuse_tags(
+                tags=tags, kwargs=kwargs, metadata=metadata
+            )
+
+            session_id = clean_metadata.pop("session_id", None)
+            trace_name = cast(Optional[str], clean_metadata.pop("trace_name", None))
+            trace_id = clean_metadata.pop("trace_id", litellm_call_id)
+            existing_trace_id = clean_metadata.pop("existing_trace_id", None)
+            update_trace_keys = cast(list, clean_metadata.pop("update_trace_keys", []))
+            debug = clean_metadata.pop("debug_langfuse", None)
+            mask_input = clean_metadata.pop("mask_input", False)
+            mask_output = clean_metadata.pop("mask_output", False)
+
+            clean_metadata = redact_user_api_key_info(metadata=clean_metadata)
+
+            if trace_name is None and existing_trace_id is None:
+                # just log `litellm-{call_type}` as the trace name
+                ## DO NOT SET TRACE_NAME if trace-id set. this can lead to overwriting of past traces.
+                trace_name = f"litellm-{kwargs.get('call_type', 'completion')}"
+
+            if existing_trace_id is not None:
+                trace_params: Dict[str, Any] = {"id": existing_trace_id}
+
+                # Update the following keys for this trace
+                for metadata_param_key in update_trace_keys:
+                    trace_param_key = metadata_param_key.replace("trace_", "")
+                    if trace_param_key not in trace_params:
+                        updated_trace_value = clean_metadata.pop(
+                            metadata_param_key, None
+                        )
+                        if updated_trace_value is not None:
+                            trace_params[trace_param_key] = updated_trace_value
+
+                # Drop any remaining trace-specific keys; they would have been
+                # consumed above if this were a new trace
+                for key in list(
+                    filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+                ):
+                    clean_metadata.pop(key, None)
+
+                # Special keys that are found in the function arguments and not the metadata
+                if "input" in update_trace_keys:
+                    trace_params["input"] = (
+                        input if not mask_input else "redacted-by-litellm"
+                    )
+                if "output" in update_trace_keys:
+                    trace_params["output"] = (
+                        output if not mask_output else "redacted-by-litellm"
+                    )
+            else:  # don't overwrite an existing trace
+                trace_params = {
+                    "id": trace_id,
+                    "name": trace_name,
+                    "session_id": session_id,
+                    "input": input if not mask_input else "redacted-by-litellm",
+                    "version": clean_metadata.pop(
+                        "trace_version", clean_metadata.get("version", None)
+                    ),  # "version" alone also applies to the trace; "trace_version" takes precedence
+                    "user_id": end_user_id,
+                }
+                for key in list(
+                    filter(lambda key: key.startswith("trace_"), clean_metadata.keys())
+                ):
+                    trace_params[key.replace("trace_", "")] = clean_metadata.pop(
+                        key, None
+                    )
+
+                if level == "ERROR":
+                    trace_params["status_message"] = output
+                else:
+                    trace_params["output"] = (
+                        output if not mask_output else "redacted-by-litellm"
+                    )
+
+            if debug is True or (isinstance(debug, str) and debug.lower() == "true"):
+                if "metadata" in trace_params:
+                    # log the raw_metadata in the trace
+                    trace_params["metadata"]["metadata_passed_to_litellm"] = metadata
+                else:
+                    trace_params["metadata"] = {"metadata_passed_to_litellm": metadata}
+
+            cost = kwargs.get("response_cost", None)
+            verbose_logger.debug(f"response cost: {cost}")
+
+            clean_metadata["litellm_response_cost"] = cost
+            if standard_logging_object is not None:
+                clean_metadata["hidden_params"] = standard_logging_object[
+                    "hidden_params"
+                ]
+
+            if (
+                litellm.langfuse_default_tags is not None
+                and isinstance(litellm.langfuse_default_tags, list)
+                and "proxy_base_url" in litellm.langfuse_default_tags
+            ):
+                proxy_base_url = os.environ.get("PROXY_BASE_URL", None)
+                if proxy_base_url is not None:
+                    tags.append(f"proxy_base_url:{proxy_base_url}")
+
+            api_base = litellm_params.get("api_base", None)
+            if api_base:
+                clean_metadata["api_base"] = api_base
+
+            vertex_location = kwargs.get("vertex_location", None)
+            if vertex_location:
+                clean_metadata["vertex_location"] = vertex_location
+
+            aws_region_name = kwargs.get("aws_region_name", None)
+            if aws_region_name:
+                clean_metadata["aws_region_name"] = aws_region_name
+
+            if self._supports_tags():
+                if "cache_hit" in kwargs:
+                    if kwargs["cache_hit"] is None:
+                        kwargs["cache_hit"] = False
+                    clean_metadata["cache_hit"] = kwargs["cache_hit"]
+                if existing_trace_id is None:
+                    trace_params.update({"tags": tags})
+
+            proxy_server_request = litellm_params.get("proxy_server_request", None)
+            if proxy_server_request:
+                headers = proxy_server_request.get("headers", None)
+                clean_headers = {}
+                if headers:
+                    for key, value in headers.items():
+                        # these headers can leak our API keys and/or JWT tokens
+                        if key.lower() not in ["authorization", "cookie", "referer"]:
+                            clean_headers[key] = value
+
+            trace = self.Langfuse.trace(**trace_params)
+
+            # Log provider specific information as a span
+            log_provider_specific_information_as_span(trace, clean_metadata)
+
+            generation_id = None
+            usage = None
+            if response_obj is not None:
+                if (
+                    hasattr(response_obj, "id")
+                    and response_obj.get("id", None) is not None
+                ):
+                    generation_id = litellm.utils.get_logging_id(
+                        start_time, response_obj
+                    )
+                _usage_obj = getattr(response_obj, "usage", None)
+
+                if _usage_obj:
+                    usage = {
+                        "prompt_tokens": _usage_obj.prompt_tokens,
+                        "completion_tokens": _usage_obj.completion_tokens,
+                        "total_cost": cost if self._supports_costs() else None,
+                    }
+            generation_name = clean_metadata.pop("generation_name", None)
+            if generation_name is None:
+                # `generation_name` not provided - fall back to sensible defaults:
+                # prefer the proxy `key_alias` when set, otherwise use
+                # `litellm-{call_type}` as the generation name
+                _user_api_key_alias = cast(
+                    Optional[str], clean_metadata.get("user_api_key_alias", None)
+                )
+                generation_name = (
+                    f"litellm-{cast(str, kwargs.get('call_type', 'completion'))}"
+                )
+                if _user_api_key_alias is not None:
+                    generation_name = f"litellm:{_user_api_key_alias}"
+
+            if response_obj is not None:
+                system_fingerprint = getattr(response_obj, "system_fingerprint", None)
+            else:
+                system_fingerprint = None
+
+            if system_fingerprint is not None:
+                optional_params["system_fingerprint"] = system_fingerprint
+
+            generation_params = {
+                "name": generation_name,
+                "id": clean_metadata.pop("generation_id", generation_id),
+                "start_time": start_time,
+                "end_time": end_time,
+                "model": kwargs["model"],
+                "model_parameters": optional_params,
+                "input": input if not mask_input else "redacted-by-litellm",
+                "output": output if not mask_output else "redacted-by-litellm",
+                "usage": usage,
+                "metadata": log_requester_metadata(clean_metadata),
+                "level": level,
+                "version": clean_metadata.pop("version", None),
+            }
+
+            parent_observation_id = metadata.get("parent_observation_id", None)
+            if parent_observation_id is not None:
+                generation_params["parent_observation_id"] = parent_observation_id
+
+            if self._supports_prompt():
+                generation_params = _add_prompt_to_generation_params(
+                    generation_params=generation_params,
+                    clean_metadata=clean_metadata,
+                    prompt_management_metadata=prompt_management_metadata,
+                    langfuse_client=self.Langfuse,
+                )
+            if output is not None and isinstance(output, str) and level == "ERROR":
+                generation_params["status_message"] = output
+
+            if self._supports_completion_start_time():
+                generation_params["completion_start_time"] = kwargs.get(
+                    "completion_start_time", None
+                )
+
+            generation_client = trace.generation(**generation_params)
+
+            return generation_client.trace_id, generation_id
+        except Exception:
+            verbose_logger.error(f"Langfuse Layer Error - {traceback.format_exc()}")
+            return None, None
+
+    @staticmethod
+    def _get_chat_content_for_langfuse(
+        response_obj: ModelResponse,
+    ):
+        """
+        Get the chat content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            output = response_obj["choices"][0]["message"].json()
+            return output
+        else:
+            return None
+
+    @staticmethod
+    def _get_text_completion_content_for_langfuse(
+        response_obj: TextCompletionResponse,
+    ):
+        """
+        Get the text completion content for Langfuse logging
+        """
+        if response_obj.choices and len(response_obj.choices) > 0:
+            return response_obj.choices[0].text
+        else:
+            return None
+
+    @staticmethod
+    def _get_langfuse_tags(
+        standard_logging_object: Optional[StandardLoggingPayload],
+    ) -> List[str]:
+        if standard_logging_object is None:
+            return []
+        return standard_logging_object.get("request_tags", []) or []
+
+    def add_default_langfuse_tags(self, tags, kwargs, metadata):
+        """
+        Helper function to add litellm default langfuse tags
+
+        - Special LiteLLM tags:
+            - cache_hit
+            - cache_key
+
+        """
+        if litellm.langfuse_default_tags is not None and isinstance(
+            litellm.langfuse_default_tags, list
+        ):
+            if "cache_hit" in litellm.langfuse_default_tags:
+                _cache_hit_value = kwargs.get("cache_hit", False)
+                tags.append(f"cache_hit:{_cache_hit_value}")
+            if "cache_key" in litellm.langfuse_default_tags:
+                _hidden_params = metadata.get("hidden_params", {}) or {}
+                _cache_key = _hidden_params.get("cache_key", None)
+                if _cache_key is None and litellm.cache is not None:
+                    # fallback to using "preset_cache_key"
+                    _preset_cache_key = litellm.cache._get_preset_cache_key_from_kwargs(
+                        **kwargs
+                    )
+                    _cache_key = _preset_cache_key
+                tags.append(f"cache_key:{_cache_key}")
+        return tags
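+
+    # Sketch of the default-tag expansion (values illustrative): with
+    # `litellm.langfuse_default_tags = ["cache_hit", "cache_key"]`,
+    #
+    #     logger.add_default_langfuse_tags(
+    #         tags=[],
+    #         kwargs={"cache_hit": True},
+    #         metadata={"hidden_params": {"cache_key": "abc"}},
+    #     )
+    #     # -> ["cache_hit:True", "cache_key:abc"]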
+
+    def _supports_tags(self):
+        """Check if current langfuse version supports tags"""
+        return Version(self.langfuse_sdk_version) >= Version("2.6.3")
+
+    def _supports_prompt(self):
+        """Check if current langfuse version supports prompt"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+
+    def _supports_costs(self):
+        """Check if current langfuse version supports costs"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+
+    def _supports_completion_start_time(self):
+        """Check if current langfuse version supports completion start time"""
+        return Version(self.langfuse_sdk_version) >= Version("2.7.3")
+
+    @staticmethod
+    def _get_langfuse_flush_interval(flush_interval: int) -> int:
+        """
+        Get the langfuse flush interval to initialize the Langfuse client
+
+        Reads `LANGFUSE_FLUSH_INTERVAL` from the environment variable.
+        If not set, uses the flush interval passed in as an argument.
+
+        Args:
+            flush_interval: The flush interval to use if LANGFUSE_FLUSH_INTERVAL is not set
+
+        Returns:
+            [int] The flush interval to use to initialize the Langfuse client
+        """
+        return int(os.getenv("LANGFUSE_FLUSH_INTERVAL") or flush_interval)
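+
+    # The environment variable wins over the argument, e.g.:
+    #
+    #     os.environ["LANGFUSE_FLUSH_INTERVAL"] = "10"
+    #     LangFuseLogger._get_langfuse_flush_interval(flush_interval=1)  # -> 10
+    #     del os.environ["LANGFUSE_FLUSH_INTERVAL"]
+    #     LangFuseLogger._get_langfuse_flush_interval(flush_interval=1)  # -> 1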
+
+
+def _add_prompt_to_generation_params(
+    generation_params: dict,
+    clean_metadata: dict,
+    prompt_management_metadata: Optional[StandardLoggingPromptManagementMetadata],
+    langfuse_client: Any,
+) -> dict:
+    from langfuse import Langfuse
+    from langfuse.model import (
+        ChatPromptClient,
+        Prompt_Chat,
+        Prompt_Text,
+        TextPromptClient,
+    )
+
+    langfuse_client = cast(Langfuse, langfuse_client)
+
+    user_prompt = clean_metadata.pop("prompt", None)
+    if user_prompt is None and prompt_management_metadata is None:
+        pass
+    elif isinstance(user_prompt, dict):
+        if user_prompt.get("type", "") == "chat":
+            _prompt_chat = Prompt_Chat(**user_prompt)
+            generation_params["prompt"] = ChatPromptClient(prompt=_prompt_chat)
+        elif user_prompt.get("type", "") == "text":
+            _prompt_text = Prompt_Text(**user_prompt)
+            generation_params["prompt"] = TextPromptClient(prompt=_prompt_text)
+        elif "version" in user_prompt and "prompt" in user_prompt:
+            # prompts
+            if isinstance(user_prompt["prompt"], str):
+                prompt_text_params = getattr(
+                    Prompt_Text, "model_fields", Prompt_Text.__fields__
+                )
+                _data = {
+                    "name": user_prompt["name"],
+                    "prompt": user_prompt["prompt"],
+                    "version": user_prompt["version"],
+                    "config": user_prompt.get("config", None),
+                }
+                if "labels" in prompt_text_params and "tags" in prompt_text_params:
+                    _data["labels"] = user_prompt.get("labels", []) or []
+                    _data["tags"] = user_prompt.get("tags", []) or []
+                _prompt_obj = Prompt_Text(**_data)  # type: ignore
+                generation_params["prompt"] = TextPromptClient(prompt=_prompt_obj)
+
+            elif isinstance(user_prompt["prompt"], list):
+                prompt_chat_params = getattr(
+                    Prompt_Chat, "model_fields", Prompt_Chat.__fields__
+                )
+                _data = {
+                    "name": user_prompt["name"],
+                    "prompt": user_prompt["prompt"],
+                    "version": user_prompt["version"],
+                    "config": user_prompt.get("config", None),
+                }
+                if "labels" in prompt_chat_params and "tags" in prompt_chat_params:
+                    _data["labels"] = user_prompt.get("labels", []) or []
+                    _data["tags"] = user_prompt.get("tags", []) or []
+
+                _prompt_obj = Prompt_Chat(**_data)  # type: ignore
+
+                generation_params["prompt"] = ChatPromptClient(prompt=_prompt_obj)
+            else:
+                verbose_logger.error(
+                    "[Non-blocking] Langfuse Logger: Invalid prompt format"
+                )
+        else:
+            verbose_logger.error(
+                "[Non-blocking] Langfuse Logger: Invalid prompt format. No prompt logged to Langfuse"
+            )
+    elif (
+        prompt_management_metadata is not None
+        and prompt_management_metadata["prompt_integration"] == "langfuse"
+    ):
+        try:
+            generation_params["prompt"] = langfuse_client.get_prompt(
+                prompt_management_metadata["prompt_id"]
+            )
+        except Exception as e:
+            verbose_logger.debug(
+                f"[Non-blocking] Langfuse Logger: Error getting prompt client for logging: {e}"
+            )
+            pass
+
+    else:
+        generation_params["prompt"] = user_prompt
+
+    return generation_params
+
+
+def log_provider_specific_information_as_span(
+    trace,
+    clean_metadata,
+):
+    """
+    Logs provider-specific information as spans.
+
+    Parameters:
+        trace: The tracing object used to log spans.
+        clean_metadata: A dictionary containing metadata to be logged.
+
+    Returns:
+        None
+    """
+
+    _hidden_params = clean_metadata.get("hidden_params", None)
+    if _hidden_params is None:
+        return
+
+    vertex_ai_grounding_metadata = _hidden_params.get(
+        "vertex_ai_grounding_metadata", None
+    )
+
+    if vertex_ai_grounding_metadata is not None:
+        if isinstance(vertex_ai_grounding_metadata, list):
+            for elem in vertex_ai_grounding_metadata:
+                if isinstance(elem, dict):
+                    for key, value in elem.items():
+                        trace.span(
+                            name=key,
+                            input=value,
+                        )
+                else:
+                    trace.span(
+                        name="vertex_ai_grounding_metadata",
+                        input=elem,
+                    )
+        else:
+            trace.span(
+                name="vertex_ai_grounding_metadata",
+                input=vertex_ai_grounding_metadata,
+            )
+
+
+def log_requester_metadata(clean_metadata: dict):
+    returned_metadata = {}
+    requester_metadata = clean_metadata.get("requester_metadata") or {}
+    for k, v in clean_metadata.items():
+        if k not in requester_metadata:
+            returned_metadata[k] = v
+
+    returned_metadata.update({"requester_metadata": requester_metadata})
+
+    return returned_metadata
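+
+# Sketch of the requester-metadata nesting (keys illustrative): top-level keys
+# that also appear inside `requester_metadata` are dropped in favor of the
+# nested copy.
+#
+#     log_requester_metadata({
+#         "litellm_response_cost": 0.002,
+#         "env": "staging",
+#         "requester_metadata": {"env": "prod"},
+#     })
+#     # -> {"litellm_response_cost": 0.002,
+#     #     "requester_metadata": {"env": "prod"}}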
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_handler.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_handler.py
new file mode 100644
index 00000000..aebe1461
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_handler.py
@@ -0,0 +1,169 @@
+"""
+This file contains the LangFuseHandler class
+
+Used to get the LangFuseLogger for a given request
+
+Handles Key/Team Based Langfuse Logging
+"""
+
+from typing import TYPE_CHECKING, Any, Dict, Optional
+
+from litellm.litellm_core_utils.litellm_logging import StandardCallbackDynamicParams
+
+from .langfuse import LangFuseLogger, LangfuseLoggingConfig
+
+if TYPE_CHECKING:
+    from litellm.litellm_core_utils.litellm_logging import DynamicLoggingCache
+else:
+    DynamicLoggingCache = Any
+
+
+class LangFuseHandler:
+
+    @staticmethod
+    def get_langfuse_logger_for_request(
+        standard_callback_dynamic_params: StandardCallbackDynamicParams,
+        in_memory_dynamic_logger_cache: DynamicLoggingCache,
+        globalLangfuseLogger: Optional[LangFuseLogger] = None,
+    ) -> LangFuseLogger:
+        """
+        This function is used to get the LangFuseLogger for a given request
+
+        1. If dynamic credentials are passed
+            - check if a LangFuseLogger is cached for the dynamic credentials
+            - if cached LangFuseLogger is not found, create a new LangFuseLogger and cache it
+
+        2. If dynamic credentials are not passed return the globalLangfuseLogger
+
+        """
+        temp_langfuse_logger: Optional[LangFuseLogger] = globalLangfuseLogger
+        if (
+            LangFuseHandler._dynamic_langfuse_credentials_are_passed(
+                standard_callback_dynamic_params
+            )
+            is False
+        ):
+            return LangFuseHandler._return_global_langfuse_logger(
+                globalLangfuseLogger=globalLangfuseLogger,
+                in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+            )
+
+        # get langfuse logging config to use for this request, based on standard_callback_dynamic_params
+        _credentials = LangFuseHandler.get_dynamic_langfuse_logging_config(
+            globalLangfuseLogger=globalLangfuseLogger,
+            standard_callback_dynamic_params=standard_callback_dynamic_params,
+        )
+        credentials_dict = dict(_credentials)
+
+        # check if langfuse logger is already cached
+        temp_langfuse_logger = in_memory_dynamic_logger_cache.get_cache(
+            credentials=credentials_dict, service_name="langfuse"
+        )
+
+        # if not cached, create a new langfuse logger and cache it
+        if temp_langfuse_logger is None:
+            temp_langfuse_logger = (
+                LangFuseHandler._create_langfuse_logger_from_credentials(
+                    credentials=credentials_dict,
+                    in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+                )
+            )
+
+        return temp_langfuse_logger
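+
+    # Dispatch sketch (credentials illustrative; assumes DynamicLoggingCache()
+    # takes no required args): loggers are cached per credential set, so a
+    # second call with the same keys returns the cached instance.
+    #
+    #     from litellm.litellm_core_utils.specialty_caches.dynamic_logging_cache import (
+    #         DynamicLoggingCache,
+    #     )
+    #     cache = DynamicLoggingCache()
+    #     logger = LangFuseHandler.get_langfuse_logger_for_request(
+    #         standard_callback_dynamic_params={
+    #             "langfuse_public_key": "pk",
+    #             "langfuse_secret": "sk",
+    #         },
+    #         in_memory_dynamic_logger_cache=cache,
+    #         globalLangfuseLogger=None,
+    #     )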
+
+    @staticmethod
+    def _return_global_langfuse_logger(
+        globalLangfuseLogger: Optional[LangFuseLogger],
+        in_memory_dynamic_logger_cache: DynamicLoggingCache,
+    ) -> LangFuseLogger:
+        """
+        Returns the Global LangfuseLogger set on litellm
+
+        (this is the default langfuse logger - used when no dynamic credentials are passed)
+
+        If no Global LangfuseLogger is set, it will check in_memory_dynamic_logger_cache for a cached LangFuseLogger
+        This function is used to return the globalLangfuseLogger if it exists, otherwise it will check in_memory_dynamic_logger_cache for a cached LangFuseLogger
+        """
+        if globalLangfuseLogger is not None:
+            return globalLangfuseLogger
+
+        # the global langfuse logger uses environment variables; there are no
+        # dynamic credentials
+        credentials_dict: Dict[str, Any] = {}
+        globalLangfuseLogger = in_memory_dynamic_logger_cache.get_cache(
+            credentials=credentials_dict,
+            service_name="langfuse",
+        )
+        if globalLangfuseLogger is None:
+            globalLangfuseLogger = (
+                LangFuseHandler._create_langfuse_logger_from_credentials(
+                    credentials=credentials_dict,
+                    in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+                )
+            )
+        return globalLangfuseLogger
+
+    @staticmethod
+    def _create_langfuse_logger_from_credentials(
+        credentials: Dict,
+        in_memory_dynamic_logger_cache: DynamicLoggingCache,
+    ) -> LangFuseLogger:
+        """
+        This function is used to
+        1. create a LangFuseLogger from the credentials
+        2. cache the LangFuseLogger to prevent re-creating it for the same credentials
+        """
+
+        langfuse_logger = LangFuseLogger(
+            langfuse_public_key=credentials.get("langfuse_public_key"),
+            langfuse_secret=credentials.get("langfuse_secret"),
+            langfuse_host=credentials.get("langfuse_host"),
+        )
+        in_memory_dynamic_logger_cache.set_cache(
+            credentials=credentials,
+            service_name="langfuse",
+            logging_obj=langfuse_logger,
+        )
+        return langfuse_logger
+
+    @staticmethod
+    def get_dynamic_langfuse_logging_config(
+        standard_callback_dynamic_params: StandardCallbackDynamicParams,
+        globalLangfuseLogger: Optional[LangFuseLogger] = None,
+    ) -> LangfuseLoggingConfig:
+        """
+        This function is used to get the Langfuse logging config to use for a given request.
+
+        It checks if the dynamic parameters are provided in the standard_callback_dynamic_params and uses them to get the Langfuse logging config.
+
+        If no dynamic parameters are provided, it uses the `globalLangfuseLogger` values
+        """
+        # only use dynamic params if langfuse credentials are passed dynamically
+        return LangfuseLoggingConfig(
+            langfuse_secret=standard_callback_dynamic_params.get("langfuse_secret")
+            or standard_callback_dynamic_params.get("langfuse_secret_key"),
+            langfuse_public_key=standard_callback_dynamic_params.get(
+                "langfuse_public_key"
+            ),
+            langfuse_host=standard_callback_dynamic_params.get("langfuse_host"),
+        )
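+
+    # Key-precedence sketch (assumes LangfuseLoggingConfig is a TypedDict, as
+    # imported above; values illustrative): `langfuse_secret` falls back to
+    # `langfuse_secret_key`.
+    #
+    #     cfg = LangFuseHandler.get_dynamic_langfuse_logging_config(
+    #         standard_callback_dynamic_params={"langfuse_secret_key": "sk-123"},
+    #     )
+    #     # cfg["langfuse_secret"] == "sk-123"
+    #     # cfg["langfuse_public_key"] is None and cfg["langfuse_host"] is None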
+
+    @staticmethod
+    def _dynamic_langfuse_credentials_are_passed(
+        standard_callback_dynamic_params: StandardCallbackDynamicParams,
+    ) -> bool:
+        """
+        This function is used to check if the dynamic langfuse credentials are passed in standard_callback_dynamic_params
+
+        Returns:
+            bool: True if the dynamic langfuse credentials are passed, False otherwise
+        """
+
+        if (
+            standard_callback_dynamic_params.get("langfuse_host") is not None
+            or standard_callback_dynamic_params.get("langfuse_public_key") is not None
+            or standard_callback_dynamic_params.get("langfuse_secret") is not None
+            or standard_callback_dynamic_params.get("langfuse_secret_key") is not None
+        ):
+            return True
+        return False
diff --git a/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
new file mode 100644
index 00000000..1f4ca84d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/integrations/langfuse/langfuse_prompt_management.py
@@ -0,0 +1,287 @@
+"""
+Call Hook for LiteLLM Proxy which allows Langfuse prompt management.
+"""
+
+import os
+from functools import lru_cache
+from typing import TYPE_CHECKING, Any, List, Literal, Optional, Tuple, Union, cast
+
+from packaging.version import Version
+from typing_extensions import TypeAlias
+
+from litellm.integrations.custom_logger import CustomLogger
+from litellm.integrations.prompt_management_base import PromptManagementClient
+from litellm.litellm_core_utils.asyncify import run_async_function
+from litellm.types.llms.openai import AllMessageValues, ChatCompletionSystemMessage
+from litellm.types.utils import StandardCallbackDynamicParams, StandardLoggingPayload
+
+from ...litellm_core_utils.specialty_caches.dynamic_logging_cache import (
+    DynamicLoggingCache,
+)
+from ..prompt_management_base import PromptManagementBase
+from .langfuse import LangFuseLogger
+from .langfuse_handler import LangFuseHandler
+
+if TYPE_CHECKING:
+    from langfuse import Langfuse
+    from langfuse.client import ChatPromptClient, TextPromptClient
+
+    LangfuseClass: TypeAlias = Langfuse
+
+    PROMPT_CLIENT = Union[TextPromptClient, ChatPromptClient]
+else:
+    PROMPT_CLIENT = Any
+    LangfuseClass = Any
+
+in_memory_dynamic_logger_cache = DynamicLoggingCache()
+
+
+@lru_cache(maxsize=10)
+def langfuse_client_init(
+    langfuse_public_key=None,
+    langfuse_secret=None,
+    langfuse_secret_key=None,
+    langfuse_host=None,
+    flush_interval=1,
+) -> LangfuseClass:
+    """
+    Initialize Langfuse client with caching to prevent multiple initializations.
+
+    Args:
+        langfuse_public_key (str, optional): Public key for Langfuse. Defaults to None.
+        langfuse_secret (str, optional): Secret key for Langfuse. Defaults to None.
+        langfuse_secret_key (str, optional): Alias for the secret key, used when `langfuse_secret` is not set. Defaults to None.
+        langfuse_host (str, optional): Host URL for Langfuse. Defaults to None.
+        flush_interval (int, optional): Flush interval in seconds. Defaults to 1.
+
+    Returns:
+        Langfuse: Initialized Langfuse client instance
+
+    Raises:
+        Exception: If langfuse package is not installed
+    """
+    try:
+        import langfuse
+        from langfuse import Langfuse
+    except Exception as e:
+        raise Exception(
+            f"\033[91mLangfuse not installed, try running 'pip install langfuse' to fix this error: {e}\n\033[0m"
+        )
+
+    # resolve credentials, falling back to environment variables
+    secret_key = (
+        langfuse_secret or langfuse_secret_key or os.getenv("LANGFUSE_SECRET_KEY")
+    )
+    public_key = langfuse_public_key or os.getenv("LANGFUSE_PUBLIC_KEY")
+    langfuse_host = langfuse_host or os.getenv(
+        "LANGFUSE_HOST", "https://cloud.langfuse.com"
+    )
+
+    if not (
+        langfuse_host.startswith("http://") or langfuse_host.startswith("https://")
+    ):
+        # add http:// if unset, assume communicating over private network - e.g. render
+        langfuse_host = "http://" + langfuse_host
+
+    langfuse_release = os.getenv("LANGFUSE_RELEASE")
+    langfuse_debug = os.getenv("LANGFUSE_DEBUG")
+
+    parameters = {
+        "public_key": public_key,
+        "secret_key": secret_key,
+        "host": langfuse_host,
+        "release": langfuse_release,
+        "debug": langfuse_debug,
+        "flush_interval": LangFuseLogger._get_langfuse_flush_interval(
+            flush_interval
+        ),  # flush interval in seconds
+    }
+
+    if Version(langfuse.version.__version__) >= Version("2.6.0"):
+        parameters["sdk_integration"] = "litellm"
+
+    client = Langfuse(**parameters)
+
+    return client
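+
+# Because of @lru_cache, identical credential sets share one client (assumes
+# the langfuse package is installed; keys illustrative):
+#
+#     c1 = langfuse_client_init(langfuse_public_key="pk", langfuse_secret="sk")
+#     c2 = langfuse_client_init(langfuse_public_key="pk", langfuse_secret="sk")
+#     assert c1 is c2
+#     c3 = langfuse_client_init(langfuse_public_key="pk2", langfuse_secret="sk2")
+#     assert c3 is not c1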
+
+
+class LangfusePromptManagement(LangFuseLogger, PromptManagementBase, CustomLogger):
+    def __init__(
+        self,
+        langfuse_public_key=None,
+        langfuse_secret=None,
+        langfuse_host=None,
+        flush_interval=1,
+    ):
+        import langfuse
+
+        self.langfuse_sdk_version = langfuse.version.__version__
+        self.Langfuse = langfuse_client_init(
+            langfuse_public_key=langfuse_public_key,
+            langfuse_secret=langfuse_secret,
+            langfuse_host=langfuse_host,
+            flush_interval=flush_interval,
+        )
+
+    @property
+    def integration_name(self):
+        return "langfuse"
+
+    def _get_prompt_from_id(
+        self, langfuse_prompt_id: str, langfuse_client: LangfuseClass
+    ) -> PROMPT_CLIENT:
+        return langfuse_client.get_prompt(langfuse_prompt_id)
+
+    def _compile_prompt(
+        self,
+        langfuse_prompt_client: PROMPT_CLIENT,
+        langfuse_prompt_variables: Optional[dict],
+        call_type: Union[Literal["completion"], Literal["text_completion"]],
+    ) -> List[AllMessageValues]:
+        compiled_prompt: Optional[Union[str, list]] = None
+
+        if langfuse_prompt_variables is None:
+            langfuse_prompt_variables = {}
+
+        compiled_prompt = langfuse_prompt_client.compile(**langfuse_prompt_variables)
+
+        if isinstance(compiled_prompt, str):
+            compiled_prompt = [
+                ChatCompletionSystemMessage(role="system", content=compiled_prompt)
+            ]
+        else:
+            compiled_prompt = cast(List[AllMessageValues], compiled_prompt)
+
+        return compiled_prompt
+
+    def _get_optional_params_from_langfuse(
+        self, langfuse_prompt_client: PROMPT_CLIENT
+    ) -> dict:
+        config = langfuse_prompt_client.config
+        optional_params = {}
+        for k, v in config.items():
+            if k != "model":
+                optional_params[k] = v
+        return optional_params
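+
+    # Compilation sketch with a hypothetical stand-in for a Langfuse
+    # TextPromptClient (the stub below is not a real langfuse class):
+    #
+    #     class _FakeTextPrompt:
+    #         config = {"model": "gpt-4o", "temperature": 0.2}
+    #
+    #         def compile(self, **variables):
+    #             return f"You help {variables.get('name', 'users')}."
+    #
+    #     pm = LangfusePromptManagement.__new__(LangfusePromptManagement)
+    #     pm._compile_prompt(_FakeTextPrompt(), {"name": "Ada"}, call_type="completion")
+    #     # -> [{"role": "system", "content": "You help Ada."}]
+    #     pm._get_optional_params_from_langfuse(_FakeTextPrompt())
+    #     # -> {"temperature": 0.2}   ("model" is stripped)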
+
+    async def async_get_chat_completion_prompt(
+        self,
+        model: str,
+        messages: List[AllMessageValues],
+        non_default_params: dict,
+        prompt_id: str,
+        prompt_variables: Optional[dict],
+        dynamic_callback_params: StandardCallbackDynamicParams,
+    ) -> Tuple[
+        str,
+        List[AllMessageValues],
+        dict,
+    ]:
+        return self.get_chat_completion_prompt(
+            model,
+            messages,
+            non_default_params,
+            prompt_id,
+            prompt_variables,
+            dynamic_callback_params,
+        )
+
+    def should_run_prompt_management(
+        self,
+        prompt_id: str,
+        dynamic_callback_params: StandardCallbackDynamicParams,
+    ) -> bool:
+        langfuse_client = langfuse_client_init(
+            langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
+            langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
+            langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
+            langfuse_host=dynamic_callback_params.get("langfuse_host"),
+        )
+        langfuse_prompt_client = self._get_prompt_from_id(
+            langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
+        )
+        return langfuse_prompt_client is not None
+
+    def _compile_prompt_helper(
+        self,
+        prompt_id: str,
+        prompt_variables: Optional[dict],
+        dynamic_callback_params: StandardCallbackDynamicParams,
+    ) -> PromptManagementClient:
+        langfuse_client = langfuse_client_init(
+            langfuse_public_key=dynamic_callback_params.get("langfuse_public_key"),
+            langfuse_secret=dynamic_callback_params.get("langfuse_secret"),
+            langfuse_secret_key=dynamic_callback_params.get("langfuse_secret_key"),
+            langfuse_host=dynamic_callback_params.get("langfuse_host"),
+        )
+        langfuse_prompt_client = self._get_prompt_from_id(
+            langfuse_prompt_id=prompt_id, langfuse_client=langfuse_client
+        )
+
+        ## SET PROMPT
+        compiled_prompt = self._compile_prompt(
+            langfuse_prompt_client=langfuse_prompt_client,
+            langfuse_prompt_variables=prompt_variables,
+            call_type="completion",
+        )
+
+        template_model = langfuse_prompt_client.config.get("model")
+
+        template_optional_params = self._get_optional_params_from_langfuse(
+            langfuse_prompt_client
+        )
+
+        return PromptManagementClient(
+            prompt_id=prompt_id,
+            prompt_template=compiled_prompt,
+            prompt_template_model=template_model,
+            prompt_template_optional_params=template_optional_params,
+            completed_messages=None,
+        )
+
+    def log_success_event(self, kwargs, response_obj, start_time, end_time):
+        return run_async_function(
+            self.async_log_success_event, kwargs, response_obj, start_time, end_time
+        )
+
+    async def async_log_success_event(self, kwargs, response_obj, start_time, end_time):
+        standard_callback_dynamic_params = kwargs.get(
+            "standard_callback_dynamic_params"
+        )
+        langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
+            globalLangfuseLogger=self,
+            standard_callback_dynamic_params=standard_callback_dynamic_params,
+            in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+        )
+        langfuse_logger_to_use.log_event_on_langfuse(
+            kwargs=kwargs,
+            response_obj=response_obj,
+            start_time=start_time,
+            end_time=end_time,
+            user_id=kwargs.get("user", None),
+        )
+
+    async def async_log_failure_event(self, kwargs, response_obj, start_time, end_time):
+        standard_callback_dynamic_params = kwargs.get(
+            "standard_callback_dynamic_params"
+        )
+        langfuse_logger_to_use = LangFuseHandler.get_langfuse_logger_for_request(
+            globalLangfuseLogger=self,
+            standard_callback_dynamic_params=standard_callback_dynamic_params,
+            in_memory_dynamic_logger_cache=in_memory_dynamic_logger_cache,
+        )
+        standard_logging_object = cast(
+            Optional[StandardLoggingPayload],
+            kwargs.get("standard_logging_object", None),
+        )
+        if standard_logging_object is None:
+            return
+        langfuse_logger_to_use.log_event_on_langfuse(
+            start_time=start_time,
+            end_time=end_time,
+            response_obj=None,
+            user_id=kwargs.get("user", None),
+            status_message=standard_logging_object["error_str"],
+            level="ERROR",
+            kwargs=kwargs,
+        )
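+
+    # Wiring sketch: the usual way to route LiteLLM traffic through these
+    # loggers is litellm's string-based callback registration:
+    #
+    #     import litellm
+    #     litellm.success_callback = ["langfuse"]
+    #     litellm.failure_callback = ["langfuse"]
+    #     litellm.completion(
+    #         model="gpt-4o",
+    #         messages=[{"role": "user", "content": "hi"}],
+    #     )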