Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/anthropic')
9 files changed, 2279 insertions, 0 deletions
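The files below add Anthropic's `/v1/messages` chat handler, the legacy `/v1/complete` text handler, and shared utilities. As a minimal usage sketch (model name and key are placeholders), these handlers are reached indirectly through litellm's public completion API rather than called directly:

import os
import litellm

os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."  # placeholder key

# Non-streaming: AnthropicChatCompletion.completion() transforms the request,
# POSTs to /v1/messages, and config.transform_response() maps the reply back
# to the OpenAI response format.
response = litellm.completion(
    model="anthropic/claude-3-sonnet-20240229",
    messages=[{"role": "user", "content": "Hello"}],
)
print(response.choices[0].message.content)

# Streaming: make_call()/make_sync_call() wrap the SSE stream in
# ModelResponseIterator and CustomStreamWrapper.
for chunk in litellm.completion(
    model="anthropic/claude-3-sonnet-20240229",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
):
    print(chunk.choices[0].delta.content or "", end="")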
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py new file mode 100644 index 00000000..ae84c3b1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py @@ -0,0 +1 @@ +from .handler import AnthropicChatCompletion, ModelResponseIterator diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py new file mode 100644 index 00000000..f2c5f390 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py @@ -0,0 +1,839 @@ +""" +Calling + translation logic for anthropic's `/v1/messages` endpoint +""" + +import copy +import json +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +import httpx # type: ignore + +import litellm +import litellm.litellm_core_utils +import litellm.types +import litellm.types.utils +from litellm import LlmProviders +from litellm.litellm_core_utils.core_helpers import map_finish_reason +from litellm.llms.base_llm.chat.transformation import BaseConfig +from litellm.llms.custom_httpx.http_handler import ( + AsyncHTTPHandler, + HTTPHandler, + get_async_httpx_client, +) +from litellm.types.llms.anthropic import ( + AnthropicChatCompletionUsageBlock, + ContentBlockDelta, + ContentBlockStart, + ContentBlockStop, + MessageBlockDelta, + MessageStartBlock, + UsageDelta, +) +from litellm.types.llms.openai import ( + ChatCompletionThinkingBlock, + ChatCompletionToolCallChunk, + ChatCompletionUsageBlock, +) +from litellm.types.utils import ( + Delta, + GenericStreamingChunk, + ModelResponseStream, + StreamingChoices, +) +from litellm.utils import CustomStreamWrapper, ModelResponse, ProviderConfigManager + +from ...base import BaseLLM +from ..common_utils import AnthropicError, process_anthropic_headers +from .transformation import AnthropicConfig + + +async def make_call( + client: Optional[AsyncHTTPHandler], + api_base: str, + headers: dict, + data: str, + model: str, + messages: list, + logging_obj, + timeout: Optional[Union[float, httpx.Timeout]], + json_mode: bool, +) -> Tuple[Any, httpx.Headers]: + if client is None: + client = litellm.module_level_aclient + + try: + response = await client.post( + api_base, headers=headers, data=data, stream=True, timeout=timeout + ) + except httpx.HTTPStatusError as e: + error_headers = getattr(e, "headers", None) + error_response = getattr(e, "response", None) + if error_headers is None and error_response: + error_headers = getattr(error_response, "headers", None) + raise AnthropicError( + status_code=e.response.status_code, + message=await e.response.aread(), + headers=error_headers, + ) + except Exception as e: + for exception in litellm.LITELLM_EXCEPTION_TYPES: + if isinstance(e, exception): + raise e + raise AnthropicError(status_code=500, message=str(e)) + + completion_stream = ModelResponseIterator( + streaming_response=response.aiter_lines(), + sync_stream=False, + json_mode=json_mode, + ) + + # LOGGING + logging_obj.post_call( + input=messages, + api_key="", + original_response=completion_stream, # Pass the completion stream for logging + additional_args={"complete_input_dict": data}, + ) + + return completion_stream, response.headers + + +def make_sync_call( + client: Optional[HTTPHandler], + api_base: str, + headers: dict, + data: str, + model: str, + messages: list, + logging_obj, + timeout: Optional[Union[float, 
httpx.Timeout]], + json_mode: bool, +) -> Tuple[Any, httpx.Headers]: + if client is None: + client = litellm.module_level_client # re-use a module level client + + try: + response = client.post( + api_base, headers=headers, data=data, stream=True, timeout=timeout + ) + except httpx.HTTPStatusError as e: + error_headers = getattr(e, "headers", None) + error_response = getattr(e, "response", None) + if error_headers is None and error_response: + error_headers = getattr(error_response, "headers", None) + raise AnthropicError( + status_code=e.response.status_code, + message=e.response.read(), + headers=error_headers, + ) + except Exception as e: + for exception in litellm.LITELLM_EXCEPTION_TYPES: + if isinstance(e, exception): + raise e + raise AnthropicError(status_code=500, message=str(e)) + + if response.status_code != 200: + response_headers = getattr(response, "headers", None) + raise AnthropicError( + status_code=response.status_code, + message=response.read(), + headers=response_headers, + ) + + completion_stream = ModelResponseIterator( + streaming_response=response.iter_lines(), sync_stream=True, json_mode=json_mode + ) + + # LOGGING + logging_obj.post_call( + input=messages, + api_key="", + original_response="first stream response received", + additional_args={"complete_input_dict": data}, + ) + + return completion_stream, response.headers + + +class AnthropicChatCompletion(BaseLLM): + def __init__(self) -> None: + super().__init__() + + async def acompletion_stream_function( + self, + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + timeout: Union[float, httpx.Timeout], + client: Optional[AsyncHTTPHandler], + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data: dict, + json_mode: bool, + optional_params=None, + litellm_params=None, + logger_fn=None, + headers={}, + ): + data["stream"] = True + + completion_stream, headers = await make_call( + client=client, + api_base=api_base, + headers=headers, + data=json.dumps(data), + model=model, + messages=messages, + logging_obj=logging_obj, + timeout=timeout, + json_mode=json_mode, + ) + streamwrapper = CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + _response_headers=process_anthropic_headers(headers), + ) + return streamwrapper + + async def acompletion_function( + self, + model: str, + messages: list, + api_base: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + timeout: Union[float, httpx.Timeout], + encoding, + api_key, + logging_obj, + stream, + _is_function_call, + data: dict, + optional_params: dict, + json_mode: bool, + litellm_params: dict, + provider_config: BaseConfig, + logger_fn=None, + headers={}, + client: Optional[AsyncHTTPHandler] = None, + ) -> Union[ModelResponse, CustomStreamWrapper]: + async_handler = client or get_async_httpx_client( + llm_provider=litellm.LlmProviders.ANTHROPIC + ) + + try: + response = await async_handler.post( + api_base, headers=headers, json=data, timeout=timeout + ) + except Exception as e: + ## LOGGING + logging_obj.post_call( + input=messages, + api_key=api_key, + original_response=str(e), + additional_args={"complete_input_dict": data}, + ) + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + error_text = getattr(e, "text", str(e)) + error_response = getattr(e, "response", None) + if error_headers is None and 
error_response: + error_headers = getattr(error_response, "headers", None) + if error_response and hasattr(error_response, "text"): + error_text = getattr(error_response, "text", error_text) + raise AnthropicError( + message=error_text, + status_code=status_code, + headers=error_headers, + ) + + return provider_config.transform_response( + model=model, + raw_response=response, + model_response=model_response, + logging_obj=logging_obj, + api_key=api_key, + request_data=data, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + encoding=encoding, + json_mode=json_mode, + ) + + def completion( + self, + model: str, + messages: list, + api_base: str, + custom_llm_provider: str, + custom_prompt_dict: dict, + model_response: ModelResponse, + print_verbose: Callable, + encoding, + api_key, + logging_obj, + optional_params: dict, + timeout: Union[float, httpx.Timeout], + litellm_params: dict, + acompletion=None, + logger_fn=None, + headers={}, + client=None, + ): + + optional_params = copy.deepcopy(optional_params) + stream = optional_params.pop("stream", None) + json_mode: bool = optional_params.pop("json_mode", False) + is_vertex_request: bool = optional_params.pop("is_vertex_request", False) + _is_function_call = False + messages = copy.deepcopy(messages) + headers = AnthropicConfig().validate_environment( + api_key=api_key, + headers=headers, + model=model, + messages=messages, + optional_params={**optional_params, "is_vertex_request": is_vertex_request}, + ) + + config = ProviderConfigManager.get_provider_chat_config( + model=model, + provider=LlmProviders(custom_llm_provider), + ) + + data = config.transform_request( + model=model, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + headers=headers, + ) + + ## LOGGING + logging_obj.pre_call( + input=messages, + api_key=api_key, + additional_args={ + "complete_input_dict": data, + "api_base": api_base, + "headers": headers, + }, + ) + print_verbose(f"_is_function_call: {_is_function_call}") + if acompletion is True: + if ( + stream is True + ): # if function call - fake the streaming (need complete blocks for output parsing in openai format) + print_verbose("makes async anthropic streaming POST request") + data["stream"] = stream + return self.acompletion_stream_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + json_mode=json_mode, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + timeout=timeout, + client=( + client + if client is not None and isinstance(client, AsyncHTTPHandler) + else None + ), + ) + else: + return self.acompletion_function( + model=model, + messages=messages, + data=data, + api_base=api_base, + custom_prompt_dict=custom_prompt_dict, + model_response=model_response, + print_verbose=print_verbose, + encoding=encoding, + api_key=api_key, + provider_config=config, + logging_obj=logging_obj, + optional_params=optional_params, + stream=stream, + _is_function_call=_is_function_call, + litellm_params=litellm_params, + logger_fn=logger_fn, + headers=headers, + client=client, + json_mode=json_mode, + timeout=timeout, + ) + else: + ## COMPLETION CALL + if ( + stream is True + ): # if function call - fake the streaming (need complete blocks 
for output parsing in openai format) + data["stream"] = stream + completion_stream, headers = make_sync_call( + client=client, + api_base=api_base, + headers=headers, # type: ignore + data=json.dumps(data), + model=model, + messages=messages, + logging_obj=logging_obj, + timeout=timeout, + json_mode=json_mode, + ) + return CustomStreamWrapper( + completion_stream=completion_stream, + model=model, + custom_llm_provider="anthropic", + logging_obj=logging_obj, + _response_headers=process_anthropic_headers(headers), + ) + + else: + if client is None or not isinstance(client, HTTPHandler): + client = HTTPHandler(timeout=timeout) # type: ignore + else: + client = client + + try: + response = client.post( + api_base, + headers=headers, + data=json.dumps(data), + timeout=timeout, + ) + except Exception as e: + status_code = getattr(e, "status_code", 500) + error_headers = getattr(e, "headers", None) + error_text = getattr(e, "text", str(e)) + error_response = getattr(e, "response", None) + if error_headers is None and error_response: + error_headers = getattr(error_response, "headers", None) + if error_response and hasattr(error_response, "text"): + error_text = getattr(error_response, "text", error_text) + raise AnthropicError( + message=error_text, + status_code=status_code, + headers=error_headers, + ) + + return config.transform_response( + model=model, + raw_response=response, + model_response=model_response, + logging_obj=logging_obj, + api_key=api_key, + request_data=data, + messages=messages, + optional_params=optional_params, + litellm_params=litellm_params, + encoding=encoding, + json_mode=json_mode, + ) + + def embedding(self): + # logic for parsing in - calling - parsing out model embedding calls + pass + + +class ModelResponseIterator: + def __init__( + self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False + ): + self.streaming_response = streaming_response + self.response_iterator = self.streaming_response + self.content_blocks: List[ContentBlockDelta] = [] + self.tool_index = -1 + self.json_mode = json_mode + + def check_empty_tool_call_args(self) -> bool: + """ + Check if the tool call block so far has been an empty string + """ + args = "" + # if text content block -> skip + if len(self.content_blocks) == 0: + return False + + if ( + self.content_blocks[0]["delta"]["type"] == "text_delta" + or self.content_blocks[0]["delta"]["type"] == "thinking_delta" + ): + return False + + for block in self.content_blocks: + if block["delta"]["type"] == "input_json_delta": + args += block["delta"].get("partial_json", "") # type: ignore + + if len(args) == 0: + return True + return False + + def _handle_usage( + self, anthropic_usage_chunk: Union[dict, UsageDelta] + ) -> AnthropicChatCompletionUsageBlock: + + usage_block = AnthropicChatCompletionUsageBlock( + prompt_tokens=anthropic_usage_chunk.get("input_tokens", 0), + completion_tokens=anthropic_usage_chunk.get("output_tokens", 0), + total_tokens=anthropic_usage_chunk.get("input_tokens", 0) + + anthropic_usage_chunk.get("output_tokens", 0), + ) + + cache_creation_input_tokens = anthropic_usage_chunk.get( + "cache_creation_input_tokens" + ) + if cache_creation_input_tokens is not None and isinstance( + cache_creation_input_tokens, int + ): + usage_block["cache_creation_input_tokens"] = cache_creation_input_tokens + + cache_read_input_tokens = anthropic_usage_chunk.get("cache_read_input_tokens") + if cache_read_input_tokens is not None and isinstance( + cache_read_input_tokens, int + ): + 
usage_block["cache_read_input_tokens"] = cache_read_input_tokens + + return usage_block + + def _content_block_delta_helper(self, chunk: dict) -> Tuple[ + str, + Optional[ChatCompletionToolCallChunk], + List[ChatCompletionThinkingBlock], + Dict[str, Any], + ]: + """ + Helper function to handle the content block delta + """ + + text = "" + tool_use: Optional[ChatCompletionToolCallChunk] = None + provider_specific_fields = {} + content_block = ContentBlockDelta(**chunk) # type: ignore + thinking_blocks: List[ChatCompletionThinkingBlock] = [] + + self.content_blocks.append(content_block) + if "text" in content_block["delta"]: + text = content_block["delta"]["text"] + elif "partial_json" in content_block["delta"]: + tool_use = { + "id": None, + "type": "function", + "function": { + "name": None, + "arguments": content_block["delta"]["partial_json"], + }, + "index": self.tool_index, + } + elif "citation" in content_block["delta"]: + provider_specific_fields["citation"] = content_block["delta"]["citation"] + elif ( + "thinking" in content_block["delta"] + or "signature" in content_block["delta"] + ): + thinking_blocks = [ + ChatCompletionThinkingBlock( + type="thinking", + thinking=content_block["delta"].get("thinking") or "", + signature=content_block["delta"].get("signature"), + ) + ] + provider_specific_fields["thinking_blocks"] = thinking_blocks + return text, tool_use, thinking_blocks, provider_specific_fields + + def _handle_reasoning_content( + self, thinking_blocks: List[ChatCompletionThinkingBlock] + ) -> Optional[str]: + """ + Handle the reasoning content + """ + reasoning_content = None + for block in thinking_blocks: + if reasoning_content is None: + reasoning_content = "" + if "thinking" in block: + reasoning_content += block["thinking"] + return reasoning_content + + def chunk_parser(self, chunk: dict) -> ModelResponseStream: + try: + type_chunk = chunk.get("type", "") or "" + + text = "" + tool_use: Optional[ChatCompletionToolCallChunk] = None + finish_reason = "" + usage: Optional[ChatCompletionUsageBlock] = None + provider_specific_fields: Dict[str, Any] = {} + reasoning_content: Optional[str] = None + thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None + + index = int(chunk.get("index", 0)) + if type_chunk == "content_block_delta": + """ + Anthropic content chunk + chunk = {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Hello'}} + """ + text, tool_use, thinking_blocks, provider_specific_fields = ( + self._content_block_delta_helper(chunk=chunk) + ) + if thinking_blocks: + reasoning_content = self._handle_reasoning_content( + thinking_blocks=thinking_blocks + ) + elif type_chunk == "content_block_start": + """ + event: content_block_start + data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_01T1x1fJ34qAmk2tNTrN7Up6","name":"get_weather","input":{}}} + """ + content_block_start = ContentBlockStart(**chunk) # type: ignore + self.content_blocks = [] # reset content blocks when new block starts + if content_block_start["content_block"]["type"] == "text": + text = content_block_start["content_block"]["text"] + elif content_block_start["content_block"]["type"] == "tool_use": + self.tool_index += 1 + tool_use = { + "id": content_block_start["content_block"]["id"], + "type": "function", + "function": { + "name": content_block_start["content_block"]["name"], + "arguments": "", + }, + "index": self.tool_index, + } + elif type_chunk == "content_block_stop": + + ContentBlockStop(**chunk) # type: 
ignore + # check if tool call content block + is_empty = self.check_empty_tool_call_args() + + if is_empty: + tool_use = { + "id": None, + "type": "function", + "function": { + "name": None, + "arguments": "{}", + }, + "index": self.tool_index, + } + elif type_chunk == "message_delta": + """ + Anthropic + chunk = {'type': 'message_delta', 'delta': {'stop_reason': 'max_tokens', 'stop_sequence': None}, 'usage': {'output_tokens': 10}} + """ + # TODO - get usage from this chunk, set in response + message_delta = MessageBlockDelta(**chunk) # type: ignore + finish_reason = map_finish_reason( + finish_reason=message_delta["delta"].get("stop_reason", "stop") + or "stop" + ) + usage = self._handle_usage(anthropic_usage_chunk=message_delta["usage"]) + elif type_chunk == "message_start": + """ + Anthropic + chunk = { + "type": "message_start", + "message": { + "id": "msg_vrtx_011PqREFEMzd3REdCoUFAmdG", + "type": "message", + "role": "assistant", + "model": "claude-3-sonnet-20240229", + "content": [], + "stop_reason": null, + "stop_sequence": null, + "usage": { + "input_tokens": 270, + "output_tokens": 1 + } + } + } + """ + message_start_block = MessageStartBlock(**chunk) # type: ignore + if "usage" in message_start_block["message"]: + usage = self._handle_usage( + anthropic_usage_chunk=message_start_block["message"]["usage"] + ) + elif type_chunk == "error": + """ + {"type":"error","error":{"details":null,"type":"api_error","message":"Internal server error"} } + """ + _error_dict = chunk.get("error", {}) or {} + message = _error_dict.get("message", None) or str(chunk) + raise AnthropicError( + message=message, + status_code=500, # it looks like Anthropic API does not return a status code in the chunk error - default to 500 + ) + + text, tool_use = self._handle_json_mode_chunk(text=text, tool_use=tool_use) + + returned_chunk = ModelResponseStream( + choices=[ + StreamingChoices( + index=index, + delta=Delta( + content=text, + tool_calls=[tool_use] if tool_use is not None else None, + provider_specific_fields=( + provider_specific_fields + if provider_specific_fields + else None + ), + thinking_blocks=( + thinking_blocks if thinking_blocks else None + ), + reasoning_content=reasoning_content, + ), + finish_reason=finish_reason, + ) + ], + usage=usage, + ) + + return returned_chunk + + except json.JSONDecodeError: + raise ValueError(f"Failed to decode JSON from chunk: {chunk}") + + def _handle_json_mode_chunk( + self, text: str, tool_use: Optional[ChatCompletionToolCallChunk] + ) -> Tuple[str, Optional[ChatCompletionToolCallChunk]]: + """ + If JSON mode is enabled, convert the tool call to a message. 
+ + Anthropic returns the JSON schema as part of the tool call + OpenAI returns the JSON schema as part of the content, this handles placing it in the content + + Args: + text: str + tool_use: Optional[ChatCompletionToolCallChunk] + Returns: + Tuple[str, Optional[ChatCompletionToolCallChunk]] + + text: The text to use in the content + tool_use: The ChatCompletionToolCallChunk to use in the chunk response + """ + if self.json_mode is True and tool_use is not None: + message = AnthropicConfig._convert_tool_response_to_message( + tool_calls=[tool_use] + ) + if message is not None: + text = message.content or "" + tool_use = None + + return text, tool_use + + # Sync iterator + def __iter__(self): + return self + + def __next__(self): + try: + chunk = self.response_iterator.__next__() + except StopIteration: + raise StopIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string + index = str_line.find("data:") + if index != -1: + str_line = str_line[index:] + + if str_line.startswith("data:"): + data_json = json.loads(str_line[5:]) + return self.chunk_parser(chunk=data_json) + else: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + except StopIteration: + raise StopIteration + except ValueError as e: + raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}") + + # Async iterator + def __aiter__(self): + self.async_response_iterator = self.streaming_response.__aiter__() + return self + + async def __anext__(self): + try: + chunk = await self.async_response_iterator.__anext__() + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + raise RuntimeError(f"Error receiving chunk from stream: {e}") + + try: + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string + index = str_line.find("data:") + if index != -1: + str_line = str_line[index:] + + if str_line.startswith("data:"): + data_json = json.loads(str_line[5:]) + return self.chunk_parser(chunk=data_json) + else: + return GenericStreamingChunk( + text="", + is_finished=False, + finish_reason="", + usage=None, + index=0, + tool_use=None, + ) + except StopAsyncIteration: + raise StopAsyncIteration + except ValueError as e: + raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}") + + def convert_str_chunk_to_generic_chunk(self, chunk: str) -> ModelResponseStream: + """ + Convert a string chunk to a GenericStreamingChunk + + Note: This is used for Anthropic pass through streaming logging + + We can move __anext__, and __next__ to use this function since it's common logic. + Did not migrate them to minmize changes made in 1 PR. 
+ """ + str_line = chunk + if isinstance(chunk, bytes): # Handle binary data + str_line = chunk.decode("utf-8") # Convert bytes to string + index = str_line.find("data:") + if index != -1: + str_line = str_line[index:] + + if str_line.startswith("data:"): + data_json = json.loads(str_line[5:]) + return self.chunk_parser(chunk=data_json) + else: + return ModelResponseStream() diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py new file mode 100644 index 00000000..1a77c453 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py @@ -0,0 +1,831 @@ +import json +import time +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast + +import httpx + +import litellm +from litellm.constants import RESPONSE_FORMAT_TOOL_NAME +from litellm.litellm_core_utils.core_helpers import map_finish_reason +from litellm.litellm_core_utils.prompt_templates.factory import anthropic_messages_pt +from litellm.llms.base_llm.base_utils import type_to_response_format_param +from litellm.llms.base_llm.chat.transformation import BaseConfig, BaseLLMException +from litellm.types.llms.anthropic import ( + AllAnthropicToolsValues, + AnthropicComputerTool, + AnthropicHostedTools, + AnthropicInputSchema, + AnthropicMessagesTool, + AnthropicMessagesToolChoice, + AnthropicSystemMessageContent, +) +from litellm.types.llms.openai import ( + AllMessageValues, + ChatCompletionCachedContent, + ChatCompletionSystemMessage, + ChatCompletionThinkingBlock, + ChatCompletionToolCallChunk, + ChatCompletionToolCallFunctionChunk, + ChatCompletionToolParam, +) +from litellm.types.utils import Message as LitellmMessage +from litellm.types.utils import PromptTokensDetailsWrapper +from litellm.utils import ModelResponse, Usage, add_dummy_tool, has_tool_call_blocks + +from ..common_utils import AnthropicError, process_anthropic_headers + +if TYPE_CHECKING: + from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj + + LoggingClass = LiteLLMLoggingObj +else: + LoggingClass = Any + + +class AnthropicConfig(BaseConfig): + """ + Reference: https://docs.anthropic.com/claude/reference/messages_post + + to pass metadata to anthropic, it's {"user_id": "any-relevant-information"} + """ + + max_tokens: Optional[int] = ( + 4096 # anthropic requires a default value (Opus, Sonnet, and Haiku have the same default) + ) + stop_sequences: Optional[list] = None + temperature: Optional[int] = None + top_p: Optional[int] = None + top_k: Optional[int] = None + metadata: Optional[dict] = None + system: Optional[str] = None + + def __init__( + self, + max_tokens: Optional[ + int + ] = 4096, # You can pass in a value yourself or use the default value 4096 + stop_sequences: Optional[list] = None, + temperature: Optional[int] = None, + top_p: Optional[int] = None, + top_k: Optional[int] = None, + metadata: Optional[dict] = None, + system: Optional[str] = None, + ) -> None: + locals_ = locals().copy() + for key, value in locals_.items(): + if key != "self" and value is not None: + setattr(self.__class__, key, value) + + @classmethod + def get_config(cls): + return super().get_config() + + def get_supported_openai_params(self, model: str): + params = [ + "stream", + "stop", + "temperature", + "top_p", + "max_tokens", + "max_completion_tokens", + "tools", + "tool_choice", + "extra_headers", + "parallel_tool_calls", + "response_format", + "user", + ] + + if 
"claude-3-7-sonnet" in model: + params.append("thinking") + + return params + + def get_json_schema_from_pydantic_object( + self, response_format: Union[Any, Dict, None] + ) -> Optional[dict]: + + return type_to_response_format_param( + response_format, ref_template="/$defs/{model}" + ) # Relevant issue: https://github.com/BerriAI/litellm/issues/7755 + + def get_cache_control_headers(self) -> dict: + return { + "anthropic-version": "2023-06-01", + "anthropic-beta": "prompt-caching-2024-07-31", + } + + def get_anthropic_headers( + self, + api_key: str, + anthropic_version: Optional[str] = None, + computer_tool_used: bool = False, + prompt_caching_set: bool = False, + pdf_used: bool = False, + is_vertex_request: bool = False, + user_anthropic_beta_headers: Optional[List[str]] = None, + ) -> dict: + + betas = set() + if prompt_caching_set: + betas.add("prompt-caching-2024-07-31") + if computer_tool_used: + betas.add("computer-use-2024-10-22") + if pdf_used: + betas.add("pdfs-2024-09-25") + headers = { + "anthropic-version": anthropic_version or "2023-06-01", + "x-api-key": api_key, + "accept": "application/json", + "content-type": "application/json", + } + + if user_anthropic_beta_headers is not None: + betas.update(user_anthropic_beta_headers) + + # Don't send any beta headers to Vertex, Vertex has failed requests when they are sent + if is_vertex_request is True: + pass + elif len(betas) > 0: + headers["anthropic-beta"] = ",".join(betas) + + return headers + + def _map_tool_choice( + self, tool_choice: Optional[str], parallel_tool_use: Optional[bool] + ) -> Optional[AnthropicMessagesToolChoice]: + _tool_choice: Optional[AnthropicMessagesToolChoice] = None + if tool_choice == "auto": + _tool_choice = AnthropicMessagesToolChoice( + type="auto", + ) + elif tool_choice == "required": + _tool_choice = AnthropicMessagesToolChoice(type="any") + elif isinstance(tool_choice, dict): + _tool_name = tool_choice.get("function", {}).get("name") + _tool_choice = AnthropicMessagesToolChoice(type="tool") + if _tool_name is not None: + _tool_choice["name"] = _tool_name + + if parallel_tool_use is not None: + # Anthropic uses 'disable_parallel_tool_use' flag to determine if parallel tool use is allowed + # this is the inverse of the openai flag. 
+ if _tool_choice is not None: + _tool_choice["disable_parallel_tool_use"] = not parallel_tool_use + else: # use anthropic defaults and make sure to send the disable_parallel_tool_use flag + _tool_choice = AnthropicMessagesToolChoice( + type="auto", + disable_parallel_tool_use=not parallel_tool_use, + ) + return _tool_choice + + def _map_tool_helper( + self, tool: ChatCompletionToolParam + ) -> AllAnthropicToolsValues: + returned_tool: Optional[AllAnthropicToolsValues] = None + + if tool["type"] == "function" or tool["type"] == "custom": + _input_schema: dict = tool["function"].get( + "parameters", + { + "type": "object", + "properties": {}, + }, + ) + input_schema: AnthropicInputSchema = AnthropicInputSchema(**_input_schema) + _tool = AnthropicMessagesTool( + name=tool["function"]["name"], + input_schema=input_schema, + ) + + _description = tool["function"].get("description") + if _description is not None: + _tool["description"] = _description + + returned_tool = _tool + + elif tool["type"].startswith("computer_"): + ## check if all required 'display_' params are given + if "parameters" not in tool["function"]: + raise ValueError("Missing required parameter: parameters") + + _display_width_px: Optional[int] = tool["function"]["parameters"].get( + "display_width_px" + ) + _display_height_px: Optional[int] = tool["function"]["parameters"].get( + "display_height_px" + ) + if _display_width_px is None or _display_height_px is None: + raise ValueError( + "Missing required parameter: display_width_px or display_height_px" + ) + + _computer_tool = AnthropicComputerTool( + type=tool["type"], + name=tool["function"].get("name", "computer"), + display_width_px=_display_width_px, + display_height_px=_display_height_px, + ) + + _display_number = tool["function"]["parameters"].get("display_number") + if _display_number is not None: + _computer_tool["display_number"] = _display_number + + returned_tool = _computer_tool + elif tool["type"].startswith("bash_") or tool["type"].startswith( + "text_editor_" + ): + function_name = tool["function"].get("name") + if function_name is None: + raise ValueError("Missing required parameter: name") + + returned_tool = AnthropicHostedTools( + type=tool["type"], + name=function_name, + ) + if returned_tool is None: + raise ValueError(f"Unsupported tool type: {tool['type']}") + + ## check if cache_control is set in the tool + _cache_control = tool.get("cache_control", None) + _cache_control_function = tool.get("function", {}).get("cache_control", None) + if _cache_control is not None: + returned_tool["cache_control"] = _cache_control + elif _cache_control_function is not None and isinstance( + _cache_control_function, dict + ): + returned_tool["cache_control"] = ChatCompletionCachedContent( + **_cache_control_function # type: ignore + ) + + return returned_tool + + def _map_tools(self, tools: List) -> List[AllAnthropicToolsValues]: + anthropic_tools = [] + for tool in tools: + if "input_schema" in tool: # assume in anthropic format + anthropic_tools.append(tool) + else: # assume openai tool call + new_tool = self._map_tool_helper(tool) + + anthropic_tools.append(new_tool) + return anthropic_tools + + def _map_stop_sequences( + self, stop: Optional[Union[str, List[str]]] + ) -> Optional[List[str]]: + new_stop: Optional[List[str]] = None + if isinstance(stop, str): + if ( + stop.isspace() and litellm.drop_params is True + ): # anthropic doesn't allow whitespace characters as stop-sequences + return new_stop + new_stop = [stop] + elif isinstance(stop, list): + new_v = [] 
+ for v in stop: + if ( + v.isspace() and litellm.drop_params is True + ): # anthropic doesn't allow whitespace characters as stop-sequences + continue + new_v.append(v) + if len(new_v) > 0: + new_stop = new_v + return new_stop + + def map_openai_params( + self, + non_default_params: dict, + optional_params: dict, + model: str, + drop_params: bool, + ) -> dict: + for param, value in non_default_params.items(): + if param == "max_tokens": + optional_params["max_tokens"] = value + if param == "max_completion_tokens": + optional_params["max_tokens"] = value + if param == "tools": + # check if optional params already has tools + tool_value = self._map_tools(value) + optional_params = self._add_tools_to_optional_params( + optional_params=optional_params, tools=tool_value + ) + if param == "tool_choice" or param == "parallel_tool_calls": + _tool_choice: Optional[AnthropicMessagesToolChoice] = ( + self._map_tool_choice( + tool_choice=non_default_params.get("tool_choice"), + parallel_tool_use=non_default_params.get("parallel_tool_calls"), + ) + ) + + if _tool_choice is not None: + optional_params["tool_choice"] = _tool_choice + if param == "stream" and value is True: + optional_params["stream"] = value + if param == "stop" and (isinstance(value, str) or isinstance(value, list)): + _value = self._map_stop_sequences(value) + if _value is not None: + optional_params["stop_sequences"] = _value + if param == "temperature": + optional_params["temperature"] = value + if param == "top_p": + optional_params["top_p"] = value + if param == "response_format" and isinstance(value, dict): + + ignore_response_format_types = ["text"] + if value["type"] in ignore_response_format_types: # value is a no-op + continue + + json_schema: Optional[dict] = None + if "response_schema" in value: + json_schema = value["response_schema"] + elif "json_schema" in value: + json_schema = value["json_schema"]["schema"] + """ + When using tools in this way: - https://docs.anthropic.com/en/docs/build-with-claude/tool-use#json-mode + - You usually want to provide a single tool + - You should set tool_choice (see Forcing tool use) to instruct the model to explicitly use that tool + - Remember that the model will pass the input to the tool, so the name of the tool and description should be from the model’s perspective. + """ + + _tool_choice = {"name": RESPONSE_FORMAT_TOOL_NAME, "type": "tool"} + _tool = self._create_json_tool_call_for_response_format( + json_schema=json_schema, + ) + optional_params = self._add_tools_to_optional_params( + optional_params=optional_params, tools=[_tool] + ) + optional_params["tool_choice"] = _tool_choice + optional_params["json_mode"] = True + if param == "user": + optional_params["metadata"] = {"user_id": value} + if param == "thinking": + optional_params["thinking"] = value + return optional_params + + def _create_json_tool_call_for_response_format( + self, + json_schema: Optional[dict] = None, + ) -> AnthropicMessagesTool: + """ + Handles creating a tool call for getting responses in JSON format. 
+ + Args: + json_schema (Optional[dict]): The JSON schema the response should be in + + Returns: + AnthropicMessagesTool: The tool call to send to Anthropic API to get responses in JSON format + """ + _input_schema: AnthropicInputSchema = AnthropicInputSchema( + type="object", + ) + + if json_schema is None: + # Anthropic raises a 400 BadRequest error if properties is passed as None + # see usage with additionalProperties (Example 5) https://github.com/anthropics/anthropic-cookbook/blob/main/tool_use/extracting_structured_json.ipynb + _input_schema["additionalProperties"] = True + _input_schema["properties"] = {} + else: + _input_schema.update(cast(AnthropicInputSchema, json_schema)) + + _tool = AnthropicMessagesTool( + name=RESPONSE_FORMAT_TOOL_NAME, input_schema=_input_schema + ) + return _tool + + def is_cache_control_set(self, messages: List[AllMessageValues]) -> bool: + """ + Return if {"cache_control": ..} in message content block + + Used to check if anthropic prompt caching headers need to be set. + """ + for message in messages: + if message.get("cache_control", None) is not None: + return True + _message_content = message.get("content") + if _message_content is not None and isinstance(_message_content, list): + for content in _message_content: + if "cache_control" in content: + return True + + return False + + def is_computer_tool_used( + self, tools: Optional[List[AllAnthropicToolsValues]] + ) -> bool: + if tools is None: + return False + for tool in tools: + if "type" in tool and tool["type"].startswith("computer_"): + return True + return False + + def is_pdf_used(self, messages: List[AllMessageValues]) -> bool: + """ + Set to true if media passed into messages. + + """ + for message in messages: + if ( + "content" in message + and message["content"] is not None + and isinstance(message["content"], list) + ): + for content in message["content"]: + if "type" in content and content["type"] != "text": + return True + return False + + def translate_system_message( + self, messages: List[AllMessageValues] + ) -> List[AnthropicSystemMessageContent]: + """ + Translate system message to anthropic format. + + Removes system message from the original list and returns a new list of anthropic system message content. 
+ """ + system_prompt_indices = [] + anthropic_system_message_list: List[AnthropicSystemMessageContent] = [] + for idx, message in enumerate(messages): + if message["role"] == "system": + valid_content: bool = False + system_message_block = ChatCompletionSystemMessage(**message) + if isinstance(system_message_block["content"], str): + anthropic_system_message_content = AnthropicSystemMessageContent( + type="text", + text=system_message_block["content"], + ) + if "cache_control" in system_message_block: + anthropic_system_message_content["cache_control"] = ( + system_message_block["cache_control"] + ) + anthropic_system_message_list.append( + anthropic_system_message_content + ) + valid_content = True + elif isinstance(message["content"], list): + for _content in message["content"]: + anthropic_system_message_content = ( + AnthropicSystemMessageContent( + type=_content.get("type"), + text=_content.get("text"), + ) + ) + if "cache_control" in _content: + anthropic_system_message_content["cache_control"] = ( + _content["cache_control"] + ) + + anthropic_system_message_list.append( + anthropic_system_message_content + ) + valid_content = True + + if valid_content: + system_prompt_indices.append(idx) + if len(system_prompt_indices) > 0: + for idx in reversed(system_prompt_indices): + messages.pop(idx) + + return anthropic_system_message_list + + def transform_request( + self, + model: str, + messages: List[AllMessageValues], + optional_params: dict, + litellm_params: dict, + headers: dict, + ) -> dict: + """ + Translate messages to anthropic format. + """ + ## VALIDATE REQUEST + """ + Anthropic doesn't support tool calling without `tools=` param specified. + """ + if ( + "tools" not in optional_params + and messages is not None + and has_tool_call_blocks(messages) + ): + if litellm.modify_params: + optional_params["tools"] = self._map_tools( + add_dummy_tool(custom_llm_provider="anthropic") + ) + else: + raise litellm.UnsupportedParamsError( + message="Anthropic doesn't support tool calling without `tools=` param specified. 
Pass `tools=` param OR set `litellm.modify_params = True` // `litellm_settings::modify_params: True` to add dummy tool to the request.", + model="", + llm_provider="anthropic", + ) + + # Separate system prompt from rest of message + anthropic_system_message_list = self.translate_system_message(messages=messages) + # Handling anthropic API Prompt Caching + if len(anthropic_system_message_list) > 0: + optional_params["system"] = anthropic_system_message_list + # Format rest of message according to anthropic guidelines + try: + anthropic_messages = anthropic_messages_pt( + model=model, + messages=messages, + llm_provider="anthropic", + ) + except Exception as e: + raise AnthropicError( + status_code=400, + message="{}\nReceived Messages={}".format(str(e), messages), + ) # don't use verbose_logger.exception, if exception is raised + + ## Load Config + config = litellm.AnthropicConfig.get_config() + for k, v in config.items(): + if ( + k not in optional_params + ): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in + optional_params[k] = v + + ## Handle user_id in metadata + _litellm_metadata = litellm_params.get("metadata", None) + if ( + _litellm_metadata + and isinstance(_litellm_metadata, dict) + and "user_id" in _litellm_metadata + ): + optional_params["metadata"] = {"user_id": _litellm_metadata["user_id"]} + + data = { + "model": model, + "messages": anthropic_messages, + **optional_params, + } + + return data + + def _transform_response_for_json_mode( + self, + json_mode: Optional[bool], + tool_calls: List[ChatCompletionToolCallChunk], + ) -> Optional[LitellmMessage]: + _message: Optional[LitellmMessage] = None + if json_mode is True and len(tool_calls) == 1: + # check if tool name is the default tool name + json_mode_content_str: Optional[str] = None + if ( + "name" in tool_calls[0]["function"] + and tool_calls[0]["function"]["name"] == RESPONSE_FORMAT_TOOL_NAME + ): + json_mode_content_str = tool_calls[0]["function"].get("arguments") + if json_mode_content_str is not None: + _message = AnthropicConfig._convert_tool_response_to_message( + tool_calls=tool_calls, + ) + return _message + + def extract_response_content(self, completion_response: dict) -> Tuple[ + str, + Optional[List[Any]], + Optional[List[ChatCompletionThinkingBlock]], + Optional[str], + List[ChatCompletionToolCallChunk], + ]: + text_content = "" + citations: Optional[List[Any]] = None + thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None + reasoning_content: Optional[str] = None + tool_calls: List[ChatCompletionToolCallChunk] = [] + for idx, content in enumerate(completion_response["content"]): + if content["type"] == "text": + text_content += content["text"] + ## TOOL CALLING + elif content["type"] == "tool_use": + tool_calls.append( + ChatCompletionToolCallChunk( + id=content["id"], + type="function", + function=ChatCompletionToolCallFunctionChunk( + name=content["name"], + arguments=json.dumps(content["input"]), + ), + index=idx, + ) + ) + ## CITATIONS + if content.get("citations", None) is not None: + if citations is None: + citations = [] + citations.append(content["citations"]) + if content.get("thinking", None) is not None: + if thinking_blocks is None: + thinking_blocks = [] + thinking_blocks.append(cast(ChatCompletionThinkingBlock, content)) + if thinking_blocks is not None: + reasoning_content = "" + for block in thinking_blocks: + if "thinking" in block: + reasoning_content += block["thinking"] + return text_content, citations, thinking_blocks, 
reasoning_content, tool_calls + + def transform_response( + self, + model: str, + raw_response: httpx.Response, + model_response: ModelResponse, + logging_obj: LoggingClass, + request_data: Dict, + messages: List[AllMessageValues], + optional_params: Dict, + litellm_params: dict, + encoding: Any, + api_key: Optional[str] = None, + json_mode: Optional[bool] = None, + ) -> ModelResponse: + _hidden_params: Dict = {} + _hidden_params["additional_headers"] = process_anthropic_headers( + dict(raw_response.headers) + ) + ## LOGGING + logging_obj.post_call( + input=messages, + api_key=api_key, + original_response=raw_response.text, + additional_args={"complete_input_dict": request_data}, + ) + + ## RESPONSE OBJECT + try: + completion_response = raw_response.json() + except Exception as e: + response_headers = getattr(raw_response, "headers", None) + raise AnthropicError( + message="Unable to get json response - {}, Original Response: {}".format( + str(e), raw_response.text + ), + status_code=raw_response.status_code, + headers=response_headers, + ) + if "error" in completion_response: + response_headers = getattr(raw_response, "headers", None) + raise AnthropicError( + message=str(completion_response["error"]), + status_code=raw_response.status_code, + headers=response_headers, + ) + else: + text_content = "" + citations: Optional[List[Any]] = None + thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None + reasoning_content: Optional[str] = None + tool_calls: List[ChatCompletionToolCallChunk] = [] + + text_content, citations, thinking_blocks, reasoning_content, tool_calls = ( + self.extract_response_content(completion_response=completion_response) + ) + + _message = litellm.Message( + tool_calls=tool_calls, + content=text_content or None, + provider_specific_fields={ + "citations": citations, + "thinking_blocks": thinking_blocks, + }, + thinking_blocks=thinking_blocks, + reasoning_content=reasoning_content, + ) + + ## HANDLE JSON MODE - anthropic returns single function call + json_mode_message = self._transform_response_for_json_mode( + json_mode=json_mode, + tool_calls=tool_calls, + ) + if json_mode_message is not None: + completion_response["stop_reason"] = "stop" + _message = json_mode_message + + model_response.choices[0].message = _message # type: ignore + model_response._hidden_params["original_response"] = completion_response[ + "content" + ] # allow user to access raw anthropic tool calling response + + model_response.choices[0].finish_reason = map_finish_reason( + completion_response["stop_reason"] + ) + + ## CALCULATING USAGE + prompt_tokens = completion_response["usage"]["input_tokens"] + completion_tokens = completion_response["usage"]["output_tokens"] + _usage = completion_response["usage"] + cache_creation_input_tokens: int = 0 + cache_read_input_tokens: int = 0 + + model_response.created = int(time.time()) + model_response.model = completion_response["model"] + if "cache_creation_input_tokens" in _usage: + cache_creation_input_tokens = _usage["cache_creation_input_tokens"] + prompt_tokens += cache_creation_input_tokens + if "cache_read_input_tokens" in _usage: + cache_read_input_tokens = _usage["cache_read_input_tokens"] + prompt_tokens += cache_read_input_tokens + + prompt_tokens_details = PromptTokensDetailsWrapper( + cached_tokens=cache_read_input_tokens + ) + total_tokens = prompt_tokens + completion_tokens + usage = Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + 
prompt_tokens_details=prompt_tokens_details, + cache_creation_input_tokens=cache_creation_input_tokens, + cache_read_input_tokens=cache_read_input_tokens, + ) + + setattr(model_response, "usage", usage) # type: ignore + + model_response._hidden_params = _hidden_params + return model_response + + @staticmethod + def _convert_tool_response_to_message( + tool_calls: List[ChatCompletionToolCallChunk], + ) -> Optional[LitellmMessage]: + """ + In JSON mode, Anthropic API returns JSON schema as a tool call, we need to convert it to a message to follow the OpenAI format + + """ + ## HANDLE JSON MODE - anthropic returns single function call + json_mode_content_str: Optional[str] = tool_calls[0]["function"].get( + "arguments" + ) + try: + if json_mode_content_str is not None: + args = json.loads(json_mode_content_str) + if ( + isinstance(args, dict) + and (values := args.get("values")) is not None + ): + _message = litellm.Message(content=json.dumps(values)) + return _message + else: + # a lot of the times the `values` key is not present in the tool response + # relevant issue: https://github.com/BerriAI/litellm/issues/6741 + _message = litellm.Message(content=json.dumps(args)) + return _message + except json.JSONDecodeError: + # json decode error does occur, return the original tool response str + return litellm.Message(content=json_mode_content_str) + return None + + def get_error_class( + self, error_message: str, status_code: int, headers: Union[Dict, httpx.Headers] + ) -> BaseLLMException: + return AnthropicError( + status_code=status_code, + message=error_message, + headers=cast(httpx.Headers, headers), + ) + + def _get_user_anthropic_beta_headers( + self, anthropic_beta_header: Optional[str] + ) -> Optional[List[str]]: + if anthropic_beta_header is None: + return None + return anthropic_beta_header.split(",") + + def validate_environment( + self, + headers: dict, + model: str, + messages: List[AllMessageValues], + optional_params: dict, + api_key: Optional[str] = None, + api_base: Optional[str] = None, + ) -> Dict: + if api_key is None: + raise litellm.AuthenticationError( + message="Missing Anthropic API Key - A call is being made to anthropic but no key is set either in the environment variables or via params. Please set `ANTHROPIC_API_KEY` in your environment vars", + llm_provider="anthropic", + model=model, + ) + + tools = optional_params.get("tools") + prompt_caching_set = self.is_cache_control_set(messages=messages) + computer_tool_used = self.is_computer_tool_used(tools=tools) + pdf_used = self.is_pdf_used(messages=messages) + user_anthropic_beta_headers = self._get_user_anthropic_beta_headers( + anthropic_beta_header=headers.get("anthropic-beta") + ) + anthropic_headers = self.get_anthropic_headers( + computer_tool_used=computer_tool_used, + prompt_caching_set=prompt_caching_set, + pdf_used=pdf_used, + api_key=api_key, + is_vertex_request=optional_params.get("is_vertex_request", False), + user_anthropic_beta_headers=user_anthropic_beta_headers, + ) + + headers = {**headers, **anthropic_headers} + + return headers diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py new file mode 100644 index 00000000..409bbe2d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py @@ -0,0 +1,46 @@ +""" +This file contains common utils for anthropic calls. 
+""" + +from typing import Optional, Union + +import httpx + +from litellm.llms.base_llm.chat.transformation import BaseLLMException + + +class AnthropicError(BaseLLMException): + def __init__( + self, + status_code: int, + message, + headers: Optional[httpx.Headers] = None, + ): + super().__init__(status_code=status_code, message=message, headers=headers) + + +def process_anthropic_headers(headers: Union[httpx.Headers, dict]) -> dict: + openai_headers = {} + if "anthropic-ratelimit-requests-limit" in headers: + openai_headers["x-ratelimit-limit-requests"] = headers[ + "anthropic-ratelimit-requests-limit" + ] + if "anthropic-ratelimit-requests-remaining" in headers: + openai_headers["x-ratelimit-remaining-requests"] = headers[ + "anthropic-ratelimit-requests-remaining" + ] + if "anthropic-ratelimit-tokens-limit" in headers: + openai_headers["x-ratelimit-limit-tokens"] = headers[ + "anthropic-ratelimit-tokens-limit" + ] + if "anthropic-ratelimit-tokens-remaining" in headers: + openai_headers["x-ratelimit-remaining-tokens"] = headers[ + "anthropic-ratelimit-tokens-remaining" + ] + + llm_response_headers = { + "{}-{}".format("llm_provider", k): v for k, v in headers.items() + } + + additional_headers = {**llm_response_headers, **openai_headers} + return additional_headers diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py new file mode 100644 index 00000000..f1c8be7b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py @@ -0,0 +1,5 @@ +""" +Anthropic /complete API - uses `llm_http_handler.py` to make httpx requests + +Request/Response transformation is handled in `transformation.py` +""" diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py new file mode 100644 index 00000000..7a260b6f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py @@ -0,0 +1,306 @@ +""" +Translation logic for anthropic's `/v1/complete` endpoint + +Litellm provider slug: `anthropic_text/<model_name>` +""" + +import json +import time +from typing import AsyncIterator, Dict, Iterator, List, Optional, Union + +import httpx + +import litellm +from litellm.litellm_core_utils.prompt_templates.factory import ( + custom_prompt, + prompt_factory, +) +from litellm.llms.base_llm.base_model_iterator import BaseModelResponseIterator +from litellm.llms.base_llm.chat.transformation import ( + BaseConfig, + BaseLLMException, + LiteLLMLoggingObj, +) +from litellm.types.llms.openai import AllMessageValues +from litellm.types.utils import ( + ChatCompletionToolCallChunk, + ChatCompletionUsageBlock, + GenericStreamingChunk, + ModelResponse, + Usage, +) + + +class AnthropicTextError(BaseLLMException): + def __init__(self, status_code, message): + self.status_code = status_code + self.message = message + self.request = httpx.Request( + method="POST", url="https://api.anthropic.com/v1/complete" + ) + self.response = httpx.Response(status_code=status_code, request=self.request) + super().__init__( + message=self.message, + status_code=self.status_code, + request=self.request, + response=self.response, + ) # Call the base class constructor with the parameters it needs + + +class AnthropicTextConfig(BaseConfig): + """ + Reference: https://docs.anthropic.com/claude/reference/complete_post + + 
to pass metadata to anthropic, it's {"user_id": "any-relevant-information"} + """ + + max_tokens_to_sample: Optional[int] = ( + litellm.max_tokens + ) # anthropic requires a default + stop_sequences: Optional[list] = None + temperature: Optional[int] = None + top_p: Optional[int] = None + top_k: Optional[int] = None + metadata: Optional[dict] = None + + def __init__( + self, + max_tokens_to_sample: Optional[int] = 256, # anthropic requires a default + stop_sequences: Optional[list] = None, + temperature: Optional[int] = None, + top_p: Optional[int] = None, + top_k: Optional[int] = None, + metadata: Optional[dict] = None, + ) -> None: + locals_ = locals().copy() + for key, value in locals_.items(): + if key != "self" and value is not None: + setattr(self.__class__, key, value) + + # makes headers for API call + def validate_environment( + self, + headers: dict, + model: str, + messages: List[AllMessageValues], + optional_params: dict, + api_key: Optional[str] = None, + api_base: Optional[str] = None, + ) -> dict: + if api_key is None: + raise ValueError( + "Missing Anthropic API Key - A call is being made to anthropic but no key is set either in the environment variables or via params" + ) + _headers = { + "accept": "application/json", + "anthropic-version": "2023-06-01", + "content-type": "application/json", + "x-api-key": api_key, + } + headers.update(_headers) + return headers + + def transform_request( + self, + model: str, + messages: List[AllMessageValues], + optional_params: dict, + litellm_params: dict, + headers: dict, + ) -> dict: + prompt = self._get_anthropic_text_prompt_from_messages( + messages=messages, model=model + ) + ## Load Config + config = litellm.AnthropicTextConfig.get_config() + for k, v in config.items(): + if ( + k not in optional_params + ): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in + optional_params[k] = v + + data = { + "model": model, + "prompt": prompt, + **optional_params, + } + + return data + + def get_supported_openai_params(self, model: str): + """ + Anthropic /complete API Ref: https://docs.anthropic.com/en/api/complete + """ + return [ + "stream", + "max_tokens", + "max_completion_tokens", + "stop", + "temperature", + "top_p", + "extra_headers", + "user", + ] + + def map_openai_params( + self, + non_default_params: dict, + optional_params: dict, + model: str, + drop_params: bool, + ) -> dict: + """ + Follows the same logic as the AnthropicConfig.map_openai_params method (which is the Anthropic /messages API) + + Note: the only difference is in the get supported openai params method between the AnthropicConfig and AnthropicTextConfig + API Ref: https://docs.anthropic.com/en/api/complete + """ + for param, value in non_default_params.items(): + if param == "max_tokens": + optional_params["max_tokens_to_sample"] = value + if param == "max_completion_tokens": + optional_params["max_tokens_to_sample"] = value + if param == "stream" and value is True: + optional_params["stream"] = value + if param == "stop" and (isinstance(value, str) or isinstance(value, list)): + _value = litellm.AnthropicConfig()._map_stop_sequences(value) + if _value is not None: + optional_params["stop_sequences"] = _value + if param == "temperature": + optional_params["temperature"] = value + if param == "top_p": + optional_params["top_p"] = value + if param == "user": + optional_params["metadata"] = {"user_id": value} + + return optional_params + + def transform_response( + self, + model: str, + raw_response: httpx.Response, + 
model_response: ModelResponse, + logging_obj: LiteLLMLoggingObj, + request_data: dict, + messages: List[AllMessageValues], + optional_params: dict, + litellm_params: dict, + encoding: str, + api_key: Optional[str] = None, + json_mode: Optional[bool] = None, + ) -> ModelResponse: + try: + completion_response = raw_response.json() + except Exception: + raise AnthropicTextError( + message=raw_response.text, status_code=raw_response.status_code + ) + prompt = self._get_anthropic_text_prompt_from_messages( + messages=messages, model=model + ) + if "error" in completion_response: + raise AnthropicTextError( + message=str(completion_response["error"]), + status_code=raw_response.status_code, + ) + else: + if len(completion_response["completion"]) > 0: + model_response.choices[0].message.content = completion_response[ # type: ignore + "completion" + ] + model_response.choices[0].finish_reason = completion_response["stop_reason"] + + ## CALCULATING USAGE + prompt_tokens = len( + encoding.encode(prompt) + ) ##[TODO] use the anthropic tokenizer here + completion_tokens = len( + encoding.encode(model_response["choices"][0]["message"].get("content", "")) + ) ##[TODO] use the anthropic tokenizer here + + model_response.created = int(time.time()) + model_response.model = model + usage = Usage( + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=prompt_tokens + completion_tokens, + ) + + setattr(model_response, "usage", usage) + return model_response + + def get_error_class( + self, error_message: str, status_code: int, headers: Union[Dict, httpx.Headers] + ) -> BaseLLMException: + return AnthropicTextError( + status_code=status_code, + message=error_message, + ) + + @staticmethod + def _is_anthropic_text_model(model: str) -> bool: + return model == "claude-2" or model == "claude-instant-1" + + def _get_anthropic_text_prompt_from_messages( + self, messages: List[AllMessageValues], model: str + ) -> str: + custom_prompt_dict = litellm.custom_prompt_dict + if model in custom_prompt_dict: + # check if the model has a registered custom prompt + model_prompt_details = custom_prompt_dict[model] + prompt = custom_prompt( + role_dict=model_prompt_details["roles"], + initial_prompt_value=model_prompt_details["initial_prompt_value"], + final_prompt_value=model_prompt_details["final_prompt_value"], + messages=messages, + ) + else: + prompt = prompt_factory( + model=model, messages=messages, custom_llm_provider="anthropic" + ) + + return str(prompt) + + def get_model_response_iterator( + self, + streaming_response: Union[Iterator[str], AsyncIterator[str], ModelResponse], + sync_stream: bool, + json_mode: Optional[bool] = False, + ): + return AnthropicTextCompletionResponseIterator( + streaming_response=streaming_response, + sync_stream=sync_stream, + json_mode=json_mode, + ) + + +class AnthropicTextCompletionResponseIterator(BaseModelResponseIterator): + def chunk_parser(self, chunk: dict) -> GenericStreamingChunk: + try: + text = "" + tool_use: Optional[ChatCompletionToolCallChunk] = None + is_finished = False + finish_reason = "" + usage: Optional[ChatCompletionUsageBlock] = None + provider_specific_fields = None + index = int(chunk.get("index", 0)) + _chunk_text = chunk.get("completion", None) + if _chunk_text is not None and isinstance(_chunk_text, str): + text = _chunk_text + finish_reason = chunk.get("stop_reason", None) + if finish_reason is not None: + is_finished = True + returned_chunk = GenericStreamingChunk( + text=text, + tool_use=tool_use, + is_finished=is_finished, + 
finish_reason=finish_reason, + usage=usage, + index=index, + provider_specific_fields=provider_specific_fields, + ) + + return returned_chunk + + except json.JSONDecodeError: + raise ValueError(f"Failed to decode JSON from chunk: {chunk}") diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py new file mode 100644 index 00000000..0dbe19ca --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py @@ -0,0 +1,25 @@ +""" +Helper util for handling anthropic-specific cost calculation +- e.g.: prompt caching +""" + +from typing import Tuple + +from litellm.litellm_core_utils.llm_cost_calc.utils import generic_cost_per_token +from litellm.types.utils import Usage + + +def cost_per_token(model: str, usage: Usage) -> Tuple[float, float]: + """ + Calculates the cost per token for a given model, prompt tokens, and completion tokens. + + Input: + - model: str, the model name without provider prefix + - usage: LiteLLM Usage block, containing anthropic caching information + + Returns: + Tuple[float, float] - prompt_cost_in_usd, completion_cost_in_usd + """ + return generic_cost_per_token( + model=model, usage=usage, custom_llm_provider="anthropic" + ) diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py new file mode 100644 index 00000000..a7dfff74 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py @@ -0,0 +1,179 @@ +""" +- call /messages on Anthropic API +- Make streaming + non-streaming request - just pass it through direct to Anthropic. 
No need to do anything special here +- Ensure requests are logged in the DB - stream + non-stream + +""" + +import json +from typing import Any, AsyncIterator, Dict, Optional, Union, cast + +import httpx + +import litellm +from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj +from litellm.llms.base_llm.anthropic_messages.transformation import ( + BaseAnthropicMessagesConfig, +) +from litellm.llms.custom_httpx.http_handler import ( + AsyncHTTPHandler, + get_async_httpx_client, +) +from litellm.types.router import GenericLiteLLMParams +from litellm.types.utils import ProviderSpecificHeader +from litellm.utils import ProviderConfigManager, client + + +class AnthropicMessagesHandler: + + @staticmethod + async def _handle_anthropic_streaming( + response: httpx.Response, + request_body: dict, + litellm_logging_obj: LiteLLMLoggingObj, + ) -> AsyncIterator: + """Helper function to handle Anthropic streaming responses using the existing logging handlers""" + from datetime import datetime + + from litellm.proxy.pass_through_endpoints.streaming_handler import ( + PassThroughStreamingHandler, + ) + from litellm.proxy.pass_through_endpoints.success_handler import ( + PassThroughEndpointLogging, + ) + from litellm.proxy.pass_through_endpoints.types import EndpointType + + # Create success handler object + passthrough_success_handler_obj = PassThroughEndpointLogging() + + # Use the existing streaming handler for Anthropic + start_time = datetime.now() + return PassThroughStreamingHandler.chunk_processor( + response=response, + request_body=request_body, + litellm_logging_obj=litellm_logging_obj, + endpoint_type=EndpointType.ANTHROPIC, + start_time=start_time, + passthrough_success_handler_obj=passthrough_success_handler_obj, + url_route="/v1/messages", + ) + + +@client +async def anthropic_messages( + api_key: str, + model: str, + stream: bool = False, + api_base: Optional[str] = None, + client: Optional[AsyncHTTPHandler] = None, + custom_llm_provider: Optional[str] = None, + **kwargs, +) -> Union[Dict[str, Any], AsyncIterator]: + """ + Makes Anthropic `/v1/messages` API calls In the Anthropic API Spec + """ + # Use provided client or create a new one + optional_params = GenericLiteLLMParams(**kwargs) + model, _custom_llm_provider, dynamic_api_key, dynamic_api_base = ( + litellm.get_llm_provider( + model=model, + custom_llm_provider=custom_llm_provider, + api_base=optional_params.api_base, + api_key=optional_params.api_key, + ) + ) + anthropic_messages_provider_config: Optional[BaseAnthropicMessagesConfig] = ( + ProviderConfigManager.get_provider_anthropic_messages_config( + model=model, + provider=litellm.LlmProviders(_custom_llm_provider), + ) + ) + if anthropic_messages_provider_config is None: + raise ValueError( + f"Anthropic messages provider config not found for model: {model}" + ) + if client is None or not isinstance(client, AsyncHTTPHandler): + async_httpx_client = get_async_httpx_client( + llm_provider=litellm.LlmProviders.ANTHROPIC + ) + else: + async_httpx_client = client + + litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None) + + # Prepare headers + provider_specific_header = cast( + Optional[ProviderSpecificHeader], kwargs.get("provider_specific_header", None) + ) + extra_headers = ( + provider_specific_header.get("extra_headers", {}) + if provider_specific_header + else {} + ) + headers = anthropic_messages_provider_config.validate_environment( + headers=extra_headers or {}, + model=model, + api_key=api_key, + ) + + 
litellm_logging_obj.update_environment_variables( + model=model, + optional_params=dict(optional_params), + litellm_params={ + "metadata": kwargs.get("metadata", {}), + "preset_cache_key": None, + "stream_response": {}, + **optional_params.model_dump(exclude_unset=True), + }, + custom_llm_provider=_custom_llm_provider, + ) + litellm_logging_obj.model_call_details.update(kwargs) + + # Prepare request body + request_body = kwargs.copy() + request_body = { + k: v + for k, v in request_body.items() + if k + in anthropic_messages_provider_config.get_supported_anthropic_messages_params( + model=model + ) + } + request_body["stream"] = stream + request_body["model"] = model + litellm_logging_obj.stream = stream + + # Make the request + request_url = anthropic_messages_provider_config.get_complete_url( + api_base=api_base, model=model + ) + + litellm_logging_obj.pre_call( + input=[{"role": "user", "content": json.dumps(request_body)}], + api_key="", + additional_args={ + "complete_input_dict": request_body, + "api_base": str(request_url), + "headers": headers, + }, + ) + + response = await async_httpx_client.post( + url=request_url, + headers=headers, + data=json.dumps(request_body), + stream=stream, + ) + response.raise_for_status() + + # used for logging + cost tracking + litellm_logging_obj.model_call_details["httpx_response"] = response + + if stream: + return await AnthropicMessagesHandler._handle_anthropic_streaming( + response=response, + request_body=request_body, + litellm_logging_obj=litellm_logging_obj, + ) + else: + return response.json() diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py new file mode 100644 index 00000000..e9b598f1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py @@ -0,0 +1,47 @@ +from typing import Optional + +from litellm.llms.base_llm.anthropic_messages.transformation import ( + BaseAnthropicMessagesConfig, +) + +DEFAULT_ANTHROPIC_API_BASE = "https://api.anthropic.com" +DEFAULT_ANTHROPIC_API_VERSION = "2023-06-01" + + +class AnthropicMessagesConfig(BaseAnthropicMessagesConfig): + def get_supported_anthropic_messages_params(self, model: str) -> list: + return [ + "messages", + "model", + "system", + "max_tokens", + "stop_sequences", + "temperature", + "top_p", + "top_k", + "tools", + "tool_choice", + "thinking", + # TODO: Add Anthropic `metadata` support + # "metadata", + ] + + def get_complete_url(self, api_base: Optional[str], model: str) -> str: + api_base = api_base or DEFAULT_ANTHROPIC_API_BASE + if not api_base.endswith("/v1/messages"): + api_base = f"{api_base}/v1/messages" + return api_base + + def validate_environment( + self, + headers: dict, + model: str, + api_key: Optional[str] = None, + ) -> dict: + if "x-api-key" not in headers: + headers["x-api-key"] = api_key + if "anthropic-version" not in headers: + headers["anthropic-version"] = DEFAULT_ANTHROPIC_API_VERSION + if "content-type" not in headers: + headers["content-type"] = "application/json" + return headers |
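Editorial note (not part of the changeset above): the config and header helpers introduced in this diff are small and side-effect free, so they can be exercised directly. The sketch below assumes the module paths shown in the file names above; the model name and API key are placeholders.

from litellm.llms.anthropic.common_utils import process_anthropic_headers
from litellm.llms.anthropic.experimental_pass_through.messages.transformation import (
    AnthropicMessagesConfig,
)

config = AnthropicMessagesConfig()

# get_complete_url appends /v1/messages only when the base does not already end with it
url = config.get_complete_url(api_base=None, model="claude-3-5-sonnet-20240620")
assert url == "https://api.anthropic.com/v1/messages"

# validate_environment fills in x-api-key / anthropic-version / content-type only if missing
headers = config.validate_environment(
    headers={},
    model="claude-3-5-sonnet-20240620",
    api_key="sk-ant-placeholder",  # placeholder key, not a real credential
)
assert headers["anthropic-version"] == "2023-06-01"

# process_anthropic_headers maps Anthropic rate-limit headers onto their OpenAI-style
# names and also re-exposes every raw header under a "llm_provider-" prefix
mapped = process_anthropic_headers({"anthropic-ratelimit-requests-remaining": "42"})
assert mapped["x-ratelimit-remaining-requests"] == "42"
assert mapped["llm_provider-anthropic-ratelimit-requests-remaining"] == "42"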