diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py | 137 |
1 files changed, 137 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py new file mode 100644 index 00000000..67b1466c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/base_llm/base_model_iterator.py @@ -0,0 +1,137 @@ +import json +from abc import abstractmethod +from typing import Optional, Union + +from litellm.types.utils import GenericStreamingChunk, ModelResponseStream + + +class BaseModelResponseIterator: + def __init__( + self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False + ): + self.streaming_response = streaming_response + self.response_iterator = self.streaming_response + self.json_mode = json_mode + + def chunk_parser( + self, chunk: dict + ) -> Union[GenericStreamingChunk, ModelResponseStream]: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + + # Sync iterator + def __iter__(self): + return self + + def _handle_string_chunk( + self, str_line: str + ) -> Union[GenericStreamingChunk, ModelResponseStream]: + # chunk is a str at this point + if "[DONE]" in str_line: + return GenericStreamingChunk( + text="", + is_finished=True, + finish_reason="stop", + usage=None, + index=0, + tool_use=None, + ) + elif str_line.startswith("data:"): + data_json = json.loads(str_line[5:]) + return self.chunk_parser(chunk=data_json) + else: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + + def __next__(self): + try: + chunk = self.response_iterator.__next__() + except StopIteration: + raise StopIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string + index = str_line.find("data:") + if index != -1: + str_line = str_line[index:] + # chunk is a str at this point + return self._handle_string_chunk(str_line=str_line) + except StopIteration: + raise StopIteration + except ValueError as e: + raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}") + + # Async iterator + def __aiter__(self): + self.async_response_iterator = self.streaming_response.__aiter__() + return self + + async def __anext__(self): + try: + chunk = await self.async_response_iterator.__anext__() + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string + index = str_line.find("data:") + if index != -1: + str_line = str_line[index:] + + # chunk is a str at this point + return self._handle_string_chunk(str_line=str_line) + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}") + + +class FakeStreamResponseIterator: + def __init__(self, model_response, json_mode: Optional[bool] = False): + self.model_response = model_response + self.json_mode = json_mode + self.is_done = False + + # Sync iterator + def __iter__(self): + return self + + @abstractmethod + def chunk_parser(self, chunk: dict) -> GenericStreamingChunk: + pass + + def __next__(self): + if self.is_done: + raise StopIteration + self.is_done = True + return self.chunk_parser(self.model_response) + + # Async iterator + def __aiter__(self): + return self + + async def __anext__(self): + if self.is_done: + raise StopAsyncIteration + self.is_done = True + return self.chunk_parser(self.model_response) |