about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py270
1 files changed, 270 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py b/.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py
new file mode 100644
index 00000000..3039efb9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/responses/streaming_iterator.py
@@ -0,0 +1,270 @@
+import asyncio
+import json
+from datetime import datetime
+from typing import Optional
+
+import httpx
+
+from litellm.constants import STREAM_SSE_DONE_STRING
+from litellm.litellm_core_utils.asyncify import run_async_function
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.litellm_core_utils.thread_pool_executor import executor
+from litellm.llms.base_llm.responses.transformation import BaseResponsesAPIConfig
+from litellm.types.llms.openai import (
+    ResponseCompletedEvent,
+    ResponsesAPIStreamEvents,
+    ResponsesAPIStreamingResponse,
+)
+from litellm.utils import CustomStreamWrapper
+
+
+class BaseResponsesAPIStreamingIterator:
+    """
+    Base class for streaming iterators that process responses from the Responses API.
+
+    This class contains shared logic for both synchronous and asynchronous iterators.
+    """
+
+    def __init__(
+        self,
+        response: httpx.Response,
+        model: str,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
+    ):
+        self.response = response
+        self.model = model
+        self.logging_obj = logging_obj
+        self.finished = False
+        self.responses_api_provider_config = responses_api_provider_config
+        self.completed_response: Optional[ResponsesAPIStreamingResponse] = None
+        self.start_time = datetime.now()
+
+    def _process_chunk(self, chunk):
+        """Process a single chunk of data from the stream"""
+        if not chunk:
+            return None
+
+        # Handle SSE format (data: {...})
+        chunk = CustomStreamWrapper._strip_sse_data_from_chunk(chunk)
+        if chunk is None:
+            return None
+
+        # Handle "[DONE]" marker
+        if chunk == STREAM_SSE_DONE_STRING:
+            self.finished = True
+            return None
+
+        try:
+            # Parse the JSON chunk
+            parsed_chunk = json.loads(chunk)
+
+            # Format as ResponsesAPIStreamingResponse
+            if isinstance(parsed_chunk, dict):
+                openai_responses_api_chunk = (
+                    self.responses_api_provider_config.transform_streaming_response(
+                        model=self.model,
+                        parsed_chunk=parsed_chunk,
+                        logging_obj=self.logging_obj,
+                    )
+                )
+                # Store the completed response
+                if (
+                    openai_responses_api_chunk
+                    and openai_responses_api_chunk.type
+                    == ResponsesAPIStreamEvents.RESPONSE_COMPLETED
+                ):
+                    self.completed_response = openai_responses_api_chunk
+                    self._handle_logging_completed_response()
+
+                return openai_responses_api_chunk
+
+            return None
+        except json.JSONDecodeError:
+            # If we can't parse the chunk, continue
+            return None
+
+    def _handle_logging_completed_response(self):
+        """Base implementation - should be overridden by subclasses"""
+        pass
+
+
+class ResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator):
+    """
+    Async iterator for processing streaming responses from the Responses API.
+    """
+
+    def __init__(
+        self,
+        response: httpx.Response,
+        model: str,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
+    ):
+        super().__init__(response, model, responses_api_provider_config, logging_obj)
+        self.stream_iterator = response.aiter_lines()
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self) -> ResponsesAPIStreamingResponse:
+        try:
+            while True:
+                # Get the next chunk from the stream
+                try:
+                    chunk = await self.stream_iterator.__anext__()
+                except StopAsyncIteration:
+                    self.finished = True
+                    raise StopAsyncIteration
+
+                result = self._process_chunk(chunk)
+
+                if self.finished:
+                    raise StopAsyncIteration
+                elif result is not None:
+                    return result
+                # If result is None, continue the loop to get the next chunk
+
+        except httpx.HTTPError as e:
+            # Handle HTTP errors
+            self.finished = True
+            raise e
+
+    def _handle_logging_completed_response(self):
+        """Handle logging for completed responses in async context"""
+        asyncio.create_task(
+            self.logging_obj.async_success_handler(
+                result=self.completed_response,
+                start_time=self.start_time,
+                end_time=datetime.now(),
+                cache_hit=None,
+            )
+        )
+
+        executor.submit(
+            self.logging_obj.success_handler,
+            result=self.completed_response,
+            cache_hit=None,
+            start_time=self.start_time,
+            end_time=datetime.now(),
+        )
+
+
+class SyncResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator):
+    """
+    Synchronous iterator for processing streaming responses from the Responses API.
+    """
+
+    def __init__(
+        self,
+        response: httpx.Response,
+        model: str,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
+    ):
+        super().__init__(response, model, responses_api_provider_config, logging_obj)
+        self.stream_iterator = response.iter_lines()
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            while True:
+                # Get the next chunk from the stream
+                try:
+                    chunk = next(self.stream_iterator)
+                except StopIteration:
+                    self.finished = True
+                    raise StopIteration
+
+                result = self._process_chunk(chunk)
+
+                if self.finished:
+                    raise StopIteration
+                elif result is not None:
+                    return result
+                # If result is None, continue the loop to get the next chunk
+
+        except httpx.HTTPError as e:
+            # Handle HTTP errors
+            self.finished = True
+            raise e
+
+    def _handle_logging_completed_response(self):
+        """Handle logging for completed responses in sync context"""
+        run_async_function(
+            async_function=self.logging_obj.async_success_handler,
+            result=self.completed_response,
+            start_time=self.start_time,
+            end_time=datetime.now(),
+            cache_hit=None,
+        )
+
+        executor.submit(
+            self.logging_obj.success_handler,
+            result=self.completed_response,
+            cache_hit=None,
+            start_time=self.start_time,
+            end_time=datetime.now(),
+        )
+
+
+class MockResponsesAPIStreamingIterator(BaseResponsesAPIStreamingIterator):
+    """
+    mock iterator - some models like o1-pro do not support streaming, we need to fake a stream
+    """
+
+    def __init__(
+        self,
+        response: httpx.Response,
+        model: str,
+        responses_api_provider_config: BaseResponsesAPIConfig,
+        logging_obj: LiteLLMLoggingObj,
+    ):
+        self.raw_http_response = response
+        super().__init__(
+            response=response,
+            model=model,
+            responses_api_provider_config=responses_api_provider_config,
+            logging_obj=logging_obj,
+        )
+        self.is_done = False
+
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self) -> ResponsesAPIStreamingResponse:
+        if self.is_done:
+            raise StopAsyncIteration
+        self.is_done = True
+        transformed_response = (
+            self.responses_api_provider_config.transform_response_api_response(
+                model=self.model,
+                raw_response=self.raw_http_response,
+                logging_obj=self.logging_obj,
+            )
+        )
+        return ResponseCompletedEvent(
+            type=ResponsesAPIStreamEvents.RESPONSE_COMPLETED,
+            response=transformed_response,
+        )
+
+    def __iter__(self):
+        return self
+
+    def __next__(self) -> ResponsesAPIStreamingResponse:
+        if self.is_done:
+            raise StopIteration
+        self.is_done = True
+        transformed_response = (
+            self.responses_api_provider_config.transform_response_api_response(
+                model=self.model,
+                raw_response=self.raw_http_response,
+                logging_obj=self.logging_obj,
+            )
+        )
+        return ResponseCompletedEvent(
+            type=ResponsesAPIStreamEvents.RESPONSE_COMPLETED,
+            response=transformed_response,
+        )