author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)

    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py  556
1 file changed, 556 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
new file mode 100644
index 00000000..4724c7f9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
@@ -0,0 +1,556 @@
+"""
+What is this? 
+
+Provider-specific Pass-Through Endpoints
+
+Use litellm with Anthropic SDK, Vertex AI SDK, Cohere SDK, etc.
+"""
+
+import json
+from typing import Optional
+
+import httpx
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
+
+import litellm
+from litellm.constants import BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES
+from litellm.proxy._types import *
+from litellm.proxy.auth.route_checks import RouteChecks
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
+    create_pass_through_route,
+)
+from litellm.secret_managers.main import get_secret_str
+
+from .passthrough_endpoint_router import PassthroughEndpointRouter
+
+router = APIRouter()
+default_vertex_config = None
+
+passthrough_endpoint_router = PassthroughEndpointRouter()
+
+
+def create_request_copy(request: Request):
+    """Return a plain-dict snapshot of the incoming request, e.g. for logging/debugging."""
+    return {
+        "method": request.method,
+        "url": str(request.url),
+        "headers": dict(request.headers),
+        "cookies": request.cookies,
+        "query_params": dict(request.query_params),
+    }
+
+
+@router.api_route(
+    "/gemini/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Google AI Studio Pass-through", "pass-through"],
+)
+async def gemini_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/pass_through/google_ai_studio)
+    """
+    ## CHECK FOR LITELLM API KEY IN THE QUERY PARAMS - ?key=LITELLM_API_KEY - OR THE 'x-goog-api-key' HEADER
+    google_ai_studio_api_key = request.query_params.get("key") or request.headers.get(
+        "x-goog-api-key"
+    )
+
+    user_api_key_dict = await user_api_key_auth(
+        request=request, api_key=f"Bearer {google_ai_studio_api_key}"
+    )
+
+    base_target_url = "https://generativelanguage.googleapis.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    gemini_api_key: Optional[str] = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="gemini",
+        region_name=None,
+    )
+    if gemini_api_key is None:
+        raise Exception(
+            "'GEMINI_API_KEY' is required in the environment to make pass-through calls to Google AI Studio."
+        )
+    # Merge query parameters, overriding any client-supplied 'key' with the proxy's Gemini API key
+    merged_params = dict(request.query_params)
+    merged_params.update({"key": gemini_api_key})
+
+    ## check for streaming (e.g. Gemini's ":streamGenerateContent" method keeps "stream" in the URL)
+    is_streaming_request = False
+    if "stream" in str(updated_url):
+        is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        query_params=merged_params,  # type: ignore
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/cohere/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Cohere Pass-through", "pass-through"],
+)
+async def cohere_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/pass_through/cohere)
+    """
+    base_target_url = "https://api.cohere.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    cohere_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="cohere",
+        region_name=None,
+    )
+
+    ## check for streaming
+    is_streaming_request = False
+    if "stream" in str(updated_url):
+        is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+        custom_headers={"Authorization": "Bearer {}".format(cohere_api_key)},
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/anthropic/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Anthropic Pass-through", "pass-through"],
+)
+async def anthropic_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/anthropic_completion)
+    """
+    base_target_url = "https://api.anthropic.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    anthropic_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="anthropic",
+        region_name=None,
+    )
+
+    ## check for streaming
+    is_streaming_request = False
+    # Anthropic streams when '"stream": true' is set in the JSON request body
+    if request.method == "POST":
+        _request_body = await request.json()
+        if _request_body.get("stream"):
+            is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+        custom_headers={"x-api-key": "{}".format(anthropic_api_key)},
+        _forward_headers=True,
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/bedrock/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Bedrock Pass-through", "pass-through"],
+)
+async def bedrock_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/pass_through/bedrock)
+    """
+    create_request_copy(request)
+
+    try:
+        from botocore.auth import SigV4Auth
+        from botocore.awsrequest import AWSRequest
+        from botocore.credentials import Credentials
+    except ImportError:
+        raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
+
+    aws_region_name = litellm.utils.get_secret(secret_name="AWS_REGION_NAME")
+    if _is_bedrock_agent_runtime_route(endpoint=endpoint):  # handle bedrock agents
+        base_target_url = (
+            f"https://bedrock-agent-runtime.{aws_region_name}.amazonaws.com"
+        )
+    else:
+        base_target_url = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    from litellm.llms.bedrock.chat import BedrockConverseLLM
+
+    credentials: Credentials = BedrockConverseLLM().get_credentials()
+    sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
+    headers = {"Content-Type": "application/json"}
+    # Assuming the body contains JSON data, parse it
+    try:
+        data = await request.json()
+    except Exception as e:
+        raise HTTPException(status_code=400, detail={"error": str(e)})  # str() keeps the detail JSON-serializable
+    _request = AWSRequest(
+        method="POST", url=str(updated_url), data=json.dumps(data), headers=headers
+    )
+    sigv4.add_auth(_request)
+    prepped = _request.prepare()
+
+    ## check for streaming
+    is_streaming_request = False
+    if "stream" in str(updated_url):
+        is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(prepped.url),
+        custom_headers=prepped.headers,  # type: ignore
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        stream=is_streaming_request,  # type: ignore
+        custom_body=data,  # type: ignore
+        query_params={},  # type: ignore
+    )
+
+    return received_value
+
+
+def _is_bedrock_agent_runtime_route(endpoint: str) -> bool:
+    """
+    Return True if the endpoint should be routed to the `bedrock-agent-runtime` endpoint.
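+
+    The matching prefixes come from BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES; matching
+    endpoints are sent to `bedrock-agent-runtime.{region}.amazonaws.com` instead of
+    `bedrock-runtime.{region}.amazonaws.com`.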
+    """
+    for _route in BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES:
+        if _route in endpoint:
+            return True
+    return False
+
+
+@router.api_route(
+    "/assemblyai/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["AssemblyAI Pass-through", "pass-through"],
+)
+@router.api_route(
+    "/eu.assemblyai/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["AssemblyAI EU Pass-through", "pass-through"],
+)
+async def assemblyai_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://api.assemblyai.com)
+    """
+    from litellm.proxy.pass_through_endpoints.llm_provider_handlers.assembly_passthrough_logging_handler import (
+        AssemblyAIPassthroughLoggingHandler,
+    )
+
+    # Set base URL based on the route
+    assembly_region = AssemblyAIPassthroughLoggingHandler._get_assembly_region_from_url(
+        url=str(request.url)
+    )
+    base_target_url = (
+        AssemblyAIPassthroughLoggingHandler._get_assembly_base_url_from_region(
+            region=assembly_region
+        )
+    )
+    encoded_endpoint = httpx.URL(endpoint).path
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    assemblyai_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="assemblyai",
+        region_name=assembly_region,
+    )
+
+    ## check for streaming
+    is_streaming_request = False
+    # AssemblyAI streams when '"stream": true' is set in the JSON request body
+    if request.method == "POST":
+        _request_body = await request.json()
+        if _request_body.get("stream"):
+            is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+        custom_headers={"Authorization": "{}".format(assemblyai_api_key)},
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request=request,
+        fastapi_response=fastapi_response,
+        user_api_key_dict=user_api_key_dict,
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/azure/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Azure Pass-through", "pass-through"],
+)
+async def azure_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Call any Azure endpoint through the proxy.
+
+    Just use `{PROXY_BASE_URL}/azure/{endpoint:path}`
+    """
+    base_target_url = get_secret_str(secret_name="AZURE_API_BASE")
+    if base_target_url is None:
+        raise Exception(
+            "'AZURE_API_BASE' is required in the environment to make pass-through calls to Azure."
+        )
+    # Add or update query parameters
+    azure_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider=litellm.LlmProviders.AZURE.value,
+        region_name=None,
+    )
+    if azure_api_key is None:
+        raise Exception(
+            "'AZURE_API_KEY' is required in the environment to make pass-through calls to Azure."
+        )
+
+    return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+        endpoint=endpoint,
+        request=request,
+        fastapi_response=fastapi_response,
+        user_api_key_dict=user_api_key_dict,
+        base_target_url=base_target_url,
+        api_key=azure_api_key,
+        custom_llm_provider=litellm.LlmProviders.AZURE,
+    )
+
+
+@router.api_route(
+    "/openai/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["OpenAI Pass-through", "pass-through"],
+)
+async def openai_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Simple pass-through for OpenAI. Use this if you want to directly send a request to OpenAI.
+
+
+    """
+    base_target_url = "https://api.openai.com/"
+    # Add or update query parameters
+    openai_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider=litellm.LlmProviders.OPENAI.value,
+        region_name=None,
+    )
+    if openai_api_key is None:
+        raise Exception(
+            "'OPENAI_API_KEY' is required in the environment to make pass-through calls to OpenAI."
+        )
+
+    return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+        endpoint=endpoint,
+        request=request,
+        fastapi_response=fastapi_response,
+        user_api_key_dict=user_api_key_dict,
+        base_target_url=base_target_url,
+        api_key=openai_api_key,
+        custom_llm_provider=litellm.LlmProviders.OPENAI,
+    )
+
+
+class BaseOpenAIPassThroughHandler:
+    @staticmethod
+    async def _base_openai_pass_through_handler(
+        endpoint: str,
+        request: Request,
+        fastapi_response: Response,
+        user_api_key_dict: UserAPIKeyAuth,
+        base_target_url: str,
+        api_key: str,
+        custom_llm_provider: litellm.LlmProviders,
+    ):
+        encoded_endpoint = httpx.URL(endpoint).path
+        # Ensure endpoint starts with '/' for proper URL construction
+        if not encoded_endpoint.startswith("/"):
+            encoded_endpoint = "/" + encoded_endpoint
+
+        # Construct the full target URL by properly joining the base URL and endpoint path
+        base_url = httpx.URL(base_target_url)
+        updated_url = BaseOpenAIPassThroughHandler._join_url_paths(
+            base_url=base_url,
+            path=encoded_endpoint,
+            custom_llm_provider=custom_llm_provider,
+        )
+
+        ## check for streaming
+        is_streaming_request = False
+        if "stream" in str(updated_url):
+            is_streaming_request = True
+
+        ## CREATE PASS-THROUGH
+        endpoint_func = create_pass_through_route(
+            endpoint=endpoint,
+            target=str(updated_url),
+            custom_headers=BaseOpenAIPassThroughHandler._assemble_headers(
+                api_key=api_key, request=request
+            ),
+        )  # dynamically construct pass-through endpoint based on incoming path
+        received_value = await endpoint_func(
+            request,
+            fastapi_response,
+            user_api_key_dict,
+            stream=is_streaming_request,  # type: ignore
+            query_params=dict(request.query_params),  # type: ignore
+        )
+
+        return received_value
+
+    @staticmethod
+    def _append_openai_beta_header(headers: dict, request: Request) -> dict:
+        """
+        Appends the OpenAI-Beta header to the headers if the request is an OpenAI Assistants API request
+        """
+        if (
+            RouteChecks._is_assistants_api_request(request) is True
+            and "OpenAI-Beta" not in headers
+        ):
+            headers["OpenAI-Beta"] = "assistants=v2"
+        return headers
+
+    @staticmethod
+    def _assemble_headers(api_key: str, request: Request) -> dict:
+        base_headers = {
+            "authorization": "Bearer {}".format(api_key),  # used by OpenAI-style endpoints
+            "api-key": "{}".format(api_key),  # used by Azure OpenAI endpoints
+        }
+        return BaseOpenAIPassThroughHandler._append_openai_beta_header(
+            headers=base_headers,
+            request=request,
+        )
+
+    @staticmethod
+    def _join_url_paths(
+        base_url: httpx.URL, path: str, custom_llm_provider: litellm.LlmProviders
+    ) -> str:
+        """
+        Properly joins a base URL with a path, preserving any existing path in the base URL.
+        """
+        # Join paths correctly by removing trailing/leading slashes as needed
+        if not base_url.path or base_url.path == "/":
+            # If base URL has no path, just use the new path
+            joined_path_str = str(base_url.copy_with(path=path))
+        else:
+            # Otherwise, combine the paths
+            base_path = base_url.path.rstrip("/")
+            clean_path = path.lstrip("/")
+            full_path = f"{base_path}/{clean_path}"
+            joined_path_str = str(base_url.copy_with(path=full_path))
+
+        # Apply OpenAI-specific path handling for both branches
+        if (
+            custom_llm_provider == litellm.LlmProviders.OPENAI
+            and "/v1/" not in joined_path_str
+        ):
+            # Insert v1 after api.openai.com for OpenAI requests
+            joined_path_str = joined_path_str.replace(
+                "api.openai.com/", "api.openai.com/v1/"
+            )
+
+        return joined_path_str
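+
+
+# Worked examples for _join_url_paths (derived from the logic above):
+#   base "https://api.openai.com/" + "/chat/completions" with provider=OPENAI
+#       -> "https://api.openai.com/v1/chat/completions"  ("/v1/" gets inserted)
+#   base "https://my-resource.openai.azure.com" + "/openai/deployments/d/chat/completions" with provider=AZURE
+#       -> "https://my-resource.openai.azure.com/openai/deployments/d/chat/completions"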