path: root/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py  166
1 file changed, 166 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py b/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py
new file mode 100644
index 00000000..2db53df9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/databricks/streaming_utils.py
@@ -0,0 +1,166 @@
+import json
+from typing import Optional
+
+import litellm
+from litellm import verbose_logger
+from litellm.types.llms.openai import (
+ ChatCompletionToolCallChunk,
+ ChatCompletionToolCallFunctionChunk,
+ ChatCompletionUsageBlock,
+)
+from litellm.types.utils import GenericStreamingChunk, Usage
+
+
+class ModelResponseIterator:
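+    """Wrap a Databricks streaming response (sync or async) and yield litellm GenericStreamingChunk dicts."""
+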
+ def __init__(self, streaming_response, sync_stream: bool):
+ self.streaming_response = streaming_response
+
+ def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
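+        """Map one parsed OpenAI-style streaming chunk dict onto litellm's GenericStreamingChunk."""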
+ try:
+ processed_chunk = litellm.ModelResponseStream(**chunk)
+
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ is_finished = False
+ finish_reason = ""
+ usage: Optional[ChatCompletionUsageBlock] = None
+
+ if processed_chunk.choices[0].delta.content is not None: # type: ignore
+ text = processed_chunk.choices[0].delta.content # type: ignore
+
+ if (
+ processed_chunk.choices[0].delta.tool_calls is not None # type: ignore
+ and len(processed_chunk.choices[0].delta.tool_calls) > 0 # type: ignore
+ and processed_chunk.choices[0].delta.tool_calls[0].function is not None # type: ignore
+ and processed_chunk.choices[0].delta.tool_calls[0].function.arguments # type: ignore
+ is not None
+ ):
+ tool_use = ChatCompletionToolCallChunk(
+ id=processed_chunk.choices[0].delta.tool_calls[0].id, # type: ignore
+ type="function",
+ function=ChatCompletionToolCallFunctionChunk(
+ name=processed_chunk.choices[0]
+ .delta.tool_calls[0] # type: ignore
+ .function.name,
+ arguments=processed_chunk.choices[0]
+ .delta.tool_calls[0] # type: ignore
+ .function.arguments,
+ ),
+ index=processed_chunk.choices[0].delta.tool_calls[0].index,
+ )
+
+ if processed_chunk.choices[0].finish_reason is not None:
+ is_finished = True
+ finish_reason = processed_chunk.choices[0].finish_reason
+
+ usage_chunk: Optional[Usage] = getattr(processed_chunk, "usage", None)
+ if usage_chunk is not None:
+ usage = ChatCompletionUsageBlock(
+ prompt_tokens=usage_chunk.prompt_tokens,
+ completion_tokens=usage_chunk.completion_tokens,
+ total_tokens=usage_chunk.total_tokens,
+ )
+
+ return GenericStreamingChunk(
+ text=text,
+ tool_use=tool_use,
+ is_finished=is_finished,
+ finish_reason=finish_reason,
+ usage=usage,
+ index=0,
+ )
+ except json.JSONDecodeError:
+ raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
+
+ # Sync iterator
+ def __iter__(self):
+ self.response_iterator = self.streaming_response
+ return self
+
+ def __next__(self):
+ if not hasattr(self, "response_iterator"):
+ self.response_iterator = self.streaming_response
+ try:
+ chunk = self.response_iterator.__next__()
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
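+            # Strip the SSE "data:" prefix, if present, before JSON-decoding the payload.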
+ chunk = litellm.CustomStreamWrapper._strip_sse_data_from_chunk(chunk) or ""
+ chunk = chunk.strip()
+ if len(chunk) > 0:
+ json_chunk = json.loads(chunk)
+ return self.chunk_parser(chunk=json_chunk)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ verbose_logger.debug(
+ f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
+ )
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+
+ # Async iterator
+ def __aiter__(self):
+ self.async_response_iterator = self.streaming_response.__aiter__()
+ return self
+
+ async def __anext__(self):
+ try:
+ chunk = await self.async_response_iterator.__anext__()
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+ except Exception as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
+ chunk = litellm.CustomStreamWrapper._strip_sse_data_from_chunk(chunk) or ""
+ chunk = chunk.strip()
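+            # The stream signals completion with an OpenAI-style "[DONE]" sentinel.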
+ if chunk == "[DONE]":
+ raise StopAsyncIteration
+ if len(chunk) > 0:
+ json_chunk = json.loads(chunk)
+ return self.chunk_parser(chunk=json_chunk)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ verbose_logger.debug(
+ f"Error parsing chunk: {e},\nReceived chunk: {chunk}. Defaulting to empty chunk here."
+ )
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
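
For reference, a minimal sketch of how this iterator might be driven directly over a synchronous stream, assuming litellm is installed; the fake_sse_stream generator and its payloads are illustrative stand-ins for a Databricks SSE response, not part of litellm:

import json

from litellm.llms.databricks.streaming_utils import ModelResponseIterator


def fake_sse_stream():
    # Illustrative OpenAI-style chat.completion.chunk payloads, SSE-framed with a "data:" prefix.
    payloads = [
        {
            "id": "chunk-1",
            "object": "chat.completion.chunk",
            "created": 1700000000,
            "model": "databricks-dbrx-instruct",
            "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": None}],
        },
        {
            "id": "chunk-2",
            "object": "chat.completion.chunk",
            "created": 1700000000,
            "model": "databricks-dbrx-instruct",
            "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
        },
    ]
    for payload in payloads:
        yield f"data: {json.dumps(payload)}"


iterator = ModelResponseIterator(streaming_response=fake_sse_stream(), sync_stream=True)
for generic_chunk in iterator:
    # GenericStreamingChunk is a TypedDict, so fields are read by key.
    print(generic_chunk["text"], generic_chunk["finish_reason"], generic_chunk["is_finished"])

The async path behaves the same way through __aiter__/__anext__, with the additional handling of the "[DONE]" sentinel before JSON decoding.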