path: root/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
author     S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py')
-rw-r--r--    .venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py    137
1 file changed, 137 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
new file mode 100644
index 00000000..67b1466c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py
@@ -0,0 +1,137 @@
+import json
+from abc import abstractmethod
+from typing import Optional, Union
+
+from litellm.types.utils import GenericStreamingChunk, ModelResponseStream
+
+
+class BaseModelResponseIterator:
+    def __init__(
+        self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
+    ):
+        self.streaming_response = streaming_response
+        self.response_iterator = self.streaming_response
+        self.json_mode = json_mode
+
+    def chunk_parser(
+        self, chunk: dict
+    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
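+        # Base implementation returns an empty placeholder chunk; provider
+        # subclasses are expected to override this with their own parsing.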
+        return GenericStreamingChunk(
+            text="",
+            is_finished=False,
+            finish_reason="",
+            usage=None,
+            index=0,
+            tool_use=None,
+        )
+
+    # Sync iterator
+    def __iter__(self):
+        return self
+
+    def _handle_string_chunk(
+        self, str_line: str
+    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
+        # chunk is a str at this point
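+        # "[DONE]" is the OpenAI-style SSE sentinel marking the end of the stream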
+        if "[DONE]" in str_line:
+            return GenericStreamingChunk(
+                text="",
+                is_finished=True,
+                finish_reason="stop",
+                usage=None,
+                index=0,
+                tool_use=None,
+            )
+        elif str_line.startswith("data:"):
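+            # Strip the SSE "data:" prefix and hand the JSON payload to the parser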
+            data_json = json.loads(str_line[5:])
+            return self.chunk_parser(chunk=data_json)
+        else:
+            return GenericStreamingChunk(
+                text="",
+                is_finished=False,
+                finish_reason="",
+                usage=None,
+                index=0,
+                tool_use=None,
+            )
+
+    def __next__(self):
+        try:
+            chunk = self.response_iterator.__next__()
+        except StopIteration:
+            raise StopIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+        try:
+            str_line = chunk
+            if isinstance(chunk, bytes):  # Handle binary data
+                str_line = chunk.decode("utf-8")  # Convert bytes to string
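+                # Discard any leading bytes before the SSE "data:" marker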
+                index = str_line.find("data:")
+                if index != -1:
+                    str_line = str_line[index:]
+            # chunk is a str at this point
+            return self._handle_string_chunk(str_line=str_line)
+        except StopIteration:
+            raise StopIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+    # Async iterator
+    def __aiter__(self):
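+        # Grab the underlying async iterator when async iteration begins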
+        self.async_response_iterator = self.streaming_response.__aiter__()
+        return self
+
+    async def __anext__(self):
+        try:
+            chunk = await self.async_response_iterator.__anext__()
+        except StopAsyncIteration:
+            raise StopAsyncIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+        try:
+            str_line = chunk
+            if isinstance(chunk, bytes):  # Handle binary data
+                str_line = chunk.decode("utf-8")  # Convert bytes to string
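+                # Discard any leading bytes before the SSE "data:" marker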
+                index = str_line.find("data:")
+                if index != -1:
+                    str_line = str_line[index:]
+
+            # chunk is a str at this point
+            return self._handle_string_chunk(str_line=str_line)
+        except StopAsyncIteration:
+            raise StopAsyncIteration
+        except ValueError as e:
+            raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+
+class FakeStreamResponseIterator:
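+    # Wraps an already-complete model response and yields it exactly once,
+    # presenting a non-streaming result through the streaming interface.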
+    def __init__(self, model_response, json_mode: Optional[bool] = False):
+        self.model_response = model_response
+        self.json_mode = json_mode
+        self.is_done = False
+
+    # Sync iterator
+    def __iter__(self):
+        return self
+
+    @abstractmethod
+    def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
+        pass
+
+    def __next__(self):
+        if self.is_done:
+            raise StopIteration
+        self.is_done = True
+        return self.chunk_parser(self.model_response)
+
+    # Async iterator
+    def __aiter__(self):
+        return self
+
+    async def __anext__(self):
+        if self.is_done:
+            raise StopAsyncIteration
+        self.is_done = True
+        return self.chunk_parser(self.model_response)
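
As a quick orientation for readers of this vendored file: a provider hooks into
BaseModelResponseIterator by overriding chunk_parser, while the base class takes
care of SSE framing (the "data:" prefix and the "[DONE]" sentinel). The sketch
below is illustrative only: ExampleProviderResponseIterator and the "output" and
"done" field names are assumptions made for this example, not litellm APIs.

from typing import Union

from litellm.llms.base_llm.base_model_iterator import BaseModelResponseIterator
from litellm.types.utils import GenericStreamingChunk, ModelResponseStream


class ExampleProviderResponseIterator(BaseModelResponseIterator):
    # Hypothetical provider subclass: maps an assumed wire format onto the
    # generic chunk type that litellm's streaming layer consumes.
    def chunk_parser(
        self, chunk: dict
    ) -> Union[GenericStreamingChunk, ModelResponseStream]:
        return GenericStreamingChunk(
            text=chunk.get("output", ""),          # "output" is an assumed field
            is_finished=chunk.get("done", False),  # "done" is an assumed field
            finish_reason=chunk.get("finish_reason", ""),
            usage=None,
            index=0,
            tool_use=None,
        )


# Simulated SSE lines as they might arrive from an HTTP stream:
lines = iter(
    [
        'data: {"output": "Hello", "done": false}',
        'data: {"output": " world", "done": false}',
        "data: [DONE]",
    ]
)
for parsed in ExampleProviderResponseIterator(lines, sync_stream=True):
    print(parsed["text"], parsed["is_finished"])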