Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py | 556 |
1 file changed, 556 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
new file mode 100644
index 00000000..4724c7f9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/llm_passthrough_endpoints.py
@@ -0,0 +1,556 @@
+"""
+What is this?
+
+Provider-specific Pass-Through Endpoints
+
+Use litellm with Anthropic SDK, Vertex AI SDK, Cohere SDK, etc.
+"""
+
+from typing import Optional
+
+import httpx
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
+
+import litellm
+from litellm.constants import BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES
+from litellm.proxy._types import *
+from litellm.proxy.auth.route_checks import RouteChecks
+from litellm.proxy.auth.user_api_key_auth import user_api_key_auth
+from litellm.proxy.pass_through_endpoints.pass_through_endpoints import (
+    create_pass_through_route,
+)
+from litellm.secret_managers.main import get_secret_str
+
+from .passthrough_endpoint_router import PassthroughEndpointRouter
+
+router = APIRouter()
+default_vertex_config = None
+
+passthrough_endpoint_router = PassthroughEndpointRouter()
+
+
+def create_request_copy(request: Request):
+    return {
+        "method": request.method,
+        "url": str(request.url),
+        "headers": dict(request.headers),
+        "cookies": request.cookies,
+        "query_params": dict(request.query_params),
+    }
+
+
+@router.api_route(
+    "/gemini/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Google AI Studio Pass-through", "pass-through"],
+)
+async def gemini_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/pass_through/google_ai_studio)
+    """
+    ## CHECK FOR LITELLM API KEY IN THE QUERY PARAMS - ?key=LITELLM_API_KEY
+    google_ai_studio_api_key = request.query_params.get("key") or request.headers.get(
+        "x-goog-api-key"
+    )
+
+    user_api_key_dict = await user_api_key_auth(
+        request=request, api_key=f"Bearer {google_ai_studio_api_key}"
+    )
+
+    base_target_url = "https://generativelanguage.googleapis.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    gemini_api_key: Optional[str] = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="gemini",
+        region_name=None,
+    )
+    if gemini_api_key is None:
+        raise Exception(
+            "Required 'GEMINI_API_KEY' in environment to make pass-through calls to Google AI Studio."
+        )
+    # Merge query parameters, giving precedence to those in updated_url
+    merged_params = dict(request.query_params)
+    merged_params.update({"key": gemini_api_key})
+
+    ## check for streaming
+    is_streaming_request = False
+    if "stream" in str(updated_url):
+        is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        query_params=merged_params,  # type: ignore
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/cohere/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Cohere Pass-through", "pass-through"],
+)
+async def cohere_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/pass_through/cohere)
+    """
+    base_target_url = "https://api.cohere.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    cohere_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="cohere",
+        region_name=None,
+    )
+
+    ## check for streaming
+    is_streaming_request = False
+    if "stream" in str(updated_url):
+        is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
+        custom_headers={"Authorization": "Bearer {}".format(cohere_api_key)},
+    )  # dynamically construct pass-through endpoint based on incoming path
+    received_value = await endpoint_func(
+        request,
+        fastapi_response,
+        user_api_key_dict,
+        stream=is_streaming_request,  # type: ignore
+    )
+
+    return received_value
+
+
+@router.api_route(
+    "/anthropic/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["Anthropic Pass-through", "pass-through"],
+)
+async def anthropic_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    [Docs](https://docs.litellm.ai/docs/anthropic_completion)
+    """
+    base_target_url = "https://api.anthropic.com"
+    encoded_endpoint = httpx.URL(endpoint).path
+
+    # Ensure endpoint starts with '/' for proper URL construction
+    if not encoded_endpoint.startswith("/"):
+        encoded_endpoint = "/" + encoded_endpoint
+
+    # Construct the full target URL using httpx
+    base_url = httpx.URL(base_target_url)
+    updated_url = base_url.copy_with(path=encoded_endpoint)
+
+    # Add or update query parameters
+    anthropic_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider="anthropic",
+        region_name=None,
+    )
+
+    ## check for streaming
+    is_streaming_request = False
+    # anthropic is streaming when 'stream' = True is in the body
+    if request.method == "POST":
+        _request_body = await request.json()
+        if _request_body.get("stream"):
+            is_streaming_request = True
+
+    ## CREATE PASS-THROUGH
+    endpoint_func = create_pass_through_route(
+        endpoint=endpoint,
+        target=str(updated_url),
custom_headers={"x-api-key": "{}".format(anthropic_api_key)}, + _forward_headers=True, + ) # dynamically construct pass-through endpoint based on incoming path + received_value = await endpoint_func( + request, + fastapi_response, + user_api_key_dict, + stream=is_streaming_request, # type: ignore + ) + + return received_value + + +@router.api_route( + "/bedrock/{endpoint:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + tags=["Bedrock Pass-through", "pass-through"], +) +async def bedrock_proxy_route( + endpoint: str, + request: Request, + fastapi_response: Response, + user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), +): + """ + [Docs](https://docs.litellm.ai/docs/pass_through/bedrock) + """ + create_request_copy(request) + + try: + from botocore.auth import SigV4Auth + from botocore.awsrequest import AWSRequest + from botocore.credentials import Credentials + except ImportError: + raise ImportError("Missing boto3 to call bedrock. Run 'pip install boto3'.") + + aws_region_name = litellm.utils.get_secret(secret_name="AWS_REGION_NAME") + if _is_bedrock_agent_runtime_route(endpoint=endpoint): # handle bedrock agents + base_target_url = ( + f"https://bedrock-agent-runtime.{aws_region_name}.amazonaws.com" + ) + else: + base_target_url = f"https://bedrock-runtime.{aws_region_name}.amazonaws.com" + encoded_endpoint = httpx.URL(endpoint).path + + # Ensure endpoint starts with '/' for proper URL construction + if not encoded_endpoint.startswith("/"): + encoded_endpoint = "/" + encoded_endpoint + + # Construct the full target URL using httpx + base_url = httpx.URL(base_target_url) + updated_url = base_url.copy_with(path=encoded_endpoint) + + # Add or update query parameters + from litellm.llms.bedrock.chat import BedrockConverseLLM + + credentials: Credentials = BedrockConverseLLM().get_credentials() + sigv4 = SigV4Auth(credentials, "bedrock", aws_region_name) + headers = {"Content-Type": "application/json"} + # Assuming the body contains JSON data, parse it + try: + data = await request.json() + except Exception as e: + raise HTTPException(status_code=400, detail={"error": e}) + _request = AWSRequest( + method="POST", url=str(updated_url), data=json.dumps(data), headers=headers + ) + sigv4.add_auth(_request) + prepped = _request.prepare() + + ## check for streaming + is_streaming_request = False + if "stream" in str(updated_url): + is_streaming_request = True + + ## CREATE PASS-THROUGH + endpoint_func = create_pass_through_route( + endpoint=endpoint, + target=str(prepped.url), + custom_headers=prepped.headers, # type: ignore + ) # dynamically construct pass-through endpoint based on incoming path + received_value = await endpoint_func( + request, + fastapi_response, + user_api_key_dict, + stream=is_streaming_request, # type: ignore + custom_body=data, # type: ignore + query_params={}, # type: ignore + ) + + return received_value + + +def _is_bedrock_agent_runtime_route(endpoint: str) -> bool: + """ + Return True, if the endpoint should be routed to the `bedrock-agent-runtime` endpoint. 
+ """ + for _route in BEDROCK_AGENT_RUNTIME_PASS_THROUGH_ROUTES: + if _route in endpoint: + return True + return False + + +@router.api_route( + "/assemblyai/{endpoint:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + tags=["AssemblyAI Pass-through", "pass-through"], +) +@router.api_route( + "/eu.assemblyai/{endpoint:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + tags=["AssemblyAI EU Pass-through", "pass-through"], +) +async def assemblyai_proxy_route( + endpoint: str, + request: Request, + fastapi_response: Response, + user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), +): + from litellm.proxy.pass_through_endpoints.llm_provider_handlers.assembly_passthrough_logging_handler import ( + AssemblyAIPassthroughLoggingHandler, + ) + + """ + [Docs](https://api.assemblyai.com) + """ + # Set base URL based on the route + assembly_region = AssemblyAIPassthroughLoggingHandler._get_assembly_region_from_url( + url=str(request.url) + ) + base_target_url = ( + AssemblyAIPassthroughLoggingHandler._get_assembly_base_url_from_region( + region=assembly_region + ) + ) + encoded_endpoint = httpx.URL(endpoint).path + # Ensure endpoint starts with '/' for proper URL construction + if not encoded_endpoint.startswith("/"): + encoded_endpoint = "/" + encoded_endpoint + + # Construct the full target URL using httpx + base_url = httpx.URL(base_target_url) + updated_url = base_url.copy_with(path=encoded_endpoint) + + # Add or update query parameters + assemblyai_api_key = passthrough_endpoint_router.get_credentials( + custom_llm_provider="assemblyai", + region_name=assembly_region, + ) + + ## check for streaming + is_streaming_request = False + # assemblyai is streaming when 'stream' = True is in the body + if request.method == "POST": + _request_body = await request.json() + if _request_body.get("stream"): + is_streaming_request = True + + ## CREATE PASS-THROUGH + endpoint_func = create_pass_through_route( + endpoint=endpoint, + target=str(updated_url), + custom_headers={"Authorization": "{}".format(assemblyai_api_key)}, + ) # dynamically construct pass-through endpoint based on incoming path + received_value = await endpoint_func( + request=request, + fastapi_response=fastapi_response, + user_api_key_dict=user_api_key_dict, + stream=is_streaming_request, # type: ignore + ) + + return received_value + + +@router.api_route( + "/azure/{endpoint:path}", + methods=["GET", "POST", "PUT", "DELETE", "PATCH"], + tags=["Azure Pass-through", "pass-through"], +) +async def azure_proxy_route( + endpoint: str, + request: Request, + fastapi_response: Response, + user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth), +): + """ + Call any azure endpoint using the proxy. + + Just use `{PROXY_BASE_URL}/azure/{endpoint:path}` + """ + base_target_url = get_secret_str(secret_name="AZURE_API_BASE") + if base_target_url is None: + raise Exception( + "Required 'AZURE_API_BASE' in environment to make pass-through calls to Azure." + ) + # Add or update query parameters + azure_api_key = passthrough_endpoint_router.get_credentials( + custom_llm_provider=litellm.LlmProviders.AZURE.value, + region_name=None, + ) + if azure_api_key is None: + raise Exception( + "Required 'AZURE_API_KEY' in environment to make pass-through calls to Azure." 
+        )
+
+    return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+        endpoint=endpoint,
+        request=request,
+        fastapi_response=fastapi_response,
+        user_api_key_dict=user_api_key_dict,
+        base_target_url=base_target_url,
+        api_key=azure_api_key,
+        custom_llm_provider=litellm.LlmProviders.AZURE,
+    )
+
+
+@router.api_route(
+    "/openai/{endpoint:path}",
+    methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
+    tags=["OpenAI Pass-through", "pass-through"],
+)
+async def openai_proxy_route(
+    endpoint: str,
+    request: Request,
+    fastapi_response: Response,
+    user_api_key_dict: UserAPIKeyAuth = Depends(user_api_key_auth),
+):
+    """
+    Simple pass-through for OpenAI. Use this if you want to directly send a request to OpenAI.
+    """
+    base_target_url = "https://api.openai.com/"
+    # Add or update query parameters
+    openai_api_key = passthrough_endpoint_router.get_credentials(
+        custom_llm_provider=litellm.LlmProviders.OPENAI.value,
+        region_name=None,
+    )
+    if openai_api_key is None:
+        raise Exception(
+            "Required 'OPENAI_API_KEY' in environment to make pass-through calls to OpenAI."
+        )
+
+    return await BaseOpenAIPassThroughHandler._base_openai_pass_through_handler(
+        endpoint=endpoint,
+        request=request,
+        fastapi_response=fastapi_response,
+        user_api_key_dict=user_api_key_dict,
+        base_target_url=base_target_url,
+        api_key=openai_api_key,
+        custom_llm_provider=litellm.LlmProviders.OPENAI,
+    )
+
+
+class BaseOpenAIPassThroughHandler:
+    @staticmethod
+    async def _base_openai_pass_through_handler(
+        endpoint: str,
+        request: Request,
+        fastapi_response: Response,
+        user_api_key_dict: UserAPIKeyAuth,
+        base_target_url: str,
+        api_key: str,
+        custom_llm_provider: litellm.LlmProviders,
+    ):
+        encoded_endpoint = httpx.URL(endpoint).path
+        # Ensure endpoint starts with '/' for proper URL construction
+        if not encoded_endpoint.startswith("/"):
+            encoded_endpoint = "/" + encoded_endpoint
+
+        # Construct the full target URL by properly joining the base URL and endpoint path
+        base_url = httpx.URL(base_target_url)
+        updated_url = BaseOpenAIPassThroughHandler._join_url_paths(
+            base_url=base_url,
+            path=encoded_endpoint,
+            custom_llm_provider=custom_llm_provider,
+        )
+
+        ## check for streaming
+        is_streaming_request = False
+        if "stream" in str(updated_url):
+            is_streaming_request = True
+
+        ## CREATE PASS-THROUGH
+        endpoint_func = create_pass_through_route(
+            endpoint=endpoint,
+            target=str(updated_url),
+            custom_headers=BaseOpenAIPassThroughHandler._assemble_headers(
+                api_key=api_key, request=request
+            ),
+        )  # dynamically construct pass-through endpoint based on incoming path
+        received_value = await endpoint_func(
+            request,
+            fastapi_response,
+            user_api_key_dict,
+            stream=is_streaming_request,  # type: ignore
+            query_params=dict(request.query_params),  # type: ignore
+        )
+
+        return received_value
+
+    @staticmethod
+    def _append_openai_beta_header(headers: dict, request: Request) -> dict:
+        """
+        Appends the OpenAI-Beta header to the headers if the request is an OpenAI Assistants API request
+        """
+        if (
+            RouteChecks._is_assistants_api_request(request) is True
+            and "OpenAI-Beta" not in headers
+        ):
+            headers["OpenAI-Beta"] = "assistants=v2"
+        return headers
+
+    @staticmethod
+    def _assemble_headers(api_key: str, request: Request) -> dict:
+        base_headers = {
+            "authorization": "Bearer {}".format(api_key),
+            "api-key": "{}".format(api_key),
+        }
+        return BaseOpenAIPassThroughHandler._append_openai_beta_header(
+            headers=base_headers,
+            request=request,
+        )
+
+    @staticmethod
+    def _join_url_paths(
+        base_url: httpx.URL, path: str, custom_llm_provider: litellm.LlmProviders
+    ) -> str:
+        """
+        Properly joins a base URL with a path, preserving any existing path in the base URL.
+        """
+        # Join paths correctly by removing trailing/leading slashes as needed
+        if not base_url.path or base_url.path == "/":
+            # If base URL has no path, just use the new path
+            joined_path_str = str(base_url.copy_with(path=path))
+        else:
+            # Otherwise, combine the paths
+            base_path = base_url.path.rstrip("/")
+            clean_path = path.lstrip("/")
+            full_path = f"{base_path}/{clean_path}"
+            joined_path_str = str(base_url.copy_with(path=full_path))
+
+        # Apply OpenAI-specific path handling for both branches
+        if (
+            custom_llm_provider == litellm.LlmProviders.OPENAI
+            and "/v1/" not in joined_path_str
+        ):
+            # Insert v1 after api.openai.com for OpenAI requests
+            joined_path_str = joined_path_str.replace(
+                "api.openai.com/", "api.openai.com/v1/"
+            )
+
+        return joined_path_str
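
For context on how these routes are consumed, below is a minimal client-side usage sketch (not part of the diff above). The proxy base URL, port, virtual key "sk-1234", and model names are placeholder assumptions; the route shapes ("/anthropic/...", "/gemini/...?key=") follow the handlers in the file.

# usage_sketch.py - hedged example against a locally running LiteLLM proxy
import anthropic
import httpx

PROXY_BASE_URL = "http://localhost:4000"  # assumed local proxy address
LITELLM_KEY = "sk-1234"                   # assumed LiteLLM virtual key

# 1) Point the Anthropic SDK at the /anthropic pass-through route; per
#    anthropic_proxy_route, the proxy signs the upstream call with the
#    ANTHROPIC_API_KEY it resolves via passthrough_endpoint_router.
client = anthropic.Anthropic(
    base_url=f"{PROXY_BASE_URL}/anthropic",
    api_key=LITELLM_KEY,
)
message = client.messages.create(
    model="claude-3-5-sonnet-20240620",  # assumed model name
    max_tokens=256,
    messages=[{"role": "user", "content": "Hello from the proxy"}],
)
print(message.content)

# 2) Raw call to the /gemini route; per gemini_proxy_route, the LiteLLM key
#    rides in the ?key= query param (or x-goog-api-key header) and is swapped
#    server-side for GEMINI_API_KEY before forwarding to Google AI Studio.
resp = httpx.post(
    f"{PROXY_BASE_URL}/gemini/v1beta/models/gemini-1.5-flash:generateContent",
    params={"key": LITELLM_KEY},
    json={"contents": [{"parts": [{"text": "Hello from the proxy"}]}]},
)
print(resp.json())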