aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py556
1 files changed, 556 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
new file mode 100644
index 00000000..4724c7f9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
@@ -0,0 +1,556 @@
+"""
+What is this?
+
+Provider-specific Pass-Through Endpoints
+
+Use litellm with Anthropic SDK, Vertex AI SDK, Cohere SDK, etc.
+"""
+
+from typing import Optional
+
+import httpx
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
+
+import litellm
+from litellm.constants import BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES
+from litellm.proxy._types import *
+from litellm.proxy.auth.route_checks import RouteChecks
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
+ create_pass_through_route,
+)
+from litellm.secret_managers.main import get_secret_str
+
+from .passthrough_endpoint_router import PassthroughEndpointRouter
+
+router = APIRouter()
+default_vertex_config = None
+
+passthrough_endpoint_router = PassthroughEndpointRouter()
+
+
+def create_request_copy(request: Request):
+ return {
+ "method": request.method,
+ "url": str(request.url),
+ "headers": dict(request.headers),
+ "cookies": request.cookies,
+ "query_params": dict(request.query_params),
+ }
+
+
+@router.api_route(
+ "/gemini/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["Google AI Studio Pass-through", "pass-through"],
+)
+async def gemini_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+):
+ """
+ [Docs](https://docs.litellm.ai/docs/pass_through/google_ai_studio)
+ """
+ ## CHECK FOR LITELLM API KEY IN THE QUERY PARAMS - ?..key=LITELLM_API_KEY
+ google_ai_studio_api_key = request.query_params.get("key") or request.headers.get(
+ "x-goog-api-key"
+ )
+
+ user_api_key_dict = await user_api_key_auth(
+ request=request, api_key=f"Bearer {google_ai_studio_api_key}"
+ )
+
+ base_target_url = "https://generativelanguage.googleapis.com"
+ encoded_endpoint = httpx.URL(endpoint).path
+
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL using httpx
+ base_url = httpx.URL(base_target_url)
+ updated_url = base_url.copy_with(path=encoded_endpoint)
+
+ # Add or update query parameters
+ gemini_api_key: Optional[str] = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider="gemini",
+ region_name=None,
+ )
+ if gemini_api_key is None:
+ raise Exception(
+ "Required 'GEMINI_API_KEY' in environment to make pass-through calls to Google AI Studio."
+ )
+ # Merge query parameters, giving precedence to those in updated_url
+ merged_params = dict(request.query_params)
+ merged_params.update({"key": gemini_api_key})
+
+ ## check for streaming
+ is_streaming_request = False
+ if "stream" in str(updated_url):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(updated_url),
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request,
+ fastapi_response,
+ user_api_key_dict,
+ query_params=merged_params, # type: ignore
+ stream=is_streaming_request, # type: ignore
+ )
+
+ return received_value
+
+
+@router.api_route(
+ "/cohere/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["Cohere Pass-through", "pass-through"],
+)
+async def cohere_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ [Docs](https://docs.litellm.ai/docs/pass_through/cohere)
+ """
+ base_target_url = "https://api.cohere.com"
+ encoded_endpoint = httpx.URL(endpoint).path
+
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL using httpx
+ base_url = httpx.URL(base_target_url)
+ updated_url = base_url.copy_with(path=encoded_endpoint)
+
+ # Add or update query parameters
+ cohere_api_key = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider="cohere",
+ region_name=None,
+ )
+
+ ## check for streaming
+ is_streaming_request = False
+ if "stream" in str(updated_url):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(updated_url),
+ custom_headers={"Authorization": "Bearer {}".format(cohere_api_key)},
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request,
+ fastapi_response,
+ user_api_key_dict,
+ stream=is_streaming_request, # type: ignore
+ )
+
+ return received_value
+
+
+@router.api_route(
+ "/anthropic/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["Anthropic Pass-through", "pass-through"],
+)
+async def anthropic_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ [Docs](https://docs.litellm.ai/docs/anthropic_completion)
+ """
+ base_target_url = "https://api.anthropic.com"
+ encoded_endpoint = httpx.URL(endpoint).path
+
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL using httpx
+ base_url = httpx.URL(base_target_url)
+ updated_url = base_url.copy_with(path=encoded_endpoint)
+
+ # Add or update query parameters
+ anthropic_api_key = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider="anthropic",
+ region_name=None,
+ )
+
+ ## check for streaming
+ is_streaming_request = False
+ # anthropic is streaming when 'stream' = True is in the body
+ if request.method == "POST":
+ _request_body = await request.json()
+ if _request_body.get("stream"):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(updated_url),
+ custom_headers={"x-api-key": "{}".format(anthropic_api_key)},
+ _forward_headers=True,
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request,
+ fastapi_response,
+ user_api_key_dict,
+ stream=is_streaming_request, # type: ignore
+ )
+
+ return received_value
+
+
+@router.api_route(
+ "/bedrock/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["Bedrock Pass-through", "pass-through"],
+)
+async def bedrock_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ [Docs](https://docs.litellm.ai/docs/pass_through/bedrock)
+ """
+ create_request_copy(request)
+
+ try:
+ from botocore.auth import SigV4Auth
+ from botocore.awsrequest import AWSRequest
+ from botocore.credentials import Credentials
+ except ImportError:
+ raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.")
+
+ aws_region_name = litellm.utils.get_secret(secret_name="AWS_REGION_NAME")
+ if _is_bedrock_agent_runtime_route(endpoint=endpoint): # handle bedrock agents
+ base_target_url = (
+ f"https://bedrock-agent-runtime.{aws_region_name}.amazonaws.com"
+ )
+ else:
+ base_target_url = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com"
+ encoded_endpoint = httpx.URL(endpoint).path
+
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL using httpx
+ base_url = httpx.URL(base_target_url)
+ updated_url = base_url.copy_with(path=encoded_endpoint)
+
+ # Add or update query parameters
+ from litellm.llms.bedrock.chat import BedrockConverseLLM
+
+ credentials: Credentials = BedrockConverseLLM().get_credentials()
+ sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name)
+ headers = {"Content-Type": "application/json"}
+ # Assuming the body contains JSON data, parse it
+ try:
+ data = await request.json()
+ except Exception as e:
+ raise HTTPException(status_code=400, detail={"error": e})
+ _request = AWSRequest(
+ method="POST", url=str(updated_url), data=json.dumps(data), headers=headers
+ )
+ sigv4.add_auth(_request)
+ prepped = _request.prepare()
+
+ ## check for streaming
+ is_streaming_request = False
+ if "stream" in str(updated_url):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(prepped.url),
+ custom_headers=prepped.headers, # type: ignore
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request,
+ fastapi_response,
+ user_api_key_dict,
+ stream=is_streaming_request, # type: ignore
+ custom_body=data, # type: ignore
+ query_params={}, # type: ignore
+ )
+
+ return received_value
+
+
+def _is_bedrock_agent_runtime_route(endpoint: str) -> bool:
+ """
+ Return True, if the endpoint should be routed to the `bedrock-agent-runtime` endpoint.
+ """
+ for _route in BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES:
+ if _route in endpoint:
+ return True
+ return False
+
+
+@router.api_route(
+ "/assemblyai/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["AssemblyAI Pass-through", "pass-through"],
+)
+@router.api_route(
+ "/eu.assemblyai/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["AssemblyAI EU Pass-through", "pass-through"],
+)
+async def assemblyai_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ from litellm.proxy.pass_through_endpoints.llm_provider_handlers.assembly_passthrough_logging_handler import (
+ AssemblyAIPassthroughLoggingHandler,
+ )
+
+ """
+ [Docs](https://api.assemblyai.com)
+ """
+ # Set base URL based on the route
+ assembly_region = AssemblyAIPassthroughLoggingHandler._get_assembly_region_from_url(
+ url=str(request.url)
+ )
+ base_target_url = (
+ AssemblyAIPassthroughLoggingHandler._get_assembly_base_url_from_region(
+ region=assembly_region
+ )
+ )
+ encoded_endpoint = httpx.URL(endpoint).path
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL using httpx
+ base_url = httpx.URL(base_target_url)
+ updated_url = base_url.copy_with(path=encoded_endpoint)
+
+ # Add or update query parameters
+ assemblyai_api_key = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider="assemblyai",
+ region_name=assembly_region,
+ )
+
+ ## check for streaming
+ is_streaming_request = False
+ # assemblyai is streaming when 'stream' = True is in the body
+ if request.method == "POST":
+ _request_body = await request.json()
+ if _request_body.get("stream"):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(updated_url),
+ custom_headers={"Authorization": "{}".format(assemblyai_api_key)},
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request=request,
+ fastapi_response=fastapi_response,
+ user_api_key_dict=user_api_key_dict,
+ stream=is_streaming_request, # type: ignore
+ )
+
+ return received_value
+
+
+@router.api_route(
+ "/azure/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["Azure Pass-through", "pass-through"],
+)
+async def azure_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ Call any azure endpoint using the proxy.
+
+ Just use `{PROXY_BASE_URL}/azure/{endpoint:path}`
+ """
+ base_target_url = get_secret_str(secret_name="AZURE_API_BASE")
+ if base_target_url is None:
+ raise Exception(
+ "Required 'AZURE_API_BASE' in environment to make pass-through calls to Azure."
+ )
+ # Add or update query parameters
+ azure_api_key = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider=litellm.LlmProviders.AZURE.value,
+ region_name=None,
+ )
+ if azure_api_key is None:
+ raise Exception(
+ "Required 'AZURE_API_KEY' in environment to make pass-through calls to Azure."
+ )
+
+ return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+ endpoint=endpoint,
+ request=request,
+ fastapi_response=fastapi_response,
+ user_api_key_dict=user_api_key_dict,
+ base_target_url=base_target_url,
+ api_key=azure_api_key,
+ custom_llm_provider=litellm.LlmProviders.AZURE,
+ )
+
+
+@router.api_route(
+ "/openai/{endpoint:path}",
+ methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+ tags=["OpenAI Pass-through", "pass-through"],
+)
+async def openai_proxy_route(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+ """
+ Simple pass-through for OpenAI. Use this if you want to directly send a request to OpenAI.
+
+
+ """
+ base_target_url = "https://api.openai.com/"
+ # Add or update query parameters
+ openai_api_key = passthrough_endpoint_router.get_credentials(
+ custom_llm_provider=litellm.LlmProviders.OPENAI.value,
+ region_name=None,
+ )
+ if openai_api_key is None:
+ raise Exception(
+ "Required 'OPENAI_API_KEY' in environment to make pass-through calls to OpenAI."
+ )
+
+ return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+ endpoint=endpoint,
+ request=request,
+ fastapi_response=fastapi_response,
+ user_api_key_dict=user_api_key_dict,
+ base_target_url=base_target_url,
+ api_key=openai_api_key,
+ custom_llm_provider=litellm.LlmProviders.OPENAI,
+ )
+
+
+class BaseOpenAIPassThroughHandler:
+ @staticmethod
+ async def _base_openai_pass_through_handler(
+ endpoint: str,
+ request: Request,
+ fastapi_response: Response,
+ user_api_key_dict: UserAPIKeyAuth,
+ base_target_url: str,
+ api_key: str,
+ custom_llm_provider: litellm.LlmProviders,
+ ):
+ encoded_endpoint = httpx.URL(endpoint).path
+ # Ensure endpoint starts with '/' for proper URL construction
+ if not encoded_endpoint.startswith("/"):
+ encoded_endpoint = "/" + encoded_endpoint
+
+ # Construct the full target URL by properly joining the base URL and endpoint path
+ base_url = httpx.URL(base_target_url)
+ updated_url = BaseOpenAIPassThroughHandler._join_url_paths(
+ base_url=base_url,
+ path=encoded_endpoint,
+ custom_llm_provider=custom_llm_provider,
+ )
+
+ ## check for streaming
+ is_streaming_request = False
+ if "stream" in str(updated_url):
+ is_streaming_request = True
+
+ ## CREATE PASS-THROUGH
+ endpoint_func = create_pass_through_route(
+ endpoint=endpoint,
+ target=str(updated_url),
+ custom_headers=BaseOpenAIPassThroughHandler._assemble_headers(
+ api_key=api_key, request=request
+ ),
+ ) # dynamically construct pass-through endpoint based on incoming path
+ received_value = await endpoint_func(
+ request,
+ fastapi_response,
+ user_api_key_dict,
+ stream=is_streaming_request, # type: ignore
+ query_params=dict(request.query_params), # type: ignore
+ )
+
+ return received_value
+
+ @staticmethod
+ def _append_openai_beta_header(headers: dict, request: Request) -> dict:
+ """
+ Appends the OpenAI-Beta header to the headers if the request is an OpenAI Assistants API request
+ """
+ if (
+ RouteChecks._is_assistants_api_request(request) is True
+ and "OpenAI-Beta" not in headers
+ ):
+ headers["OpenAI-Beta"] = "assistants=v2"
+ return headers
+
+ @staticmethod
+ def _assemble_headers(api_key: str, request: Request) -> dict:
+ base_headers = {
+ "authorization": "Bearer {}".format(api_key),
+ "api-key": "{}".format(api_key),
+ }
+ return BaseOpenAIPassThroughHandler._append_openai_beta_header(
+ headers=base_headers,
+ request=request,
+ )
+
+ @staticmethod
+ def _join_url_paths(
+ base_url: httpx.URL, path: str, custom_llm_provider: litellm.LlmProviders
+ ) -> str:
+ """
+ Properly joins a base URL with a path, preserving any existing path in the base URL.
+ """
+ # Join paths correctly by removing trailing/leading slashes as needed
+ if not base_url.path or base_url.path == "/":
+ # If base URL has no path, just use the new path
+ joined_path_str = str(base_url.copy_with(path=path))
+ else:
+ # Otherwise, combine the paths
+ base_path = base_url.path.rstrip("/")
+ clean_path = path.lstrip("/")
+ full_path = f"{base_path}/{clean_path}"
+ joined_path_str = str(base_url.copy_with(path=full_path))
+
+ # Apply OpenAI-specific path handling for both branches
+ if (
+ custom_llm_provider == litellm.LlmProviders.OPENAI
+ and "/v1/" not in joined_path_str
+ ):
+ # Insert v1 after api.openai.com for OpenAI requests
+ joined_path_str = joined_path_str.replace(
+ "api.openai.com/", "api.openai.com/v1/"
+ )
+
+ return joined_path_str