aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py
new file mode 100644
index 00000000..b022bf1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/proxy/pass_through_endpoints/streaming_handler.py
@@ -0,0 +1,160 @@
+import asyncio
+import threading
+from datetime import datetime
+from typing import List, Optional
+
+import httpx
+
+from litellm._logging import verbose_proxy_logger
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.proxy._types import PassThroughEndpointLoggingResultValues
+from litellm.types.utils import StandardPassThroughResponseObject
+
+from .llm_provider_handlers.anthropic_passthrough_logging_handler import (
+ AnthropicPassthroughLoggingHandler,
+)
+from .llm_provider_handlers.vertex_passthrough_logging_handler import (
+ VertexPassthroughLoggingHandler,
+)
+from .success_handler import PassThroughEndpointLogging
+from .types import EndpointType
+
+
+class PassThroughStreamingHandler:
+
+ @staticmethod
+ async def chunk_processor(
+ response: httpx.Response,
+ request_body: Optional[dict],
+ litellm_logging_obj: LiteLLMLoggingObj,
+ endpoint_type: EndpointType,
+ start_time: datetime,
+ passthrough_success_handler_obj: PassThroughEndpointLogging,
+ url_route: str,
+ ):
+ """
+ - Yields chunks from the response
+ - Collect non-empty chunks for post-processing (logging)
+ """
+ try:
+ raw_bytes: List[bytes] = []
+ async for chunk in response.aiter_bytes():
+ raw_bytes.append(chunk)
+ yield chunk
+
+ # After all chunks are processed, handle post-processing
+ end_time = datetime.now()
+
+ asyncio.create_task(
+ PassThroughStreamingHandler._route_streaming_logging_to_handler(
+ litellm_logging_obj=litellm_logging_obj,
+ passthrough_success_handler_obj=passthrough_success_handler_obj,
+ url_route=url_route,
+ request_body=request_body or {},
+ endpoint_type=endpoint_type,
+ start_time=start_time,
+ raw_bytes=raw_bytes,
+ end_time=end_time,
+ )
+ )
+ except Exception as e:
+ verbose_proxy_logger.error(f"Error in chunk_processor: {str(e)}")
+ raise
+
+ @staticmethod
+ async def _route_streaming_logging_to_handler(
+ litellm_logging_obj: LiteLLMLoggingObj,
+ passthrough_success_handler_obj: PassThroughEndpointLogging,
+ url_route: str,
+ request_body: dict,
+ endpoint_type: EndpointType,
+ start_time: datetime,
+ raw_bytes: List[bytes],
+ end_time: datetime,
+ ):
+ """
+ Route the logging for the collected chunks to the appropriate handler
+
+ Supported endpoint types:
+ - Anthropic
+ - Vertex AI
+ """
+ all_chunks = PassThroughStreamingHandler._convert_raw_bytes_to_str_lines(
+ raw_bytes
+ )
+ standard_logging_response_object: Optional[
+ PassThroughEndpointLoggingResultValues
+ ] = None
+ kwargs: dict = {}
+ if endpoint_type == EndpointType.ANTHROPIC:
+ anthropic_passthrough_logging_handler_result = AnthropicPassthroughLoggingHandler._handle_logging_anthropic_collected_chunks(
+ litellm_logging_obj=litellm_logging_obj,
+ passthrough_success_handler_obj=passthrough_success_handler_obj,
+ url_route=url_route,
+ request_body=request_body,
+ endpoint_type=endpoint_type,
+ start_time=start_time,
+ all_chunks=all_chunks,
+ end_time=end_time,
+ )
+ standard_logging_response_object = (
+ anthropic_passthrough_logging_handler_result["result"]
+ )
+ kwargs = anthropic_passthrough_logging_handler_result["kwargs"]
+ elif endpoint_type == EndpointType.VERTEX_AI:
+ vertex_passthrough_logging_handler_result = (
+ VertexPassthroughLoggingHandler._handle_logging_vertex_collected_chunks(
+ litellm_logging_obj=litellm_logging_obj,
+ passthrough_success_handler_obj=passthrough_success_handler_obj,
+ url_route=url_route,
+ request_body=request_body,
+ endpoint_type=endpoint_type,
+ start_time=start_time,
+ all_chunks=all_chunks,
+ end_time=end_time,
+ )
+ )
+ standard_logging_response_object = (
+ vertex_passthrough_logging_handler_result["result"]
+ )
+ kwargs = vertex_passthrough_logging_handler_result["kwargs"]
+
+ if standard_logging_response_object is None:
+ standard_logging_response_object = StandardPassThroughResponseObject(
+ response=f"cannot parse chunks to standard response object. Chunks={all_chunks}"
+ )
+ threading.Thread(
+ target=litellm_logging_obj.success_handler,
+ args=(
+ standard_logging_response_object,
+ start_time,
+ end_time,
+ False,
+ ),
+ ).start()
+ await litellm_logging_obj.async_success_handler(
+ result=standard_logging_response_object,
+ start_time=start_time,
+ end_time=end_time,
+ cache_hit=False,
+ **kwargs,
+ )
+
+ @staticmethod
+ def _convert_raw_bytes_to_str_lines(raw_bytes: List[bytes]) -> List[str]:
+ """
+ Converts a list of raw bytes into a list of string lines, similar to aiter_lines()
+
+ Args:
+ raw_bytes: List of bytes chunks from aiter.bytes()
+
+ Returns:
+ List of string lines, with each line being a complete data: {} chunk
+ """
+ # Combine all bytes and decode to string
+ combined_str = b"".join(raw_bytes).decode("utf-8")
+
+ # Split by newlines and filter out empty lines
+ lines = [line.strip() for line in combined_str.split("\n") if line.strip()]
+
+ return lines