diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py | 166 |
1 files changed, 166 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py b/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py new file mode 100644 index 00000000..2db53df9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py @@ -0,0 +1,166 @@ +import json +from typing import Optional + +import litellm +from litellm import verbose_logger +from litellm.types.llms.openai import ( + ChatCompletionToolCallChunk, + ChatCompletionToolCallFunctionChunk, + ChatCompletionUsageBlock, +) +from litellm.types.utils import GenericStreamingChunk, Usage + + +class ModelResponseIterator: + def __init__(self, streaming_response, sync_stream: bool): + self.streaming_response = streaming_response + + def chunk_parser(self, chunk: dict) -> GenericStreamingChunk: + try: + processed_chunk = litellm.ModelResponseStream(**chunk) + + text = "" + tool_use: Optional[ChatCompletionToolCallChunk] = None + is_finished = False + finish_reason = "" + usage: Optional[ChatCompletionUsageBlock] = None + + if processed_chunk.choices[0].delta.content is not None: # type: ignore + text = processed_chunk.choices[0].delta.content # type: ignore + + if ( + processed_chunk.choices[0].delta.tool_calls is not None # type: ignore + and len(processed_chunk.choices[0].delta.tool_calls) > 0 # type: ignore + and processed_chunk.choices[0].delta.tool_calls[0].function is not None # type: ignore + and processed_chunk.choices[0].delta.tool_calls[0].function.arguments # type: ignore + is not None + ): + tool_use = ChatCompletionToolCallChunk( + id=processed_chunk.choices[0].delta.tool_calls[0].id, # type: ignore + type="function", + function=ChatCompletionToolCallFunctionChunk( + name=processed_chunk.choices[0] + .delta.tool_calls[0] # type: ignore + .function.name, + arguments=processed_chunk.choices[0] + .delta.tool_calls[0] # type: ignore + .function.arguments, + ), + index=processed_chunk.choices[0].delta.tool_calls[0].index, + ) + + if processed_chunk.choices[0].finish_reason is not None: + is_finished = True + finish_reason = processed_chunk.choices[0].finish_reason + + usage_chunk: Optional[Usage] = getattr(processed_chunk, "usage", None) + if usage_chunk is not None: + + usage = ChatCompletionUsageBlock( + prompt_tokens=usage_chunk.prompt_tokens, + completion_tokens=usage_chunk.completion_tokens, + total_tokens=usage_chunk.total_tokens, + ) + + return GenericStreamingChunk( + text=text, + tool_use=tool_use, + is_finished=is_finished, + finish_reason=finish_reason, + usage=usage, + index=0, + ) + except json.JSONDecodeError: + raise ValueError(f"Failed to decode JSON from chunk: {chunk}") + + # Sync iterator + def __iter__(self): + self.response_iterator = self.streaming_response + return self + + def __next__(self): + if not hasattr(self, "response_iterator"): + self.response_iterator = self.streaming_response + try: + chunk = self.response_iterator.__next__() + except StopIteration: + raise StopIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + chunk = litellm.CustomStreamWrapper._strip_sse_data_from_chunk(chunk) or "" + chunk = chunk.strip() + if len(chunk) > 0: + json_chunk = json.loads(chunk) + return self.chunk_parser(chunk=json_chunk) + else: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + except StopIteration: + raise StopIteration + except ValueError as e: + verbose_logger.debug( + f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here." + ) + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + + # Async iterator + def __aiter__(self): + self.async_response_iterator = self.streaming_response.__aiter__() + return self + + async def __anext__(self): + try: + chunk = await self.async_response_iterator.__anext__() + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + except Exception as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + chunk = litellm.CustomStreamWrapper._strip_sse_data_from_chunk(chunk) or "" + chunk = chunk.strip() + if chunk == "[DONE]": + raise StopAsyncIteration + if len(chunk) > 0: + json_chunk = json.loads(chunk) + return self.chunk_parser(chunk=json_chunk) + else: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + verbose_logger.debug( + f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here." + ) + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) |