path: root/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py  839
1 file changed, 839 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
new file mode 100644
index 00000000..f2c5f390
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
@@ -0,0 +1,839 @@
+"""
+Calling + translation logic for Anthropic's `/v1/messages` endpoint
+"""
+
+import copy
+import json
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+import httpx # type: ignore
+
+import litellm
+import litellm.litellm_core_utils
+import litellm.types
+import litellm.types.utils
+from litellm import LlmProviders
+from litellm.litellm_core_utils.core_helpers import map_finish_reason
+from litellm.llms.base_llm.chat.transformation import BaseConfig
+from litellm.llms.custom_httpx.http_handler import (
+ AsyncHTTPHandler,
+ HTTPHandler,
+ get_async_httpx_client,
+)
+from litellm.types.llms.anthropic import (
+ AnthropicChatCompletionUsageBlock,
+ ContentBlockDelta,
+ ContentBlockStart,
+ ContentBlockStop,
+ MessageBlockDelta,
+ MessageStartBlock,
+ UsageDelta,
+)
+from litellm.types.llms.openai import (
+ ChatCompletionThinkingBlock,
+ ChatCompletionToolCallChunk,
+ ChatCompletionUsageBlock,
+)
+from litellm.types.utils import (
+ Delta,
+ GenericStreamingChunk,
+ ModelResponseStream,
+ StreamingChoices,
+)
+from litellm.utils import CustomStreamWrapper, ModelResponse, ProviderConfigManager
+
+from ...base import BaseLLM
+from ..common_utils import AnthropicError, process_anthropic_headers
+from .transformation import AnthropicConfig
+
+
+async def make_call(
+ client: Optional[AsyncHTTPHandler],
+ api_base: str,
+ headers: dict,
+ data: str,
+ model: str,
+ messages: list,
+ logging_obj,
+ timeout: Optional[Union[float, httpx.Timeout]],
+ json_mode: bool,
+) -> Tuple[Any, httpx.Headers]:
+ if client is None:
+ client = litellm.module_level_aclient
+
+ try:
+ response = await client.post(
+ api_base, headers=headers, data=data, stream=True, timeout=timeout
+ )
+ except httpx.HTTPStatusError as e:
+ error_headers = getattr(e, "headers", None)
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ raise AnthropicError(
+ status_code=e.response.status_code,
+ message=await e.response.aread(),
+ headers=error_headers,
+ )
+ except Exception as e:
+ for exception in litellm.LITELLM_EXCEPTION_TYPES:
+ if isinstance(e, exception):
+ raise e
+ raise AnthropicError(status_code=500, message=str(e))
+
+ completion_stream = ModelResponseIterator(
+ streaming_response=response.aiter_lines(),
+ sync_stream=False,
+ json_mode=json_mode,
+ )
+
+ # LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key="",
+ original_response=completion_stream, # Pass the completion stream for logging
+ additional_args={"complete_input_dict": data},
+ )
+
+ return completion_stream, response.headers
+
+
+def make_sync_call(
+ client: Optional[HTTPHandler],
+ api_base: str,
+ headers: dict,
+ data: str,
+ model: str,
+ messages: list,
+ logging_obj,
+ timeout: Optional[Union[float, httpx.Timeout]],
+ json_mode: bool,
+) -> Tuple[Any, httpx.Headers]:
+ if client is None:
+ client = litellm.module_level_client # re-use a module level client
+
+ try:
+ response = client.post(
+ api_base, headers=headers, data=data, stream=True, timeout=timeout
+ )
+ except httpx.HTTPStatusError as e:
+ error_headers = getattr(e, "headers", None)
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ raise AnthropicError(
+ status_code=e.response.status_code,
+ message=e.response.read(),
+ headers=error_headers,
+ )
+ except Exception as e:
+ for exception in litellm.LITELLM_EXCEPTION_TYPES:
+ if isinstance(e, exception):
+ raise e
+ raise AnthropicError(status_code=500, message=str(e))
+
+ if response.status_code != 200:
+ response_headers = getattr(response, "headers", None)
+ raise AnthropicError(
+ status_code=response.status_code,
+ message=response.read(),
+ headers=response_headers,
+ )
+
+ completion_stream = ModelResponseIterator(
+ streaming_response=response.iter_lines(), sync_stream=True, json_mode=json_mode
+ )
+
+ # LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key="",
+ original_response="first stream response received",
+ additional_args={"complete_input_dict": data},
+ )
+
+ return completion_stream, response.headers
+
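+# Illustrative usage sketch (not part of the upstream litellm source): assuming a
+# minimal stand-in logging object that only provides the `post_call` hook used
+# above, a streaming request could be driven through `make_sync_call` directly.
+# The header and payload shapes follow Anthropic's documented Messages API.
+#
+#   class _StubLogger:
+#       def post_call(self, input, api_key, original_response, additional_args):
+#           pass
+#
+#   completion_stream, response_headers = make_sync_call(
+#       client=HTTPHandler(),
+#       api_base="https://api.anthropic.com/v1/messages",
+#       headers={
+#           "x-api-key": "sk-ant-...",  # hypothetical key
+#           "anthropic-version": "2023-06-01",
+#           "content-type": "application/json",
+#       },
+#       data=json.dumps({
+#           "model": "claude-3-5-sonnet-20240620",
+#           "max_tokens": 256,
+#           "stream": True,
+#           "messages": [{"role": "user", "content": "Hello"}],
+#       }),
+#       model="claude-3-5-sonnet-20240620",
+#       messages=[{"role": "user", "content": "Hello"}],
+#       logging_obj=_StubLogger(),
+#       timeout=60.0,
+#       json_mode=False,
+#   )
+#   for parsed_chunk in completion_stream:  # ModelResponseStream / GenericStreamingChunk
+#       ...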
+
+class AnthropicChatCompletion(BaseLLM):
+ def __init__(self) -> None:
+ super().__init__()
+
+ async def acompletion_stream_function(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ timeout: Union[float, httpx.Timeout],
+ client: Optional[AsyncHTTPHandler],
+ encoding,
+ api_key,
+ logging_obj,
+ stream,
+ _is_function_call,
+ data: dict,
+ json_mode: bool,
+ optional_params=None,
+ litellm_params=None,
+ logger_fn=None,
+ headers={},
+ ):
+ data["stream"] = True
+
+ completion_stream, headers = await make_call(
+ client=client,
+ api_base=api_base,
+ headers=headers,
+ data=json.dumps(data),
+ model=model,
+ messages=messages,
+ logging_obj=logging_obj,
+ timeout=timeout,
+ json_mode=json_mode,
+ )
+ streamwrapper = CustomStreamWrapper(
+ completion_stream=completion_stream,
+ model=model,
+ custom_llm_provider="anthropic",
+ logging_obj=logging_obj,
+ _response_headers=process_anthropic_headers(headers),
+ )
+ return streamwrapper
+
+ async def acompletion_function(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ timeout: Union[float, httpx.Timeout],
+ encoding,
+ api_key,
+ logging_obj,
+ stream,
+ _is_function_call,
+ data: dict,
+ optional_params: dict,
+ json_mode: bool,
+ litellm_params: dict,
+ provider_config: BaseConfig,
+ logger_fn=None,
+ headers={},
+ client: Optional[AsyncHTTPHandler] = None,
+ ) -> Union[ModelResponse, CustomStreamWrapper]:
+ async_handler = client or get_async_httpx_client(
+ llm_provider=litellm.LlmProviders.ANTHROPIC
+ )
+
+ try:
+ response = await async_handler.post(
+ api_base, headers=headers, json=data, timeout=timeout
+ )
+ except Exception as e:
+ ## LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key=api_key,
+ original_response=str(e),
+ additional_args={"complete_input_dict": data},
+ )
+ status_code = getattr(e, "status_code", 500)
+ error_headers = getattr(e, "headers", None)
+ error_text = getattr(e, "text", str(e))
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ if error_response and hasattr(error_response, "text"):
+ error_text = getattr(error_response, "text", error_text)
+ raise AnthropicError(
+ message=error_text,
+ status_code=status_code,
+ headers=error_headers,
+ )
+
+ return provider_config.transform_response(
+ model=model,
+ raw_response=response,
+ model_response=model_response,
+ logging_obj=logging_obj,
+ api_key=api_key,
+ request_data=data,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ encoding=encoding,
+ json_mode=json_mode,
+ )
+
+ def completion(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_llm_provider: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ encoding,
+ api_key,
+ logging_obj,
+ optional_params: dict,
+ timeout: Union[float, httpx.Timeout],
+ litellm_params: dict,
+ acompletion=None,
+ logger_fn=None,
+ headers={},
+ client=None,
+ ):
+
+ optional_params = copy.deepcopy(optional_params)
+ stream = optional_params.pop("stream", None)
+ json_mode: bool = optional_params.pop("json_mode", False)
+ is_vertex_request: bool = optional_params.pop("is_vertex_request", False)
+ _is_function_call = False
+ messages = copy.deepcopy(messages)
+ headers = AnthropicConfig().validate_environment(
+ api_key=api_key,
+ headers=headers,
+ model=model,
+ messages=messages,
+ optional_params={**optional_params, "is_vertex_request": is_vertex_request},
+ )
+
+ config = ProviderConfigManager.get_provider_chat_config(
+ model=model,
+ provider=LlmProviders(custom_llm_provider),
+ )
+
+ data = config.transform_request(
+ model=model,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ headers=headers,
+ )
+
+ ## LOGGING
+ logging_obj.pre_call(
+ input=messages,
+ api_key=api_key,
+ additional_args={
+ "complete_input_dict": data,
+ "api_base": api_base,
+ "headers": headers,
+ },
+ )
+ print_verbose(f"_is_function_call: {_is_function_call}")
+ if acompletion is True:
+ if (
+ stream is True
+ ): # if function call - fake the streaming (need complete blocks for output parsing in openai format)
+ print_verbose("makes async anthropic streaming POST request")
+ data["stream"] = stream
+ return self.acompletion_stream_function(
+ model=model,
+ messages=messages,
+ data=data,
+ api_base=api_base,
+ custom_prompt_dict=custom_prompt_dict,
+ model_response=model_response,
+ print_verbose=print_verbose,
+ encoding=encoding,
+ api_key=api_key,
+ logging_obj=logging_obj,
+ optional_params=optional_params,
+ stream=stream,
+ _is_function_call=_is_function_call,
+ json_mode=json_mode,
+ litellm_params=litellm_params,
+ logger_fn=logger_fn,
+ headers=headers,
+ timeout=timeout,
+ client=(
+ client
+ if client is not None and isinstance(client, AsyncHTTPHandler)
+ else None
+ ),
+ )
+ else:
+ return self.acompletion_function(
+ model=model,
+ messages=messages,
+ data=data,
+ api_base=api_base,
+ custom_prompt_dict=custom_prompt_dict,
+ model_response=model_response,
+ print_verbose=print_verbose,
+ encoding=encoding,
+ api_key=api_key,
+ provider_config=config,
+ logging_obj=logging_obj,
+ optional_params=optional_params,
+ stream=stream,
+ _is_function_call=_is_function_call,
+ litellm_params=litellm_params,
+ logger_fn=logger_fn,
+ headers=headers,
+ client=client,
+ json_mode=json_mode,
+ timeout=timeout,
+ )
+ else:
+ ## COMPLETION CALL
+ if (
+ stream is True
+ ): # if function call - fake the streaming (need complete blocks for output parsing in openai format)
+ data["stream"] = stream
+ completion_stream, headers = make_sync_call(
+ client=client,
+ api_base=api_base,
+ headers=headers, # type: ignore
+ data=json.dumps(data),
+ model=model,
+ messages=messages,
+ logging_obj=logging_obj,
+ timeout=timeout,
+ json_mode=json_mode,
+ )
+ return CustomStreamWrapper(
+ completion_stream=completion_stream,
+ model=model,
+ custom_llm_provider="anthropic",
+ logging_obj=logging_obj,
+ _response_headers=process_anthropic_headers(headers),
+ )
+
+ else:
+                if client is None or not isinstance(client, HTTPHandler):
+                    client = HTTPHandler(timeout=timeout)  # type: ignore
+
+ try:
+ response = client.post(
+ api_base,
+ headers=headers,
+ data=json.dumps(data),
+ timeout=timeout,
+ )
+ except Exception as e:
+ status_code = getattr(e, "status_code", 500)
+ error_headers = getattr(e, "headers", None)
+ error_text = getattr(e, "text", str(e))
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ if error_response and hasattr(error_response, "text"):
+ error_text = getattr(error_response, "text", error_text)
+ raise AnthropicError(
+ message=error_text,
+ status_code=status_code,
+ headers=error_headers,
+ )
+
+ return config.transform_response(
+ model=model,
+ raw_response=response,
+ model_response=model_response,
+ logging_obj=logging_obj,
+ api_key=api_key,
+ request_data=data,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ encoding=encoding,
+ json_mode=json_mode,
+ )
+
+ def embedding(self):
+ # logic for parsing in - calling - parsing out model embedding calls
+ pass
+
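+# Illustrative note (not part of the upstream file): this handler is normally not
+# called directly; `litellm.completion()` / `litellm.acompletion()` dispatch to it
+# when the model resolves to the Anthropic provider, e.g.:
+#
+#   import litellm
+#   resp = litellm.completion(
+#       model="anthropic/claude-3-5-sonnet-20240620",
+#       messages=[{"role": "user", "content": "Hello"}],
+#       stream=True,
+#   )
+#   for chunk in resp:
+#       print(chunk.choices[0].delta.content or "", end="")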
+
+class ModelResponseIterator:
+ def __init__(
+ self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
+ ):
+ self.streaming_response = streaming_response
+ self.response_iterator = self.streaming_response
+ self.content_blocks: List[ContentBlockDelta] = []
+ self.tool_index = -1
+ self.json_mode = json_mode
+
+ def check_empty_tool_call_args(self) -> bool:
+ """
+        Check whether the accumulated tool call arguments so far are an empty string
+ """
+ args = ""
+ # if text content block -> skip
+ if len(self.content_blocks) == 0:
+ return False
+
+ if (
+ self.content_blocks[0]["delta"]["type"] == "text_delta"
+ or self.content_blocks[0]["delta"]["type"] == "thinking_delta"
+ ):
+ return False
+
+ for block in self.content_blocks:
+ if block["delta"]["type"] == "input_json_delta":
+ args += block["delta"].get("partial_json", "") # type: ignore
+
+ if len(args) == 0:
+ return True
+ return False
+
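+    # Example (illustrative, not part of the upstream file): a streamed tool_use
+    # block whose arguments never receive any content produces only deltas like
+    #   {"type": "input_json_delta", "partial_json": ""}
+    # so the accumulated args string stays empty and the method above returns True;
+    # chunk_parser then emits a synthetic "{}" arguments payload on
+    # content_block_stop.
+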
+ def _handle_usage(
+ self, anthropic_usage_chunk: Union[dict, UsageDelta]
+ ) -> AnthropicChatCompletionUsageBlock:
+
+ usage_block = AnthropicChatCompletionUsageBlock(
+ prompt_tokens=anthropic_usage_chunk.get("input_tokens", 0),
+ completion_tokens=anthropic_usage_chunk.get("output_tokens", 0),
+ total_tokens=anthropic_usage_chunk.get("input_tokens", 0)
+ + anthropic_usage_chunk.get("output_tokens", 0),
+ )
+
+ cache_creation_input_tokens = anthropic_usage_chunk.get(
+ "cache_creation_input_tokens"
+ )
+ if cache_creation_input_tokens is not None and isinstance(
+ cache_creation_input_tokens, int
+ ):
+ usage_block["cache_creation_input_tokens"] = cache_creation_input_tokens
+
+ cache_read_input_tokens = anthropic_usage_chunk.get("cache_read_input_tokens")
+ if cache_read_input_tokens is not None and isinstance(
+ cache_read_input_tokens, int
+ ):
+ usage_block["cache_read_input_tokens"] = cache_read_input_tokens
+
+ return usage_block
+
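+    # Example (illustrative): an Anthropic usage payload such as
+    #   {"input_tokens": 270, "output_tokens": 10, "cache_read_input_tokens": 128}
+    # is mapped by _handle_usage above to
+    #   prompt_tokens=270, completion_tokens=10, total_tokens=280, cache_read_input_tokens=128
+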
+ def _content_block_delta_helper(self, chunk: dict) -> Tuple[
+ str,
+ Optional[ChatCompletionToolCallChunk],
+ List[ChatCompletionThinkingBlock],
+ Dict[str, Any],
+ ]:
+ """
+ Helper function to handle the content block delta
+ """
+
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ provider_specific_fields = {}
+ content_block = ContentBlockDelta(**chunk) # type: ignore
+ thinking_blocks: List[ChatCompletionThinkingBlock] = []
+
+ self.content_blocks.append(content_block)
+ if "text" in content_block["delta"]:
+ text = content_block["delta"]["text"]
+ elif "partial_json" in content_block["delta"]:
+ tool_use = {
+ "id": None,
+ "type": "function",
+ "function": {
+ "name": None,
+ "arguments": content_block["delta"]["partial_json"],
+ },
+ "index": self.tool_index,
+ }
+ elif "citation" in content_block["delta"]:
+ provider_specific_fields["citation"] = content_block["delta"]["citation"]
+ elif (
+ "thinking" in content_block["delta"]
+ or "signature" in content_block["delta"]
+ ):
+ thinking_blocks = [
+ ChatCompletionThinkingBlock(
+ type="thinking",
+ thinking=content_block["delta"].get("thinking") or "",
+ signature=content_block["delta"].get("signature"),
+ )
+ ]
+ provider_specific_fields["thinking_blocks"] = thinking_blocks
+ return text, tool_use, thinking_blocks, provider_specific_fields
+
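+    # Example (illustrative): a thinking delta chunk such as
+    #   {"type": "content_block_delta", "index": 0,
+    #    "delta": {"type": "thinking_delta", "thinking": "Let me check..."}}
+    # yields text="" plus a single ChatCompletionThinkingBlock carrying that text,
+    # which is also surfaced under provider_specific_fields["thinking_blocks"].
+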
+ def _handle_reasoning_content(
+ self, thinking_blocks: List[ChatCompletionThinkingBlock]
+ ) -> Optional[str]:
+ """
+ Handle the reasoning content
+ """
+ reasoning_content = None
+ for block in thinking_blocks:
+ if reasoning_content is None:
+ reasoning_content = ""
+ if "thinking" in block:
+ reasoning_content += block["thinking"]
+ return reasoning_content
+
+ def chunk_parser(self, chunk: dict) -> ModelResponseStream:
+ try:
+ type_chunk = chunk.get("type", "") or ""
+
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ finish_reason = ""
+ usage: Optional[ChatCompletionUsageBlock] = None
+ provider_specific_fields: Dict[str, Any] = {}
+ reasoning_content: Optional[str] = None
+ thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None
+
+ index = int(chunk.get("index", 0))
+ if type_chunk == "content_block_delta":
+ """
+ Anthropic content chunk
+ chunk = {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Hello'}}
+ """
+ text, tool_use, thinking_blocks, provider_specific_fields = (
+ self._content_block_delta_helper(chunk=chunk)
+ )
+ if thinking_blocks:
+ reasoning_content = self._handle_reasoning_content(
+ thinking_blocks=thinking_blocks
+ )
+ elif type_chunk == "content_block_start":
+ """
+ event: content_block_start
+ data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_01T1x1fJ34qAmk2tNTrN7Up6","name":"get_weather","input":{}}}
+ """
+ content_block_start = ContentBlockStart(**chunk) # type: ignore
+ self.content_blocks = [] # reset content blocks when new block starts
+ if content_block_start["content_block"]["type"] == "text":
+ text = content_block_start["content_block"]["text"]
+ elif content_block_start["content_block"]["type"] == "tool_use":
+ self.tool_index += 1
+ tool_use = {
+ "id": content_block_start["content_block"]["id"],
+ "type": "function",
+ "function": {
+ "name": content_block_start["content_block"]["name"],
+ "arguments": "",
+ },
+ "index": self.tool_index,
+ }
+ elif type_chunk == "content_block_stop":
+
+ ContentBlockStop(**chunk) # type: ignore
+ # check if tool call content block
+ is_empty = self.check_empty_tool_call_args()
+
+ if is_empty:
+ tool_use = {
+ "id": None,
+ "type": "function",
+ "function": {
+ "name": None,
+ "arguments": "{}",
+ },
+ "index": self.tool_index,
+ }
+ elif type_chunk == "message_delta":
+ """
+ Anthropic
+ chunk = {'type': 'message_delta', 'delta': {'stop_reason': 'max_tokens', 'stop_sequence': None}, 'usage': {'output_tokens': 10}}
+ """
+ # TODO - get usage from this chunk, set in response
+ message_delta = MessageBlockDelta(**chunk) # type: ignore
+ finish_reason = map_finish_reason(
+ finish_reason=message_delta["delta"].get("stop_reason", "stop")
+ or "stop"
+ )
+ usage = self._handle_usage(anthropic_usage_chunk=message_delta["usage"])
+ elif type_chunk == "message_start":
+ """
+ Anthropic
+ chunk = {
+ "type": "message_start",
+ "message": {
+ "id": "msg_vrtx_011PqREFEMzd3REdCoUFAmdG",
+ "type": "message",
+ "role": "assistant",
+ "model": "claude-3-sonnet-20240229",
+ "content": [],
+ "stop_reason": null,
+ "stop_sequence": null,
+ "usage": {
+ "input_tokens": 270,
+ "output_tokens": 1
+ }
+ }
+ }
+ """
+ message_start_block = MessageStartBlock(**chunk) # type: ignore
+ if "usage" in message_start_block["message"]:
+ usage = self._handle_usage(
+ anthropic_usage_chunk=message_start_block["message"]["usage"]
+ )
+ elif type_chunk == "error":
+ """
+ {"type":"error","error":{"details":null,"type":"api_error","message":"Internal server error"} }
+ """
+ _error_dict = chunk.get("error", {}) or {}
+ message = _error_dict.get("message", None) or str(chunk)
+ raise AnthropicError(
+ message=message,
+ status_code=500, # it looks like Anthropic API does not return a status code in the chunk error - default to 500
+ )
+
+ text, tool_use = self._handle_json_mode_chunk(text=text, tool_use=tool_use)
+
+ returned_chunk = ModelResponseStream(
+ choices=[
+ StreamingChoices(
+ index=index,
+ delta=Delta(
+ content=text,
+ tool_calls=[tool_use] if tool_use is not None else None,
+ provider_specific_fields=(
+ provider_specific_fields
+ if provider_specific_fields
+ else None
+ ),
+ thinking_blocks=(
+ thinking_blocks if thinking_blocks else None
+ ),
+ reasoning_content=reasoning_content,
+ ),
+ finish_reason=finish_reason,
+ )
+ ],
+ usage=usage,
+ )
+
+ return returned_chunk
+
+ except json.JSONDecodeError:
+ raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
+
+ def _handle_json_mode_chunk(
+ self, text: str, tool_use: Optional[ChatCompletionToolCallChunk]
+ ) -> Tuple[str, Optional[ChatCompletionToolCallChunk]]:
+ """
+ If JSON mode is enabled, convert the tool call to a message.
+
+        Anthropic returns the JSON schema as part of the tool call;
+        OpenAI returns it as part of the content. This handles placing it in the content.
+
+ Args:
+ text: str
+ tool_use: Optional[ChatCompletionToolCallChunk]
+ Returns:
+ Tuple[str, Optional[ChatCompletionToolCallChunk]]
+
+ text: The text to use in the content
+ tool_use: The ChatCompletionToolCallChunk to use in the chunk response
+ """
+ if self.json_mode is True and tool_use is not None:
+ message = AnthropicConfig._convert_tool_response_to_message(
+ tool_calls=[tool_use]
+ )
+ if message is not None:
+ text = message.content or ""
+ tool_use = None
+
+ return text, tool_use
+
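+    # Example (illustrative; the exact conversion is delegated to
+    # AnthropicConfig._convert_tool_response_to_message): with json_mode=True, a
+    # streamed tool call whose arguments are '{"answer": 42}' is folded back into
+    # plain content, so the caller sees text='{"answer": 42}' and tool_use=None,
+    # matching OpenAI-style JSON-mode responses.
+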
+ # Sync iterator
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ try:
+ chunk = self.response_iterator.__next__()
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+ # Async iterator
+ def __aiter__(self):
+ self.async_response_iterator = self.streaming_response.__aiter__()
+ return self
+
+ async def __anext__(self):
+ try:
+ chunk = await self.async_response_iterator.__anext__()
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+ def convert_str_chunk_to_generic_chunk(self, chunk: str) -> ModelResponseStream:
+ """
+        Convert a string chunk to a ModelResponseStream
+
+        Note: This is used for Anthropic pass-through streaming logging
+
+        We can move __anext__ and __next__ to use this function since it's common logic.
+        They were not migrated here to minimize the changes made in one PR.
+ """
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return ModelResponseStream()
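+
+
+# --- Illustrative usage sketch (not part of the upstream litellm source) ---
+# Feeding a few hand-written SSE lines (shapes taken from the chunk_parser
+# docstrings above) through ModelResponseIterator shows how a raw Anthropic
+# stream is translated into OpenAI-style streaming chunks. Requires litellm to
+# be importable; no network access is needed.
+if __name__ == "__main__":
+    _fake_sse_lines = [
+        'data: {"type": "message_start", "message": {"id": "msg_123", "type": "message", "role": "assistant", "model": "claude-3-sonnet-20240229", "content": [], "stop_reason": null, "stop_sequence": null, "usage": {"input_tokens": 270, "output_tokens": 1}}}',
+        'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}',
+        'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}',
+        'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": " world"}}',
+        'data: {"type": "content_block_stop", "index": 0}',
+        'data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}, "usage": {"output_tokens": 10}}',
+    ]
+    _iterator = ModelResponseIterator(
+        streaming_response=iter(_fake_sse_lines), sync_stream=True, json_mode=False
+    )
+    for _parsed in _iterator:
+        print(_parsed)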