about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py137
1 files changed, 137 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
new file mode 100644
index 00000000..67b1466c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
@@ -0,0 +1,137 @@
+import json
+from abc import abstractmethod
+from typing import Optional, Union
+
+from litellm.types.utils import GenericStreamingChunk, ModelResponseStream
+
+
+class BaseModelResponseIterator:
+    def __init__(
+        self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
+    ):
+        self.streaming_response = streaming_response
+        self.response_iterator = self.streaming_response
+        self.json_mode = json_mode
+
+    def chunk_parser(
+        self, chunk: dict
+    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
+        return GenericStreamingChunk(
+            text="",
+            is_finished=False,
+            finish_reason="",
+            usage=None,
+            index=0,
+            tool_use=None,
+        )
+
+    # Sync iterator
+    def __iter__(self):
+        return self
+
+    def _handle_string_chunk(
+        self, str_line: str
+    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
+        # chunk is a str at this point
+        if "[DONE]" in str_line:
+            return GenericStreamingChunk(
+                text="",
+                is_finished=True,
+                finish_reason="stop",
+                usage=None,
+                index=0,
+                tool_use=None,
+            )
+        elif str_line.startswith("data:"):
+            data_json = json.loads(str_line[5:])
+            return self.chunk_parser(chunk=data_json)
+        else:
+            return GenericStreamingChunk(
+                text="",
+                is_finished=False,
+                finish_reason="",
+                usage=None,
+                index=0,
+                tool_use=None,
+            )
+
+    def __next__(self):
+        try:
+            chunk = self.response_iterator.__next__()
+        except StopIteration:
+            raise StopIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+        try:
+            str_line = chunk
+            if isinstance(chunk, bytes):  # Handle binary data
+                str_line = chunk.decode("utf-8")  # Convert bytes to string
+                index = str_line.find("data:")
+                if index != -1:
+                    str_line = str_line[index:]
+            # chunk is a str at this point
+            return self._handle_string_chunk(str_line=str_line)
+        except StopIteration:
+            raise StopIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+    # Async iterator
+    def __aiter__(self):
+        self.async_response_iterator = self.streaming_response.__aiter__()
+        return self
+
+    async def __anext__(self):
+        try:
+            chunk = await self.async_response_iterator.__anext__()
+        except StopAsyncIteration:
+            raise StopAsyncIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+        try:
+            str_line = chunk
+            if isinstance(chunk, bytes):  # Handle binary data
+                str_line = chunk.decode("utf-8")  # Convert bytes to string
+                index = str_line.find("data:")
+                if index != -1:
+                    str_line = str_line[index:]
+
+            # chunk is a str at this point
+            return self._handle_string_chunk(str_line=str_line)
+        except StopAsyncIteration:
+            raise StopAsyncIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+
+class FakeStreamResponseIterator:
+    def __init__(self, model_response, json_mode: Optional[bool] = False):
+        self.model_response = model_response
+        self.json_mode = json_mode
+        self.is_done = False
+
+    # Sync iterator
+    def __iter__(self):
+        return self
+
+    @abstractmethod
+    def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
+        pass
+
+    def __next__(self):
+        if self.is_done:
+            raise StopIteration
+        self.is_done = True
+        return self.chunk_parser(self.model_response)
+
+    # Async iterator
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        if self.is_done:
+            raise StopAsyncIteration
+        self.is_done = True
+        return self.chunk_parser(self.model_response)