Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/llms/anthropic')
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py    1
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py    839
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py    831
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py    46
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py    5
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py    306
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py    25
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py    179
-rw-r--r--  .venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py    47
9 files changed, 2279 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py
new file mode 100644
index 00000000..ae84c3b1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/__init__.py
@@ -0,0 +1 @@
+from .handler import AnthropicChatCompletion, ModelResponseIterator
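
The one-line module above simply re-exports the handler classes, so callers can import them from the `chat` subpackage instead of reaching into `handler.py`. A minimal illustration (assuming this vendored litellm install is importable):

    from litellm.llms.anthropic.chat import AnthropicChatCompletion, ModelResponseIterator

    # Constructing the chat handler takes no arguments; litellm's routing layer
    # does this internally when the "anthropic" provider is selected.
    anthropic_chat = AnthropicChatCompletion()
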
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
new file mode 100644
index 00000000..f2c5f390
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/handler.py
@@ -0,0 +1,839 @@
+"""
+Calling + translation logic for anthropic's `/v1/messages` endpoint
+"""
+
+import copy
+import json
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
+
+import httpx # type: ignore
+
+import litellm
+import litellm.litellm_core_utils
+import litellm.types
+import litellm.types.utils
+from litellm import LlmProviders
+from litellm.litellm_core_utils.core_helpers import map_finish_reason
+from litellm.llms.base_llm.chat.transformation import BaseConfig
+from litellm.llms.custom_httpx.http_handler import (
+ AsyncHTTPHandler,
+ HTTPHandler,
+ get_async_httpx_client,
+)
+from litellm.types.llms.anthropic import (
+ AnthropicChatCompletionUsageBlock,
+ ContentBlockDelta,
+ ContentBlockStart,
+ ContentBlockStop,
+ MessageBlockDelta,
+ MessageStartBlock,
+ UsageDelta,
+)
+from litellm.types.llms.openai import (
+ ChatCompletionThinkingBlock,
+ ChatCompletionToolCallChunk,
+ ChatCompletionUsageBlock,
+)
+from litellm.types.utils import (
+ Delta,
+ GenericStreamingChunk,
+ ModelResponseStream,
+ StreamingChoices,
+)
+from litellm.utils import CustomStreamWrapper, ModelResponse, ProviderConfigManager
+
+from ...base import BaseLLM
+from ..common_utils import AnthropicError, process_anthropic_headers
+from .transformation import AnthropicConfig
+
+
+async def make_call(
+ client: Optional[AsyncHTTPHandler],
+ api_base: str,
+ headers: dict,
+ data: str,
+ model: str,
+ messages: list,
+ logging_obj,
+ timeout: Optional[Union[float, httpx.Timeout]],
+ json_mode: bool,
+) -> Tuple[Any, httpx.Headers]:
+ if client is None:
+ client = litellm.module_level_aclient
+
+ try:
+ response = await client.post(
+ api_base, headers=headers, data=data, stream=True, timeout=timeout
+ )
+ except httpx.HTTPStatusError as e:
+ error_headers = getattr(e, "headers", None)
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ raise AnthropicError(
+ status_code=e.response.status_code,
+ message=await e.response.aread(),
+ headers=error_headers,
+ )
+ except Exception as e:
+ for exception in litellm.LITELLM_EXCEPTION_TYPES:
+ if isinstance(e, exception):
+ raise e
+ raise AnthropicError(status_code=500, message=str(e))
+
+ completion_stream = ModelResponseIterator(
+ streaming_response=response.aiter_lines(),
+ sync_stream=False,
+ json_mode=json_mode,
+ )
+
+ # LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key="",
+ original_response=completion_stream, # Pass the completion stream for logging
+ additional_args={"complete_input_dict": data},
+ )
+
+ return completion_stream, response.headers
+
+
+def make_sync_call(
+ client: Optional[HTTPHandler],
+ api_base: str,
+ headers: dict,
+ data: str,
+ model: str,
+ messages: list,
+ logging_obj,
+ timeout: Optional[Union[float, httpx.Timeout]],
+ json_mode: bool,
+) -> Tuple[Any, httpx.Headers]:
+ if client is None:
+ client = litellm.module_level_client # re-use a module level client
+
+ try:
+ response = client.post(
+ api_base, headers=headers, data=data, stream=True, timeout=timeout
+ )
+ except httpx.HTTPStatusError as e:
+ error_headers = getattr(e, "headers", None)
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ raise AnthropicError(
+ status_code=e.response.status_code,
+ message=e.response.read(),
+ headers=error_headers,
+ )
+ except Exception as e:
+ for exception in litellm.LITELLM_EXCEPTION_TYPES:
+ if isinstance(e, exception):
+ raise e
+ raise AnthropicError(status_code=500, message=str(e))
+
+ if response.status_code != 200:
+ response_headers = getattr(response, "headers", None)
+ raise AnthropicError(
+ status_code=response.status_code,
+ message=response.read(),
+ headers=response_headers,
+ )
+
+ completion_stream = ModelResponseIterator(
+ streaming_response=response.iter_lines(), sync_stream=True, json_mode=json_mode
+ )
+
+ # LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key="",
+ original_response="first stream response received",
+ additional_args={"complete_input_dict": data},
+ )
+
+ return completion_stream, response.headers
+
+
+class AnthropicChatCompletion(BaseLLM):
+ def __init__(self) -> None:
+ super().__init__()
+
+ async def acompletion_stream_function(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ timeout: Union[float, httpx.Timeout],
+ client: Optional[AsyncHTTPHandler],
+ encoding,
+ api_key,
+ logging_obj,
+ stream,
+ _is_function_call,
+ data: dict,
+ json_mode: bool,
+ optional_params=None,
+ litellm_params=None,
+ logger_fn=None,
+ headers={},
+ ):
+ data["stream"] = True
+
+ completion_stream, headers = await make_call(
+ client=client,
+ api_base=api_base,
+ headers=headers,
+ data=json.dumps(data),
+ model=model,
+ messages=messages,
+ logging_obj=logging_obj,
+ timeout=timeout,
+ json_mode=json_mode,
+ )
+ streamwrapper = CustomStreamWrapper(
+ completion_stream=completion_stream,
+ model=model,
+ custom_llm_provider="anthropic",
+ logging_obj=logging_obj,
+ _response_headers=process_anthropic_headers(headers),
+ )
+ return streamwrapper
+
+ async def acompletion_function(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ timeout: Union[float, httpx.Timeout],
+ encoding,
+ api_key,
+ logging_obj,
+ stream,
+ _is_function_call,
+ data: dict,
+ optional_params: dict,
+ json_mode: bool,
+ litellm_params: dict,
+ provider_config: BaseConfig,
+ logger_fn=None,
+ headers={},
+ client: Optional[AsyncHTTPHandler] = None,
+ ) -> Union[ModelResponse, CustomStreamWrapper]:
+ async_handler = client or get_async_httpx_client(
+ llm_provider=litellm.LlmProviders.ANTHROPIC
+ )
+
+ try:
+ response = await async_handler.post(
+ api_base, headers=headers, json=data, timeout=timeout
+ )
+ except Exception as e:
+ ## LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key=api_key,
+ original_response=str(e),
+ additional_args={"complete_input_dict": data},
+ )
+ status_code = getattr(e, "status_code", 500)
+ error_headers = getattr(e, "headers", None)
+ error_text = getattr(e, "text", str(e))
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ if error_response and hasattr(error_response, "text"):
+ error_text = getattr(error_response, "text", error_text)
+ raise AnthropicError(
+ message=error_text,
+ status_code=status_code,
+ headers=error_headers,
+ )
+
+ return provider_config.transform_response(
+ model=model,
+ raw_response=response,
+ model_response=model_response,
+ logging_obj=logging_obj,
+ api_key=api_key,
+ request_data=data,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ encoding=encoding,
+ json_mode=json_mode,
+ )
+
+ def completion(
+ self,
+ model: str,
+ messages: list,
+ api_base: str,
+ custom_llm_provider: str,
+ custom_prompt_dict: dict,
+ model_response: ModelResponse,
+ print_verbose: Callable,
+ encoding,
+ api_key,
+ logging_obj,
+ optional_params: dict,
+ timeout: Union[float, httpx.Timeout],
+ litellm_params: dict,
+ acompletion=None,
+ logger_fn=None,
+ headers={},
+ client=None,
+ ):
+
+ optional_params = copy.deepcopy(optional_params)
+ stream = optional_params.pop("stream", None)
+ json_mode: bool = optional_params.pop("json_mode", False)
+ is_vertex_request: bool = optional_params.pop("is_vertex_request", False)
+ _is_function_call = False
+ messages = copy.deepcopy(messages)
+ headers = AnthropicConfig().validate_environment(
+ api_key=api_key,
+ headers=headers,
+ model=model,
+ messages=messages,
+ optional_params={**optional_params, "is_vertex_request": is_vertex_request},
+ )
+
+ config = ProviderConfigManager.get_provider_chat_config(
+ model=model,
+ provider=LlmProviders(custom_llm_provider),
+ )
+
+ data = config.transform_request(
+ model=model,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ headers=headers,
+ )
+
+ ## LOGGING
+ logging_obj.pre_call(
+ input=messages,
+ api_key=api_key,
+ additional_args={
+ "complete_input_dict": data,
+ "api_base": api_base,
+ "headers": headers,
+ },
+ )
+ print_verbose(f"_is_function_call: {_is_function_call}")
+ if acompletion is True:
+ if (
+ stream is True
+ ): # if function call - fake the streaming (need complete blocks for output parsing in openai format)
+ print_verbose("makes async anthropic streaming POST request")
+ data["stream"] = stream
+ return self.acompletion_stream_function(
+ model=model,
+ messages=messages,
+ data=data,
+ api_base=api_base,
+ custom_prompt_dict=custom_prompt_dict,
+ model_response=model_response,
+ print_verbose=print_verbose,
+ encoding=encoding,
+ api_key=api_key,
+ logging_obj=logging_obj,
+ optional_params=optional_params,
+ stream=stream,
+ _is_function_call=_is_function_call,
+ json_mode=json_mode,
+ litellm_params=litellm_params,
+ logger_fn=logger_fn,
+ headers=headers,
+ timeout=timeout,
+ client=(
+ client
+ if client is not None and isinstance(client, AsyncHTTPHandler)
+ else None
+ ),
+ )
+ else:
+ return self.acompletion_function(
+ model=model,
+ messages=messages,
+ data=data,
+ api_base=api_base,
+ custom_prompt_dict=custom_prompt_dict,
+ model_response=model_response,
+ print_verbose=print_verbose,
+ encoding=encoding,
+ api_key=api_key,
+ provider_config=config,
+ logging_obj=logging_obj,
+ optional_params=optional_params,
+ stream=stream,
+ _is_function_call=_is_function_call,
+ litellm_params=litellm_params,
+ logger_fn=logger_fn,
+ headers=headers,
+ client=client,
+ json_mode=json_mode,
+ timeout=timeout,
+ )
+ else:
+ ## COMPLETION CALL
+ if (
+ stream is True
+ ): # if function call - fake the streaming (need complete blocks for output parsing in openai format)
+ data["stream"] = stream
+ completion_stream, headers = make_sync_call(
+ client=client,
+ api_base=api_base,
+ headers=headers, # type: ignore
+ data=json.dumps(data),
+ model=model,
+ messages=messages,
+ logging_obj=logging_obj,
+ timeout=timeout,
+ json_mode=json_mode,
+ )
+ return CustomStreamWrapper(
+ completion_stream=completion_stream,
+ model=model,
+ custom_llm_provider="anthropic",
+ logging_obj=logging_obj,
+ _response_headers=process_anthropic_headers(headers),
+ )
+
+ else:
+ if client is None or not isinstance(client, HTTPHandler):
+ client = HTTPHandler(timeout=timeout) # type: ignore
+ else:
+ client = client
+
+ try:
+ response = client.post(
+ api_base,
+ headers=headers,
+ data=json.dumps(data),
+ timeout=timeout,
+ )
+ except Exception as e:
+ status_code = getattr(e, "status_code", 500)
+ error_headers = getattr(e, "headers", None)
+ error_text = getattr(e, "text", str(e))
+ error_response = getattr(e, "response", None)
+ if error_headers is None and error_response:
+ error_headers = getattr(error_response, "headers", None)
+ if error_response and hasattr(error_response, "text"):
+ error_text = getattr(error_response, "text", error_text)
+ raise AnthropicError(
+ message=error_text,
+ status_code=status_code,
+ headers=error_headers,
+ )
+
+ return config.transform_response(
+ model=model,
+ raw_response=response,
+ model_response=model_response,
+ logging_obj=logging_obj,
+ api_key=api_key,
+ request_data=data,
+ messages=messages,
+ optional_params=optional_params,
+ litellm_params=litellm_params,
+ encoding=encoding,
+ json_mode=json_mode,
+ )
+
+ def embedding(self):
+ # logic for parsing in - calling - parsing out model embedding calls
+ pass
+
+
+class ModelResponseIterator:
+ def __init__(
+ self, streaming_response, sync_stream: bool, json_mode: Optional[bool] = False
+ ):
+ self.streaming_response = streaming_response
+ self.response_iterator = self.streaming_response
+ self.content_blocks: List[ContentBlockDelta] = []
+ self.tool_index = -1
+ self.json_mode = json_mode
+
+ def check_empty_tool_call_args(self) -> bool:
+ """
+ Check if the tool call block so far has been an empty string
+ """
+ args = ""
+ # if text content block -> skip
+ if len(self.content_blocks) == 0:
+ return False
+
+ if (
+ self.content_blocks[0]["delta"]["type"] == "text_delta"
+ or self.content_blocks[0]["delta"]["type"] == "thinking_delta"
+ ):
+ return False
+
+ for block in self.content_blocks:
+ if block["delta"]["type"] == "input_json_delta":
+ args += block["delta"].get("partial_json", "") # type: ignore
+
+ if len(args) == 0:
+ return True
+ return False
+
+ def _handle_usage(
+ self, anthropic_usage_chunk: Union[dict, UsageDelta]
+ ) -> AnthropicChatCompletionUsageBlock:
+
+ usage_block = AnthropicChatCompletionUsageBlock(
+ prompt_tokens=anthropic_usage_chunk.get("input_tokens", 0),
+ completion_tokens=anthropic_usage_chunk.get("output_tokens", 0),
+ total_tokens=anthropic_usage_chunk.get("input_tokens", 0)
+ + anthropic_usage_chunk.get("output_tokens", 0),
+ )
+
+ cache_creation_input_tokens = anthropic_usage_chunk.get(
+ "cache_creation_input_tokens"
+ )
+ if cache_creation_input_tokens is not None and isinstance(
+ cache_creation_input_tokens, int
+ ):
+ usage_block["cache_creation_input_tokens"] = cache_creation_input_tokens
+
+ cache_read_input_tokens = anthropic_usage_chunk.get("cache_read_input_tokens")
+ if cache_read_input_tokens is not None and isinstance(
+ cache_read_input_tokens, int
+ ):
+ usage_block["cache_read_input_tokens"] = cache_read_input_tokens
+
+ return usage_block
+
+ def _content_block_delta_helper(self, chunk: dict) -> Tuple[
+ str,
+ Optional[ChatCompletionToolCallChunk],
+ List[ChatCompletionThinkingBlock],
+ Dict[str, Any],
+ ]:
+ """
+ Helper function to handle the content block delta
+ """
+
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ provider_specific_fields = {}
+ content_block = ContentBlockDelta(**chunk) # type: ignore
+ thinking_blocks: List[ChatCompletionThinkingBlock] = []
+
+ self.content_blocks.append(content_block)
+ if "text" in content_block["delta"]:
+ text = content_block["delta"]["text"]
+ elif "partial_json" in content_block["delta"]:
+ tool_use = {
+ "id": None,
+ "type": "function",
+ "function": {
+ "name": None,
+ "arguments": content_block["delta"]["partial_json"],
+ },
+ "index": self.tool_index,
+ }
+ elif "citation" in content_block["delta"]:
+ provider_specific_fields["citation"] = content_block["delta"]["citation"]
+ elif (
+ "thinking" in content_block["delta"]
+ or "signature" in content_block["delta"]
+ ):
+ thinking_blocks = [
+ ChatCompletionThinkingBlock(
+ type="thinking",
+ thinking=content_block["delta"].get("thinking") or "",
+ signature=content_block["delta"].get("signature"),
+ )
+ ]
+ provider_specific_fields["thinking_blocks"] = thinking_blocks
+ return text, tool_use, thinking_blocks, provider_specific_fields
+
+ def _handle_reasoning_content(
+ self, thinking_blocks: List[ChatCompletionThinkingBlock]
+ ) -> Optional[str]:
+ """
+ Handle the reasoning content
+ """
+ reasoning_content = None
+ for block in thinking_blocks:
+ if reasoning_content is None:
+ reasoning_content = ""
+ if "thinking" in block:
+ reasoning_content += block["thinking"]
+ return reasoning_content
+
+ def chunk_parser(self, chunk: dict) -> ModelResponseStream:
+ try:
+ type_chunk = chunk.get("type", "") or ""
+
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ finish_reason = ""
+ usage: Optional[ChatCompletionUsageBlock] = None
+ provider_specific_fields: Dict[str, Any] = {}
+ reasoning_content: Optional[str] = None
+ thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None
+
+ index = int(chunk.get("index", 0))
+ if type_chunk == "content_block_delta":
+ """
+ Anthropic content chunk
+ chunk = {'type': 'content_block_delta', 'index': 0, 'delta': {'type': 'text_delta', 'text': 'Hello'}}
+ """
+ text, tool_use, thinking_blocks, provider_specific_fields = (
+ self._content_block_delta_helper(chunk=chunk)
+ )
+ if thinking_blocks:
+ reasoning_content = self._handle_reasoning_content(
+ thinking_blocks=thinking_blocks
+ )
+ elif type_chunk == "content_block_start":
+ """
+ event: content_block_start
+ data: {"type":"content_block_start","index":1,"content_block":{"type":"tool_use","id":"toolu_01T1x1fJ34qAmk2tNTrN7Up6","name":"get_weather","input":{}}}
+ """
+ content_block_start = ContentBlockStart(**chunk) # type: ignore
+ self.content_blocks = [] # reset content blocks when new block starts
+ if content_block_start["content_block"]["type"] == "text":
+ text = content_block_start["content_block"]["text"]
+ elif content_block_start["content_block"]["type"] == "tool_use":
+ self.tool_index += 1
+ tool_use = {
+ "id": content_block_start["content_block"]["id"],
+ "type": "function",
+ "function": {
+ "name": content_block_start["content_block"]["name"],
+ "arguments": "",
+ },
+ "index": self.tool_index,
+ }
+ elif type_chunk == "content_block_stop":
+
+ ContentBlockStop(**chunk) # type: ignore
+ # check if tool call content block
+ is_empty = self.check_empty_tool_call_args()
+
+ if is_empty:
+ tool_use = {
+ "id": None,
+ "type": "function",
+ "function": {
+ "name": None,
+ "arguments": "{}",
+ },
+ "index": self.tool_index,
+ }
+ elif type_chunk == "message_delta":
+ """
+ Anthropic
+ chunk = {'type': 'message_delta', 'delta': {'stop_reason': 'max_tokens', 'stop_sequence': None}, 'usage': {'output_tokens': 10}}
+ """
+ # TODO - get usage from this chunk, set in response
+ message_delta = MessageBlockDelta(**chunk) # type: ignore
+ finish_reason = map_finish_reason(
+ finish_reason=message_delta["delta"].get("stop_reason", "stop")
+ or "stop"
+ )
+ usage = self._handle_usage(anthropic_usage_chunk=message_delta["usage"])
+ elif type_chunk == "message_start":
+ """
+ Anthropic
+ chunk = {
+ "type": "message_start",
+ "message": {
+ "id": "msg_vrtx_011PqREFEMzd3REdCoUFAmdG",
+ "type": "message",
+ "role": "assistant",
+ "model": "claude-3-sonnet-20240229",
+ "content": [],
+ "stop_reason": null,
+ "stop_sequence": null,
+ "usage": {
+ "input_tokens": 270,
+ "output_tokens": 1
+ }
+ }
+ }
+ """
+ message_start_block = MessageStartBlock(**chunk) # type: ignore
+ if "usage" in message_start_block["message"]:
+ usage = self._handle_usage(
+ anthropic_usage_chunk=message_start_block["message"]["usage"]
+ )
+ elif type_chunk == "error":
+ """
+ {"type":"error","error":{"details":null,"type":"api_error","message":"Internal server error"} }
+ """
+ _error_dict = chunk.get("error", {}) or {}
+ message = _error_dict.get("message", None) or str(chunk)
+ raise AnthropicError(
+ message=message,
+ status_code=500, # it looks like Anthropic API does not return a status code in the chunk error - default to 500
+ )
+
+ text, tool_use = self._handle_json_mode_chunk(text=text, tool_use=tool_use)
+
+ returned_chunk = ModelResponseStream(
+ choices=[
+ StreamingChoices(
+ index=index,
+ delta=Delta(
+ content=text,
+ tool_calls=[tool_use] if tool_use is not None else None,
+ provider_specific_fields=(
+ provider_specific_fields
+ if provider_specific_fields
+ else None
+ ),
+ thinking_blocks=(
+ thinking_blocks if thinking_blocks else None
+ ),
+ reasoning_content=reasoning_content,
+ ),
+ finish_reason=finish_reason,
+ )
+ ],
+ usage=usage,
+ )
+
+ return returned_chunk
+
+ except json.JSONDecodeError:
+ raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
+
+ def _handle_json_mode_chunk(
+ self, text: str, tool_use: Optional[ChatCompletionToolCallChunk]
+ ) -> Tuple[str, Optional[ChatCompletionToolCallChunk]]:
+ """
+ If JSON mode is enabled, convert the tool call to a message.
+
+ Anthropic returns the JSON schema as part of the tool call
+        OpenAI returns the JSON schema as part of the content; this handles placing it in the content
+
+ Args:
+ text: str
+ tool_use: Optional[ChatCompletionToolCallChunk]
+ Returns:
+ Tuple[str, Optional[ChatCompletionToolCallChunk]]
+
+ text: The text to use in the content
+ tool_use: The ChatCompletionToolCallChunk to use in the chunk response
+ """
+ if self.json_mode is True and tool_use is not None:
+ message = AnthropicConfig._convert_tool_response_to_message(
+ tool_calls=[tool_use]
+ )
+ if message is not None:
+ text = message.content or ""
+ tool_use = None
+
+ return text, tool_use
+
+ # Sync iterator
+ def __iter__(self):
+ return self
+
+ def __next__(self):
+ try:
+ chunk = self.response_iterator.__next__()
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopIteration:
+ raise StopIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+ # Async iterator
+ def __aiter__(self):
+ self.async_response_iterator = self.streaming_response.__aiter__()
+ return self
+
+ async def __anext__(self):
+ try:
+ chunk = await self.async_response_iterator.__anext__()
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error receiving chunk from stream: {e}")
+
+ try:
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return GenericStreamingChunk(
+ text="",
+ is_finished=False,
+ finish_reason="",
+ usage=None,
+ index=0,
+ tool_use=None,
+ )
+ except StopAsyncIteration:
+ raise StopAsyncIteration
+ except ValueError as e:
+ raise RuntimeError(f"Error parsing chunk: {e},\nReceived chunk: {chunk}")
+
+ def convert_str_chunk_to_generic_chunk(self, chunk: str) -> ModelResponseStream:
+ """
+        Convert a string chunk to a ModelResponseStream
+
+        Note: This is used for Anthropic pass-through streaming logging
+
+        We could move __anext__ and __next__ to use this function since it's common logic.
+        They were not migrated here to minimize the changes made in one PR.
+ """
+ str_line = chunk
+ if isinstance(chunk, bytes): # Handle binary data
+ str_line = chunk.decode("utf-8") # Convert bytes to string
+ index = str_line.find("data:")
+ if index != -1:
+ str_line = str_line[index:]
+
+ if str_line.startswith("data:"):
+ data_json = json.loads(str_line[5:])
+ return self.chunk_parser(chunk=data_json)
+ else:
+ return ModelResponseStream()
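
The streaming path above reduces to: read each server-sent-event line, strip the leading "data:" prefix, JSON-decode the payload, and route it through chunk_parser, which maps Anthropic's event types (message_start, content_block_start, content_block_delta, message_delta, ...) onto an OpenAI-style ModelResponseStream. A minimal sketch of that flow, feeding hand-written SSE lines shaped like the payloads quoted in the chunk_parser docstrings (not a captured API response) through the iterator, assuming this vendored litellm package imports cleanly:

    from litellm.llms.anthropic.chat.handler import ModelResponseIterator

    # Hand-written lines mimicking Anthropic's /v1/messages SSE stream.
    fake_stream = iter([
        'data: {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}}',
        'data: {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "Hello"}}',
        'data: {"type": "message_delta", "delta": {"stop_reason": "end_turn", "stop_sequence": null}, "usage": {"output_tokens": 5}}',
    ])

    iterator = ModelResponseIterator(streaming_response=fake_stream, sync_stream=True)
    for parsed in iterator:
        # Each "data:" line is JSON-decoded and handed to chunk_parser, which
        # returns an OpenAI-style chunk; the text delta carries "Hello" and the
        # final message_delta chunk maps "end_turn" to finish_reason "stop".
        print(parsed.choices[0].delta.content, parsed.choices[0].finish_reason)
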
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py
new file mode 100644
index 00000000..1a77c453
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/chat/transformation.py
@@ -0,0 +1,831 @@
+import json
+import time
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
+
+import httpx
+
+import litellm
+from litellm.constants import RESPONSE_FORMAT_TOOL_NAME
+from litellm.litellm_core_utils.core_helpers import map_finish_reason
+from litellm.litellm_core_utils.prompt_templates.factory import anthropic_messages_pt
+from litellm.llms.base_llm.base_utils import type_to_response_format_param
+from litellm.llms.base_llm.chat.transformation import BaseConfig, BaseLLMException
+from litellm.types.llms.anthropic import (
+ AllAnthropicToolsValues,
+ AnthropicComputerTool,
+ AnthropicHostedTools,
+ AnthropicInputSchema,
+ AnthropicMessagesTool,
+ AnthropicMessagesToolChoice,
+ AnthropicSystemMessageContent,
+)
+from litellm.types.llms.openai import (
+ AllMessageValues,
+ ChatCompletionCachedContent,
+ ChatCompletionSystemMessage,
+ ChatCompletionThinkingBlock,
+ ChatCompletionToolCallChunk,
+ ChatCompletionToolCallFunctionChunk,
+ ChatCompletionToolParam,
+)
+from litellm.types.utils import Message as LitellmMessage
+from litellm.types.utils import PromptTokensDetailsWrapper
+from litellm.utils import ModelResponse, Usage, add_dummy_tool, has_tool_call_blocks
+
+from ..common_utils import AnthropicError, process_anthropic_headers
+
+if TYPE_CHECKING:
+ from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+
+ LoggingClass = LiteLLMLoggingObj
+else:
+ LoggingClass = Any
+
+
+class AnthropicConfig(BaseConfig):
+ """
+ Reference: https://docs.anthropic.com/claude/reference/messages_post
+
+ to pass metadata to anthropic, it's {"user_id": "any-relevant-information"}
+ """
+
+ max_tokens: Optional[int] = (
+ 4096 # anthropic requires a default value (Opus, Sonnet, and Haiku have the same default)
+ )
+ stop_sequences: Optional[list] = None
+ temperature: Optional[int] = None
+ top_p: Optional[int] = None
+ top_k: Optional[int] = None
+ metadata: Optional[dict] = None
+ system: Optional[str] = None
+
+ def __init__(
+ self,
+ max_tokens: Optional[
+ int
+ ] = 4096, # You can pass in a value yourself or use the default value 4096
+ stop_sequences: Optional[list] = None,
+ temperature: Optional[int] = None,
+ top_p: Optional[int] = None,
+ top_k: Optional[int] = None,
+ metadata: Optional[dict] = None,
+ system: Optional[str] = None,
+ ) -> None:
+ locals_ = locals().copy()
+ for key, value in locals_.items():
+ if key != "self" and value is not None:
+ setattr(self.__class__, key, value)
+
+ @classmethod
+ def get_config(cls):
+ return super().get_config()
+
+ def get_supported_openai_params(self, model: str):
+ params = [
+ "stream",
+ "stop",
+ "temperature",
+ "top_p",
+ "max_tokens",
+ "max_completion_tokens",
+ "tools",
+ "tool_choice",
+ "extra_headers",
+ "parallel_tool_calls",
+ "response_format",
+ "user",
+ ]
+
+ if "claude-3-7-sonnet" in model:
+ params.append("thinking")
+
+ return params
+
+ def get_json_schema_from_pydantic_object(
+ self, response_format: Union[Any, Dict, None]
+ ) -> Optional[dict]:
+
+ return type_to_response_format_param(
+ response_format, ref_template="/$defs/{model}"
+ ) # Relevant issue: https://github.com/BerriAI/litellm/issues/7755
+
+ def get_cache_control_headers(self) -> dict:
+ return {
+ "anthropic-version": "2023-06-01",
+ "anthropic-beta": "prompt-caching-2024-07-31",
+ }
+
+ def get_anthropic_headers(
+ self,
+ api_key: str,
+ anthropic_version: Optional[str] = None,
+ computer_tool_used: bool = False,
+ prompt_caching_set: bool = False,
+ pdf_used: bool = False,
+ is_vertex_request: bool = False,
+ user_anthropic_beta_headers: Optional[List[str]] = None,
+ ) -> dict:
+
+ betas = set()
+ if prompt_caching_set:
+ betas.add("prompt-caching-2024-07-31")
+ if computer_tool_used:
+ betas.add("computer-use-2024-10-22")
+ if pdf_used:
+ betas.add("pdfs-2024-09-25")
+ headers = {
+ "anthropic-version": anthropic_version or "2023-06-01",
+ "x-api-key": api_key,
+ "accept": "application/json",
+ "content-type": "application/json",
+ }
+
+ if user_anthropic_beta_headers is not None:
+ betas.update(user_anthropic_beta_headers)
+
+ # Don't send any beta headers to Vertex, Vertex has failed requests when they are sent
+ if is_vertex_request is True:
+ pass
+ elif len(betas) > 0:
+ headers["anthropic-beta"] = ",".join(betas)
+
+ return headers
+
+ def _map_tool_choice(
+ self, tool_choice: Optional[str], parallel_tool_use: Optional[bool]
+ ) -> Optional[AnthropicMessagesToolChoice]:
+ _tool_choice: Optional[AnthropicMessagesToolChoice] = None
+ if tool_choice == "auto":
+ _tool_choice = AnthropicMessagesToolChoice(
+ type="auto",
+ )
+ elif tool_choice == "required":
+ _tool_choice = AnthropicMessagesToolChoice(type="any")
+ elif isinstance(tool_choice, dict):
+ _tool_name = tool_choice.get("function", {}).get("name")
+ _tool_choice = AnthropicMessagesToolChoice(type="tool")
+ if _tool_name is not None:
+ _tool_choice["name"] = _tool_name
+
+ if parallel_tool_use is not None:
+ # Anthropic uses 'disable_parallel_tool_use' flag to determine if parallel tool use is allowed
+ # this is the inverse of the openai flag.
+ if _tool_choice is not None:
+ _tool_choice["disable_parallel_tool_use"] = not parallel_tool_use
+ else: # use anthropic defaults and make sure to send the disable_parallel_tool_use flag
+ _tool_choice = AnthropicMessagesToolChoice(
+ type="auto",
+ disable_parallel_tool_use=not parallel_tool_use,
+ )
+ return _tool_choice
+
+ def _map_tool_helper(
+ self, tool: ChatCompletionToolParam
+ ) -> AllAnthropicToolsValues:
+ returned_tool: Optional[AllAnthropicToolsValues] = None
+
+ if tool["type"] == "function" or tool["type"] == "custom":
+ _input_schema: dict = tool["function"].get(
+ "parameters",
+ {
+ "type": "object",
+ "properties": {},
+ },
+ )
+ input_schema: AnthropicInputSchema = AnthropicInputSchema(**_input_schema)
+ _tool = AnthropicMessagesTool(
+ name=tool["function"]["name"],
+ input_schema=input_schema,
+ )
+
+ _description = tool["function"].get("description")
+ if _description is not None:
+ _tool["description"] = _description
+
+ returned_tool = _tool
+
+ elif tool["type"].startswith("computer_"):
+ ## check if all required 'display_' params are given
+ if "parameters" not in tool["function"]:
+ raise ValueError("Missing required parameter: parameters")
+
+ _display_width_px: Optional[int] = tool["function"]["parameters"].get(
+ "display_width_px"
+ )
+ _display_height_px: Optional[int] = tool["function"]["parameters"].get(
+ "display_height_px"
+ )
+ if _display_width_px is None or _display_height_px is None:
+ raise ValueError(
+ "Missing required parameter: display_width_px or display_height_px"
+ )
+
+ _computer_tool = AnthropicComputerTool(
+ type=tool["type"],
+ name=tool["function"].get("name", "computer"),
+ display_width_px=_display_width_px,
+ display_height_px=_display_height_px,
+ )
+
+ _display_number = tool["function"]["parameters"].get("display_number")
+ if _display_number is not None:
+ _computer_tool["display_number"] = _display_number
+
+ returned_tool = _computer_tool
+ elif tool["type"].startswith("bash_") or tool["type"].startswith(
+ "text_editor_"
+ ):
+ function_name = tool["function"].get("name")
+ if function_name is None:
+ raise ValueError("Missing required parameter: name")
+
+ returned_tool = AnthropicHostedTools(
+ type=tool["type"],
+ name=function_name,
+ )
+ if returned_tool is None:
+ raise ValueError(f"Unsupported tool type: {tool['type']}")
+
+ ## check if cache_control is set in the tool
+ _cache_control = tool.get("cache_control", None)
+ _cache_control_function = tool.get("function", {}).get("cache_control", None)
+ if _cache_control is not None:
+ returned_tool["cache_control"] = _cache_control
+ elif _cache_control_function is not None and isinstance(
+ _cache_control_function, dict
+ ):
+ returned_tool["cache_control"] = ChatCompletionCachedContent(
+ **_cache_control_function # type: ignore
+ )
+
+ return returned_tool
+
+ def _map_tools(self, tools: List) -> List[AllAnthropicToolsValues]:
+ anthropic_tools = []
+ for tool in tools:
+ if "input_schema" in tool: # assume in anthropic format
+ anthropic_tools.append(tool)
+ else: # assume openai tool call
+ new_tool = self._map_tool_helper(tool)
+
+ anthropic_tools.append(new_tool)
+ return anthropic_tools
+
+ def _map_stop_sequences(
+ self, stop: Optional[Union[str, List[str]]]
+ ) -> Optional[List[str]]:
+ new_stop: Optional[List[str]] = None
+ if isinstance(stop, str):
+ if (
+ stop.isspace() and litellm.drop_params is True
+ ): # anthropic doesn't allow whitespace characters as stop-sequences
+ return new_stop
+ new_stop = [stop]
+ elif isinstance(stop, list):
+ new_v = []
+ for v in stop:
+ if (
+ v.isspace() and litellm.drop_params is True
+ ): # anthropic doesn't allow whitespace characters as stop-sequences
+ continue
+ new_v.append(v)
+ if len(new_v) > 0:
+ new_stop = new_v
+ return new_stop
+
+ def map_openai_params(
+ self,
+ non_default_params: dict,
+ optional_params: dict,
+ model: str,
+ drop_params: bool,
+ ) -> dict:
+ for param, value in non_default_params.items():
+ if param == "max_tokens":
+ optional_params["max_tokens"] = value
+ if param == "max_completion_tokens":
+ optional_params["max_tokens"] = value
+ if param == "tools":
+ # check if optional params already has tools
+ tool_value = self._map_tools(value)
+ optional_params = self._add_tools_to_optional_params(
+ optional_params=optional_params, tools=tool_value
+ )
+ if param == "tool_choice" or param == "parallel_tool_calls":
+ _tool_choice: Optional[AnthropicMessagesToolChoice] = (
+ self._map_tool_choice(
+ tool_choice=non_default_params.get("tool_choice"),
+ parallel_tool_use=non_default_params.get("parallel_tool_calls"),
+ )
+ )
+
+ if _tool_choice is not None:
+ optional_params["tool_choice"] = _tool_choice
+ if param == "stream" and value is True:
+ optional_params["stream"] = value
+ if param == "stop" and (isinstance(value, str) or isinstance(value, list)):
+ _value = self._map_stop_sequences(value)
+ if _value is not None:
+ optional_params["stop_sequences"] = _value
+ if param == "temperature":
+ optional_params["temperature"] = value
+ if param == "top_p":
+ optional_params["top_p"] = value
+ if param == "response_format" and isinstance(value, dict):
+
+ ignore_response_format_types = ["text"]
+ if value["type"] in ignore_response_format_types: # value is a no-op
+ continue
+
+ json_schema: Optional[dict] = None
+ if "response_schema" in value:
+ json_schema = value["response_schema"]
+ elif "json_schema" in value:
+ json_schema = value["json_schema"]["schema"]
+ """
+ When using tools in this way: - https://docs.anthropic.com/en/docs/build-with-claude/tool-use#json-mode
+ - You usually want to provide a single tool
+ - You should set tool_choice (see Forcing tool use) to instruct the model to explicitly use that tool
+ - Remember that the model will pass the input to the tool, so the name of the tool and description should be from the model’s perspective.
+ """
+
+ _tool_choice = {"name": RESPONSE_FORMAT_TOOL_NAME, "type": "tool"}
+ _tool = self._create_json_tool_call_for_response_format(
+ json_schema=json_schema,
+ )
+ optional_params = self._add_tools_to_optional_params(
+ optional_params=optional_params, tools=[_tool]
+ )
+ optional_params["tool_choice"] = _tool_choice
+ optional_params["json_mode"] = True
+ if param == "user":
+ optional_params["metadata"] = {"user_id": value}
+ if param == "thinking":
+ optional_params["thinking"] = value
+ return optional_params
+
+ def _create_json_tool_call_for_response_format(
+ self,
+ json_schema: Optional[dict] = None,
+ ) -> AnthropicMessagesTool:
+ """
+ Handles creating a tool call for getting responses in JSON format.
+
+ Args:
+ json_schema (Optional[dict]): The JSON schema the response should be in
+
+ Returns:
+ AnthropicMessagesTool: The tool call to send to Anthropic API to get responses in JSON format
+ """
+ _input_schema: AnthropicInputSchema = AnthropicInputSchema(
+ type="object",
+ )
+
+ if json_schema is None:
+ # Anthropic raises a 400 BadRequest error if properties is passed as None
+ # see usage with additionalProperties (Example 5) https://github.com/anthropics/anthropic-cookbook/blob/main/tool_use/extracting_structured_json.ipynb
+ _input_schema["additionalProperties"] = True
+ _input_schema["properties"] = {}
+ else:
+ _input_schema.update(cast(AnthropicInputSchema, json_schema))
+
+ _tool = AnthropicMessagesTool(
+ name=RESPONSE_FORMAT_TOOL_NAME, input_schema=_input_schema
+ )
+ return _tool
+
+ def is_cache_control_set(self, messages: List[AllMessageValues]) -> bool:
+ """
+ Return if {"cache_control": ..} in message content block
+
+ Used to check if anthropic prompt caching headers need to be set.
+ """
+ for message in messages:
+ if message.get("cache_control", None) is not None:
+ return True
+ _message_content = message.get("content")
+ if _message_content is not None and isinstance(_message_content, list):
+ for content in _message_content:
+ if "cache_control" in content:
+ return True
+
+ return False
+
+ def is_computer_tool_used(
+ self, tools: Optional[List[AllAnthropicToolsValues]]
+ ) -> bool:
+ if tools is None:
+ return False
+ for tool in tools:
+ if "type" in tool and tool["type"].startswith("computer_"):
+ return True
+ return False
+
+ def is_pdf_used(self, messages: List[AllMessageValues]) -> bool:
+ """
+        Set to true if media is passed into messages.
+
+ """
+ for message in messages:
+ if (
+ "content" in message
+ and message["content"] is not None
+ and isinstance(message["content"], list)
+ ):
+ for content in message["content"]:
+ if "type" in content and content["type"] != "text":
+ return True
+ return False
+
+ def translate_system_message(
+ self, messages: List[AllMessageValues]
+ ) -> List[AnthropicSystemMessageContent]:
+ """
+ Translate system message to anthropic format.
+
+ Removes system message from the original list and returns a new list of anthropic system message content.
+ """
+ system_prompt_indices = []
+ anthropic_system_message_list: List[AnthropicSystemMessageContent] = []
+ for idx, message in enumerate(messages):
+ if message["role"] == "system":
+ valid_content: bool = False
+ system_message_block = ChatCompletionSystemMessage(**message)
+ if isinstance(system_message_block["content"], str):
+ anthropic_system_message_content = AnthropicSystemMessageContent(
+ type="text",
+ text=system_message_block["content"],
+ )
+ if "cache_control" in system_message_block:
+ anthropic_system_message_content["cache_control"] = (
+ system_message_block["cache_control"]
+ )
+ anthropic_system_message_list.append(
+ anthropic_system_message_content
+ )
+ valid_content = True
+ elif isinstance(message["content"], list):
+ for _content in message["content"]:
+ anthropic_system_message_content = (
+ AnthropicSystemMessageContent(
+ type=_content.get("type"),
+ text=_content.get("text"),
+ )
+ )
+ if "cache_control" in _content:
+ anthropic_system_message_content["cache_control"] = (
+ _content["cache_control"]
+ )
+
+ anthropic_system_message_list.append(
+ anthropic_system_message_content
+ )
+ valid_content = True
+
+ if valid_content:
+ system_prompt_indices.append(idx)
+ if len(system_prompt_indices) > 0:
+ for idx in reversed(system_prompt_indices):
+ messages.pop(idx)
+
+ return anthropic_system_message_list
+
+ def transform_request(
+ self,
+ model: str,
+ messages: List[AllMessageValues],
+ optional_params: dict,
+ litellm_params: dict,
+ headers: dict,
+ ) -> dict:
+ """
+ Translate messages to anthropic format.
+ """
+ ## VALIDATE REQUEST
+ """
+ Anthropic doesn't support tool calling without `tools=` param specified.
+ """
+ if (
+ "tools" not in optional_params
+ and messages is not None
+ and has_tool_call_blocks(messages)
+ ):
+ if litellm.modify_params:
+ optional_params["tools"] = self._map_tools(
+ add_dummy_tool(custom_llm_provider="anthropic")
+ )
+ else:
+ raise litellm.UnsupportedParamsError(
+ message="Anthropic doesn't support tool calling without `tools=` param specified. Pass `tools=` param OR set `litellm.modify_params = True` // `litellm_settings::modify_params: True` to add dummy tool to the request.",
+ model="",
+ llm_provider="anthropic",
+ )
+
+ # Separate system prompt from rest of message
+ anthropic_system_message_list = self.translate_system_message(messages=messages)
+ # Handling anthropic API Prompt Caching
+ if len(anthropic_system_message_list) > 0:
+ optional_params["system"] = anthropic_system_message_list
+ # Format rest of message according to anthropic guidelines
+ try:
+ anthropic_messages = anthropic_messages_pt(
+ model=model,
+ messages=messages,
+ llm_provider="anthropic",
+ )
+ except Exception as e:
+ raise AnthropicError(
+ status_code=400,
+ message="{}\nReceived Messages={}".format(str(e), messages),
+ ) # don't use verbose_logger.exception, if exception is raised
+
+ ## Load Config
+ config = litellm.AnthropicConfig.get_config()
+ for k, v in config.items():
+ if (
+ k not in optional_params
+ ): # completion(top_k=3) > anthropic_config(top_k=3) <- allows for dynamic variables to be passed in
+ optional_params[k] = v
+
+ ## Handle user_id in metadata
+ _litellm_metadata = litellm_params.get("metadata", None)
+ if (
+ _litellm_metadata
+ and isinstance(_litellm_metadata, dict)
+ and "user_id" in _litellm_metadata
+ ):
+ optional_params["metadata"] = {"user_id": _litellm_metadata["user_id"]}
+
+ data = {
+ "model": model,
+ "messages": anthropic_messages,
+ **optional_params,
+ }
+
+ return data
+
+ def _transform_response_for_json_mode(
+ self,
+ json_mode: Optional[bool],
+ tool_calls: List[ChatCompletionToolCallChunk],
+ ) -> Optional[LitellmMessage]:
+ _message: Optional[LitellmMessage] = None
+ if json_mode is True and len(tool_calls) == 1:
+ # check if tool name is the default tool name
+ json_mode_content_str: Optional[str] = None
+ if (
+ "name" in tool_calls[0]["function"]
+ and tool_calls[0]["function"]["name"] == RESPONSE_FORMAT_TOOL_NAME
+ ):
+ json_mode_content_str = tool_calls[0]["function"].get("arguments")
+ if json_mode_content_str is not None:
+ _message = AnthropicConfig._convert_tool_response_to_message(
+ tool_calls=tool_calls,
+ )
+ return _message
+
+ def extract_response_content(self, completion_response: dict) -> Tuple[
+ str,
+ Optional[List[Any]],
+ Optional[List[ChatCompletionThinkingBlock]],
+ Optional[str],
+ List[ChatCompletionToolCallChunk],
+ ]:
+ text_content = ""
+ citations: Optional[List[Any]] = None
+ thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None
+ reasoning_content: Optional[str] = None
+ tool_calls: List[ChatCompletionToolCallChunk] = []
+ for idx, content in enumerate(completion_response["content"]):
+ if content["type"] == "text":
+ text_content += content["text"]
+ ## TOOL CALLING
+ elif content["type"] == "tool_use":
+ tool_calls.append(
+ ChatCompletionToolCallChunk(
+ id=content["id"],
+ type="function",
+ function=ChatCompletionToolCallFunctionChunk(
+ name=content["name"],
+ arguments=json.dumps(content["input"]),
+ ),
+ index=idx,
+ )
+ )
+ ## CITATIONS
+ if content.get("citations", None) is not None:
+ if citations is None:
+ citations = []
+ citations.append(content["citations"])
+ if content.get("thinking", None) is not None:
+ if thinking_blocks is None:
+ thinking_blocks = []
+ thinking_blocks.append(cast(ChatCompletionThinkingBlock, content))
+ if thinking_blocks is not None:
+ reasoning_content = ""
+ for block in thinking_blocks:
+ if "thinking" in block:
+ reasoning_content += block["thinking"]
+ return text_content, citations, thinking_blocks, reasoning_content, tool_calls
+
+ def transform_response(
+ self,
+ model: str,
+ raw_response: httpx.Response,
+ model_response: ModelResponse,
+ logging_obj: LoggingClass,
+ request_data: Dict,
+ messages: List[AllMessageValues],
+ optional_params: Dict,
+ litellm_params: dict,
+ encoding: Any,
+ api_key: Optional[str] = None,
+ json_mode: Optional[bool] = None,
+ ) -> ModelResponse:
+ _hidden_params: Dict = {}
+ _hidden_params["additional_headers"] = process_anthropic_headers(
+ dict(raw_response.headers)
+ )
+ ## LOGGING
+ logging_obj.post_call(
+ input=messages,
+ api_key=api_key,
+ original_response=raw_response.text,
+ additional_args={"complete_input_dict": request_data},
+ )
+
+ ## RESPONSE OBJECT
+ try:
+ completion_response = raw_response.json()
+ except Exception as e:
+ response_headers = getattr(raw_response, "headers", None)
+ raise AnthropicError(
+ message="Unable to get json response - {}, Original Response: {}".format(
+ str(e), raw_response.text
+ ),
+ status_code=raw_response.status_code,
+ headers=response_headers,
+ )
+ if "error" in completion_response:
+ response_headers = getattr(raw_response, "headers", None)
+ raise AnthropicError(
+ message=str(completion_response["error"]),
+ status_code=raw_response.status_code,
+ headers=response_headers,
+ )
+ else:
+ text_content = ""
+ citations: Optional[List[Any]] = None
+ thinking_blocks: Optional[List[ChatCompletionThinkingBlock]] = None
+ reasoning_content: Optional[str] = None
+ tool_calls: List[ChatCompletionToolCallChunk] = []
+
+ text_content, citations, thinking_blocks, reasoning_content, tool_calls = (
+ self.extract_response_content(completion_response=completion_response)
+ )
+
+ _message = litellm.Message(
+ tool_calls=tool_calls,
+ content=text_content or None,
+ provider_specific_fields={
+ "citations": citations,
+ "thinking_blocks": thinking_blocks,
+ },
+ thinking_blocks=thinking_blocks,
+ reasoning_content=reasoning_content,
+ )
+
+ ## HANDLE JSON MODE - anthropic returns single function call
+ json_mode_message = self._transform_response_for_json_mode(
+ json_mode=json_mode,
+ tool_calls=tool_calls,
+ )
+ if json_mode_message is not None:
+ completion_response["stop_reason"] = "stop"
+ _message = json_mode_message
+
+ model_response.choices[0].message = _message # type: ignore
+ model_response._hidden_params["original_response"] = completion_response[
+ "content"
+ ] # allow user to access raw anthropic tool calling response
+
+ model_response.choices[0].finish_reason = map_finish_reason(
+ completion_response["stop_reason"]
+ )
+
+ ## CALCULATING USAGE
+ prompt_tokens = completion_response["usage"]["input_tokens"]
+ completion_tokens = completion_response["usage"]["output_tokens"]
+ _usage = completion_response["usage"]
+ cache_creation_input_tokens: int = 0
+ cache_read_input_tokens: int = 0
+
+ model_response.created = int(time.time())
+ model_response.model = completion_response["model"]
+ if "cache_creation_input_tokens" in _usage:
+ cache_creation_input_tokens = _usage["cache_creation_input_tokens"]
+ prompt_tokens += cache_creation_input_tokens
+ if "cache_read_input_tokens" in _usage:
+ cache_read_input_tokens = _usage["cache_read_input_tokens"]
+ prompt_tokens += cache_read_input_tokens
+
+ prompt_tokens_details = PromptTokensDetailsWrapper(
+ cached_tokens=cache_read_input_tokens
+ )
+ total_tokens = prompt_tokens + completion_tokens
+ usage = Usage(
+ prompt_tokens=prompt_tokens,
+ completion_tokens=completion_tokens,
+ total_tokens=total_tokens,
+ prompt_tokens_details=prompt_tokens_details,
+ cache_creation_input_tokens=cache_creation_input_tokens,
+ cache_read_input_tokens=cache_read_input_tokens,
+ )
+
+ setattr(model_response, "usage", usage) # type: ignore
+
+ model_response._hidden_params = _hidden_params
+ return model_response
+
+ @staticmethod
+ def _convert_tool_response_to_message(
+ tool_calls: List[ChatCompletionToolCallChunk],
+ ) -> Optional[LitellmMessage]:
+ """
+        In JSON mode, the Anthropic API returns the JSON schema as a tool call; we convert it to a message to follow the OpenAI format
+
+ """
+ ## HANDLE JSON MODE - anthropic returns single function call
+ json_mode_content_str: Optional[str] = tool_calls[0]["function"].get(
+ "arguments"
+ )
+ try:
+ if json_mode_content_str is not None:
+ args = json.loads(json_mode_content_str)
+ if (
+ isinstance(args, dict)
+ and (values := args.get("values")) is not None
+ ):
+ _message = litellm.Message(content=json.dumps(values))
+ return _message
+ else:
+ # a lot of the times the `values` key is not present in the tool response
+ # relevant issue: https://github.com/BerriAI/litellm/issues/6741
+ _message = litellm.Message(content=json.dumps(args))
+ return _message
+ except json.JSONDecodeError:
+ # json decode error does occur, return the original tool response str
+ return litellm.Message(content=json_mode_content_str)
+ return None
+
+ def get_error_class(
+ self, error_message: str, status_code: int, headers: Union[Dict, httpx.Headers]
+ ) -> BaseLLMException:
+ return AnthropicError(
+ status_code=status_code,
+ message=error_message,
+ headers=cast(httpx.Headers, headers),
+ )
+
+ def _get_user_anthropic_beta_headers(
+ self, anthropic_beta_header: Optional[str]
+ ) -> Optional[List[str]]:
+ if anthropic_beta_header is None:
+ return None
+ return anthropic_beta_header.split(",")
+
+ def validate_environment(
+ self,
+ headers: dict,
+ model: str,
+ messages: List[AllMessageValues],
+ optional_params: dict,
+ api_key: Optional[str] = None,
+ api_base: Optional[str] = None,
+ ) -> Dict:
+ if api_key is None:
+ raise litellm.AuthenticationError(
+ message="Missing Anthropic API Key - A call is being made to anthropic but no key is set either in the environment variables or via params. Please set `ANTHROPIC_API_KEY` in your environment vars",
+ llm_provider="anthropic",
+ model=model,
+ )
+
+ tools = optional_params.get("tools")
+ prompt_caching_set = self.is_cache_control_set(messages=messages)
+ computer_tool_used = self.is_computer_tool_used(tools=tools)
+ pdf_used = self.is_pdf_used(messages=messages)
+ user_anthropic_beta_headers = self._get_user_anthropic_beta_headers(
+ anthropic_beta_header=headers.get("anthropic-beta")
+ )
+ anthropic_headers = self.get_anthropic_headers(
+ computer_tool_used=computer_tool_used,
+ prompt_caching_set=prompt_caching_set,
+ pdf_used=pdf_used,
+ api_key=api_key,
+ is_vertex_request=optional_params.get("is_vertex_request", False),
+ user_anthropic_beta_headers=user_anthropic_beta_headers,
+ )
+
+ headers = {**headers, **anthropic_headers}
+
+ return headers
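
As a rough usage sketch of the request-side transformation above (a minimal example; it assumes this vendored litellm install imports cleanly, and the model name and parameter values are illustrative only):

    from litellm.llms.anthropic.chat.transformation import AnthropicConfig

    config = AnthropicConfig()

    messages = [
        {"role": "system", "content": "You are a terse assistant."},
        {"role": "user", "content": "What is 2 + 2?"},
    ]

    # OpenAI-style params are renamed to Anthropic's: max_tokens passes through,
    # stop -> stop_sequences, user -> metadata.user_id, etc.
    optional_params = config.map_openai_params(
        non_default_params={"max_tokens": 256, "temperature": 0.2, "user": "user-123"},
        optional_params={},
        model="claude-3-sonnet-20240229",
        drop_params=False,
    )

    # transform_request pulls the system message out of `messages` into the
    # top-level `system` field and converts the rest via anthropic_messages_pt,
    # yielding the /v1/messages request body.
    request_body = config.transform_request(
        model="claude-3-sonnet-20240229",
        messages=messages,
        optional_params=optional_params,
        litellm_params={},
        headers={},
    )
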
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py
new file mode 100644
index 00000000..409bbe2d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/common_utils.py
@@ -0,0 +1,46 @@
+"""
+This file contains common utils for anthropic calls.
+"""
+
+from typing import Optional, Union
+
+import httpx
+
+from litellm.llms.base_llm.chat.transformation import BaseLLMException
+
+
+class AnthropicError(BaseLLMException):
+ def __init__(
+ self,
+ status_code: int,
+ message,
+ headers: Optional[httpx.Headers] = None,
+ ):
+ super().__init__(status_code=status_code, message=message, headers=headers)
+
+
+def process_anthropic_headers(headers: Union[httpx.Headers, dict]) -> dict:
+ openai_headers = {}
+ if "anthropic-ratelimit-requests-limit" in headers:
+ openai_headers["x-ratelimit-limit-requests"] = headers[
+ "anthropic-ratelimit-requests-limit"
+ ]
+ if "anthropic-ratelimit-requests-remaining" in headers:
+ openai_headers["x-ratelimit-remaining-requests"] = headers[
+ "anthropic-ratelimit-requests-remaining"
+ ]
+ if "anthropic-ratelimit-tokens-limit" in headers:
+ openai_headers["x-ratelimit-limit-tokens"] = headers[
+ "anthropic-ratelimit-tokens-limit"
+ ]
+ if "anthropic-ratelimit-tokens-remaining" in headers:
+ openai_headers["x-ratelimit-remaining-tokens"] = headers[
+ "anthropic-ratelimit-tokens-remaining"
+ ]
+
+ llm_response_headers = {
+ "{}-{}".format("llm_provider", k): v for k, v in headers.items()
+ }
+
+ additional_headers = {**llm_response_headers, **openai_headers}
+ return additional_headers
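
A small sketch of the header mapping above (header values are made up for illustration):

    from litellm.llms.anthropic.common_utils import process_anthropic_headers

    anthropic_headers = {
        "anthropic-ratelimit-requests-limit": "1000",
        "anthropic-ratelimit-requests-remaining": "999",
        "request-id": "req_abc123",
    }

    mapped = process_anthropic_headers(anthropic_headers)
    # Rate-limit headers are duplicated under their OpenAI-style names, and every
    # original header is echoed back with an "llm_provider-" prefix:
    #   mapped["x-ratelimit-limit-requests"] == "1000"
    #   mapped["llm_provider-request-id"] == "req_abc123"
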
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py
new file mode 100644
index 00000000..f1c8be7b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/handler.py
@@ -0,0 +1,5 @@
+"""
+Anthropic /complete API - uses `llm_http_handler.py` to make httpx requests
+
+Request/Response transformation is handled in `transformation.py`
+"""
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py
new file mode 100644
index 00000000..7a260b6f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/completion/transformation.py
@@ -0,0 +1,306 @@
+"""
+Translation logic for anthropic's `/v1/complete` endpoint
+
+Litellm provider slug: `anthropic_text/<model_name>`
+"""
+
+import json
+import time
+from typing import AsyncIterator, Dict, Iterator, List, Optional, Union
+
+import httpx
+
+import litellm
+from litellm.litellm_core_utils.prompt_templates.factory import (
+ custom_prompt,
+ prompt_factory,
+)
+from litellm.llms.base_llm.base_model_iterator import BaseModelResponseIterator
+from litellm.llms.base_llm.chat.transformation import (
+ BaseConfig,
+ BaseLLMException,
+ LiteLLMLoggingObj,
+)
+from litellm.types.llms.openai import AllMessageValues
+from litellm.types.utils import (
+ ChatCompletionToolCallChunk,
+ ChatCompletionUsageBlock,
+ GenericStreamingChunk,
+ ModelResponse,
+ Usage,
+)
+
+
+class AnthropicTextError(BaseLLMException):
+ def __init__(self, status_code, message):
+ self.status_code = status_code
+ self.message = message
+ self.request = httpx.Request(
+ method="POST", url="https://api.anthropic.com/v1/complete"
+ )
+ self.response = httpx.Response(status_code=status_code, request=self.request)
+ super().__init__(
+ message=self.message,
+ status_code=self.status_code,
+ request=self.request,
+ response=self.response,
+ ) # Call the base class constructor with the parameters it needs
+
+
+class AnthropicTextConfig(BaseConfig):
+ """
+ Reference: https://docs.anthropic.com/claude/reference/complete_post
+
+ to pass metadata to anthropic, it's {"user_id": "any-relevant-information"}
+ """
+
+ max_tokens_to_sample: Optional[int] = (
+ litellm.max_tokens
+ ) # anthropic requires a default
+ stop_sequences: Optional[list] = None
+ temperature: Optional[int] = None
+ top_p: Optional[int] = None
+ top_k: Optional[int] = None
+ metadata: Optional[dict] = None
+
+ def __init__(
+ self,
+ max_tokens_to_sample: Optional[int] = 256, # anthropic requires a default
+ stop_sequences: Optional[list] = None,
+ temperature: Optional[int] = None,
+ top_p: Optional[int] = None,
+ top_k: Optional[int] = None,
+ metadata: Optional[dict] = None,
+ ) -> None:
+ locals_ = locals().copy()
+ for key, value in locals_.items():
+ if key != "self" and value is not None:
+ setattr(self.__class__, key, value)
+
+ # makes headers for API call
+ def validate_environment(
+ self,
+ headers: dict,
+ model: str,
+ messages: List[AllMessageValues],
+ optional_params: dict,
+ api_key: Optional[str] = None,
+ api_base: Optional[str] = None,
+ ) -> dict:
+ if api_key is None:
+ raise ValueError(
+ "Missing Anthropic API Key - A call is being made to anthropic but no key is set either in the environment variables or via params"
+ )
+ _headers = {
+ "accept": "application/json",
+ "anthropic-version": "2023-06-01",
+ "content-type": "application/json",
+ "x-api-key": api_key,
+ }
+ headers.update(_headers)
+ return headers
+
+ def transform_request(
+ self,
+ model: str,
+ messages: List[AllMessageValues],
+ optional_params: dict,
+ litellm_params: dict,
+ headers: dict,
+ ) -> dict:
+ prompt = self._get_anthropic_text_prompt_from_messages(
+ messages=messages, model=model
+ )
+ ## Load Config
+ config = litellm.AnthropicTextConfig.get_config()
+ for k, v in config.items():
+ if (
+ k not in optional_params
+            ):  # params passed at call time (e.g. completion(top_k=3)) take precedence over config defaults
+ optional_params[k] = v
+
+ data = {
+ "model": model,
+ "prompt": prompt,
+ **optional_params,
+ }
+
+ return data
+
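+    # Illustrative shape of the request body built above (values hypothetical):
+    # for completion(model="anthropic_text/claude-2",
+    # messages=[{"role": "user", "content": "Hi"}], max_tokens=100) the payload
+    # looks roughly like:
+    #
+    #   {
+    #       "model": "claude-2",
+    #       "prompt": "\n\nHuman: Hi\n\nAssistant:",
+    #       "max_tokens_to_sample": 100,
+    #   }
+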
+ def get_supported_openai_params(self, model: str):
+ """
+ Anthropic /complete API Ref: https://docs.anthropic.com/en/api/complete
+ """
+ return [
+ "stream",
+ "max_tokens",
+ "max_completion_tokens",
+ "stop",
+ "temperature",
+ "top_p",
+ "extra_headers",
+ "user",
+ ]
+
+ def map_openai_params(
+ self,
+ non_default_params: dict,
+ optional_params: dict,
+ model: str,
+ drop_params: bool,
+ ) -> dict:
+ """
+        Follows the same logic as AnthropicConfig.map_openai_params (the Anthropic /v1/messages API).
+
+        Note: the only difference between AnthropicConfig and AnthropicTextConfig is the list returned by get_supported_openai_params.
+ API Ref: https://docs.anthropic.com/en/api/complete
+ """
+ for param, value in non_default_params.items():
+ if param == "max_tokens":
+ optional_params["max_tokens_to_sample"] = value
+ if param == "max_completion_tokens":
+ optional_params["max_tokens_to_sample"] = value
+ if param == "stream" and value is True:
+ optional_params["stream"] = value
+ if param == "stop" and (isinstance(value, str) or isinstance(value, list)):
+ _value = litellm.AnthropicConfig()._map_stop_sequences(value)
+ if _value is not None:
+ optional_params["stop_sequences"] = _value
+ if param == "temperature":
+ optional_params["temperature"] = value
+ if param == "top_p":
+ optional_params["top_p"] = value
+ if param == "user":
+ optional_params["metadata"] = {"user_id": value}
+
+ return optional_params
+
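+    # Illustrative mapping performed above (input values hypothetical):
+    #
+    #   {"max_tokens": 100, "stop": ["\n\nHuman:"], "user": "user-123"}
+    #   -> {"max_tokens_to_sample": 100,
+    #       "stop_sequences": ["\n\nHuman:"],
+    #       "metadata": {"user_id": "user-123"}}
+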
+ def transform_response(
+ self,
+ model: str,
+ raw_response: httpx.Response,
+ model_response: ModelResponse,
+ logging_obj: LiteLLMLoggingObj,
+ request_data: dict,
+ messages: List[AllMessageValues],
+ optional_params: dict,
+ litellm_params: dict,
+ encoding: str,
+ api_key: Optional[str] = None,
+ json_mode: Optional[bool] = None,
+ ) -> ModelResponse:
+ try:
+ completion_response = raw_response.json()
+ except Exception:
+ raise AnthropicTextError(
+ message=raw_response.text, status_code=raw_response.status_code
+ )
+ prompt = self._get_anthropic_text_prompt_from_messages(
+ messages=messages, model=model
+ )
+ if "error" in completion_response:
+ raise AnthropicTextError(
+ message=str(completion_response["error"]),
+ status_code=raw_response.status_code,
+ )
+ else:
+ if len(completion_response["completion"]) > 0:
+ model_response.choices[0].message.content = completion_response[ # type: ignore
+ "completion"
+ ]
+ model_response.choices[0].finish_reason = completion_response["stop_reason"]
+
+ ## CALCULATING USAGE
+ prompt_tokens = len(
+ encoding.encode(prompt)
+ ) ##[TODO] use the anthropic tokenizer here
+ completion_tokens = len(
+ encoding.encode(model_response["choices"][0]["message"].get("content", ""))
+ ) ##[TODO] use the anthropic tokenizer here
+
+ model_response.created = int(time.time())
+ model_response.model = model
+ usage = Usage(
+ prompt_tokens=prompt_tokens,
+ completion_tokens=completion_tokens,
+ total_tokens=prompt_tokens + completion_tokens,
+ )
+
+ setattr(model_response, "usage", usage)
+ return model_response
+
+ def get_error_class(
+ self, error_message: str, status_code: int, headers: Union[Dict, httpx.Headers]
+ ) -> BaseLLMException:
+ return AnthropicTextError(
+ status_code=status_code,
+ message=error_message,
+ )
+
+ @staticmethod
+ def _is_anthropic_text_model(model: str) -> bool:
+        return model in ("claude-2", "claude-instant-1")
+
+ def _get_anthropic_text_prompt_from_messages(
+ self, messages: List[AllMessageValues], model: str
+ ) -> str:
+ custom_prompt_dict = litellm.custom_prompt_dict
+ if model in custom_prompt_dict:
+ # check if the model has a registered custom prompt
+ model_prompt_details = custom_prompt_dict[model]
+ prompt = custom_prompt(
+ role_dict=model_prompt_details["roles"],
+ initial_prompt_value=model_prompt_details["initial_prompt_value"],
+ final_prompt_value=model_prompt_details["final_prompt_value"],
+ messages=messages,
+ )
+ else:
+ prompt = prompt_factory(
+ model=model, messages=messages, custom_llm_provider="anthropic"
+ )
+
+ return str(prompt)
+
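+    # A custom prompt template can be registered per model and is picked up by
+    # the helper above. Illustrative sketch, assuming the pre_message /
+    # post_message role_dict format used by litellm's custom prompt templates:
+    #
+    #   litellm.custom_prompt_dict["claude-instant-1"] = {
+    #       "roles": {
+    #           "user": {"pre_message": "\n\nHuman: ", "post_message": ""},
+    #           "assistant": {"pre_message": "\n\nAssistant: ", "post_message": ""},
+    #       },
+    #       "initial_prompt_value": "",
+    #       "final_prompt_value": "\n\nAssistant:",
+    #   }
+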
+ def get_model_response_iterator(
+ self,
+ streaming_response: Union[Iterator[str], AsyncIterator[str], ModelResponse],
+ sync_stream: bool,
+ json_mode: Optional[bool] = False,
+ ):
+ return AnthropicTextCompletionResponseIterator(
+ streaming_response=streaming_response,
+ sync_stream=sync_stream,
+ json_mode=json_mode,
+ )
+
+
+class AnthropicTextCompletionResponseIterator(BaseModelResponseIterator):
+ def chunk_parser(self, chunk: dict) -> GenericStreamingChunk:
+ try:
+ text = ""
+ tool_use: Optional[ChatCompletionToolCallChunk] = None
+ is_finished = False
+ finish_reason = ""
+ usage: Optional[ChatCompletionUsageBlock] = None
+ provider_specific_fields = None
+ index = int(chunk.get("index", 0))
+ _chunk_text = chunk.get("completion", None)
+ if _chunk_text is not None and isinstance(_chunk_text, str):
+ text = _chunk_text
+ finish_reason = chunk.get("stop_reason", None)
+ if finish_reason is not None:
+ is_finished = True
+ returned_chunk = GenericStreamingChunk(
+ text=text,
+ tool_use=tool_use,
+ is_finished=is_finished,
+ finish_reason=finish_reason,
+ usage=usage,
+ index=index,
+ provider_specific_fields=provider_specific_fields,
+ )
+
+ return returned_chunk
+
+ except json.JSONDecodeError:
+ raise ValueError(f"Failed to decode JSON from chunk: {chunk}")
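+
+
+# Illustrative example (values hypothetical) of how a raw `/v1/complete` stream
+# event is parsed by the iterator above:
+#
+#   raw = {"completion": " Hello", "stop_reason": None, "model": "claude-2"}
+#   it = AnthropicTextCompletionResponseIterator(
+#       streaming_response=iter([]), sync_stream=True
+#   )
+#   chunk = it.chunk_parser(raw)
+#   # -> GenericStreamingChunk(text=" Hello", is_finished=False, finish_reason=None, ...)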
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py
new file mode 100644
index 00000000..0dbe19ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/cost_calculation.py
@@ -0,0 +1,25 @@
+"""
+Helper util for handling anthropic-specific cost calculation
+- e.g.: prompt caching
+"""
+
+from typing import Tuple
+
+from litellm.litellm_core_utils.llm_cost_calc.utils import generic_cost_per_token
+from litellm.types.utils import Usage
+
+
+def cost_per_token(model: str, usage: Usage) -> Tuple[float, float]:
+ """
+ Calculates the cost per token for a given model, prompt tokens, and completion tokens.
+
+ Input:
+ - model: str, the model name without provider prefix
+ - usage: LiteLLM Usage block, containing anthropic caching information
+
+ Returns:
+ Tuple[float, float] - prompt_cost_in_usd, completion_cost_in_usd
+ """
+ return generic_cost_per_token(
+ model=model, usage=usage, custom_llm_provider="anthropic"
+ )
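+
+
+# Illustrative usage (model name and token counts hypothetical):
+#
+#   from litellm.types.utils import Usage
+#
+#   usage = Usage(prompt_tokens=1200, completion_tokens=300, total_tokens=1500)
+#   prompt_cost, completion_cost = cost_per_token(
+#       model="claude-3-5-sonnet-20240620", usage=usage
+#   )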
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py
new file mode 100644
index 00000000..a7dfff74
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/handler.py
@@ -0,0 +1,179 @@
+"""
+- Calls `/v1/messages` on the Anthropic API
+- Makes streaming + non-streaming requests, passing them straight through to Anthropic - no extra translation is needed
+- Ensures requests are logged in the DB - stream + non-stream
+
+"""
+
+import json
+from typing import Any, AsyncIterator, Dict, Optional, Union, cast
+
+import httpx
+
+import litellm
+from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.base_llm.anthropic_messages.transformation import (
+ BaseAnthropicMessagesConfig,
+)
+from litellm.llms.custom_httpx.http_handler import (
+ AsyncHTTPHandler,
+ get_async_httpx_client,
+)
+from litellm.types.router import GenericLiteLLMParams
+from litellm.types.utils import ProviderSpecificHeader
+from litellm.utils import ProviderConfigManager, client
+
+
+class AnthropicMessagesHandler:
+
+ @staticmethod
+ async def _handle_anthropic_streaming(
+ response: httpx.Response,
+ request_body: dict,
+ litellm_logging_obj: LiteLLMLoggingObj,
+ ) -> AsyncIterator:
+ """Helper function to handle Anthropic streaming responses using the existing logging handlers"""
+ from datetime import datetime
+
+ from litellm.proxy.pass_through_endpoints.streaming_handler import (
+ PassThroughStreamingHandler,
+ )
+ from litellm.proxy.pass_through_endpoints.success_handler import (
+ PassThroughEndpointLogging,
+ )
+ from litellm.proxy.pass_through_endpoints.types import EndpointType
+
+ # Create success handler object
+ passthrough_success_handler_obj = PassThroughEndpointLogging()
+
+ # Use the existing streaming handler for Anthropic
+ start_time = datetime.now()
+ return PassThroughStreamingHandler.chunk_processor(
+ response=response,
+ request_body=request_body,
+ litellm_logging_obj=litellm_logging_obj,
+ endpoint_type=EndpointType.ANTHROPIC,
+ start_time=start_time,
+ passthrough_success_handler_obj=passthrough_success_handler_obj,
+ url_route="/v1/messages",
+ )
+
+
+@client
+async def anthropic_messages(
+ api_key: str,
+ model: str,
+ stream: bool = False,
+ api_base: Optional[str] = None,
+ client: Optional[AsyncHTTPHandler] = None,
+ custom_llm_provider: Optional[str] = None,
+ **kwargs,
+) -> Union[Dict[str, Any], AsyncIterator]:
+ """
+    Makes Anthropic `/v1/messages` API calls, following the Anthropic API spec.
+ """
+ # Use provided client or create a new one
+ optional_params = GenericLiteLLMParams(**kwargs)
+ model, _custom_llm_provider, dynamic_api_key, dynamic_api_base = (
+ litellm.get_llm_provider(
+ model=model,
+ custom_llm_provider=custom_llm_provider,
+ api_base=optional_params.api_base,
+ api_key=optional_params.api_key,
+ )
+ )
+ anthropic_messages_provider_config: Optional[BaseAnthropicMessagesConfig] = (
+ ProviderConfigManager.get_provider_anthropic_messages_config(
+ model=model,
+ provider=litellm.LlmProviders(_custom_llm_provider),
+ )
+ )
+ if anthropic_messages_provider_config is None:
+ raise ValueError(
+ f"Anthropic messages provider config not found for model: {model}"
+ )
+ if client is None or not isinstance(client, AsyncHTTPHandler):
+ async_httpx_client = get_async_httpx_client(
+ llm_provider=litellm.LlmProviders.ANTHROPIC
+ )
+ else:
+ async_httpx_client = client
+
+ litellm_logging_obj: LiteLLMLoggingObj = kwargs.get("litellm_logging_obj", None)
+
+ # Prepare headers
+ provider_specific_header = cast(
+ Optional[ProviderSpecificHeader], kwargs.get("provider_specific_header", None)
+ )
+ extra_headers = (
+ provider_specific_header.get("extra_headers", {})
+ if provider_specific_header
+ else {}
+ )
+ headers = anthropic_messages_provider_config.validate_environment(
+ headers=extra_headers or {},
+ model=model,
+ api_key=api_key,
+ )
+
+ litellm_logging_obj.update_environment_variables(
+ model=model,
+ optional_params=dict(optional_params),
+ litellm_params={
+ "metadata": kwargs.get("metadata", {}),
+ "preset_cache_key": None,
+ "stream_response": {},
+ **optional_params.model_dump(exclude_unset=True),
+ },
+ custom_llm_provider=_custom_llm_provider,
+ )
+ litellm_logging_obj.model_call_details.update(kwargs)
+
+ # Prepare request body
+ request_body = kwargs.copy()
+ request_body = {
+ k: v
+ for k, v in request_body.items()
+ if k
+ in anthropic_messages_provider_config.get_supported_anthropic_messages_params(
+ model=model
+ )
+ }
+ request_body["stream"] = stream
+ request_body["model"] = model
+ litellm_logging_obj.stream = stream
+
+ # Make the request
+ request_url = anthropic_messages_provider_config.get_complete_url(
+ api_base=api_base, model=model
+ )
+
+ litellm_logging_obj.pre_call(
+ input=[{"role": "user", "content": json.dumps(request_body)}],
+ api_key="",
+ additional_args={
+ "complete_input_dict": request_body,
+ "api_base": str(request_url),
+ "headers": headers,
+ },
+ )
+
+ response = await async_httpx_client.post(
+ url=request_url,
+ headers=headers,
+ data=json.dumps(request_body),
+ stream=stream,
+ )
+ response.raise_for_status()
+
+ # used for logging + cost tracking
+ litellm_logging_obj.model_call_details["httpx_response"] = response
+
+ if stream:
+ return await AnthropicMessagesHandler._handle_anthropic_streaming(
+ response=response,
+ request_body=request_body,
+ litellm_logging_obj=litellm_logging_obj,
+ )
+ else:
+ return response.json()
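+
+
+# Illustrative usage (model name and request values hypothetical; the @client
+# decorator is expected to set up the logging object used internally):
+#
+#   import asyncio
+#
+#   async def _demo():
+#       return await anthropic_messages(
+#           api_key="sk-ant-...",
+#           model="claude-3-5-sonnet-20240620",
+#           messages=[{"role": "user", "content": "Hello"}],
+#           max_tokens=256,
+#       )
+#
+#   print(asyncio.run(_demo()))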
diff --git a/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py
new file mode 100644
index 00000000..e9b598f1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/llms/anthropic/experimental_pass_through/messages/transformation.py
@@ -0,0 +1,47 @@
+from typing import Optional
+
+from litellm.llms.base_llm.anthropic_messages.transformation import (
+ BaseAnthropicMessagesConfig,
+)
+
+DEFAULT_ANTHROPIC_API_BASE = "https://api.anthropic.com"
+DEFAULT_ANTHROPIC_API_VERSION = "2023-06-01"
+
+
+class AnthropicMessagesConfig(BaseAnthropicMessagesConfig):
+ def get_supported_anthropic_messages_params(self, model: str) -> list:
+ return [
+ "messages",
+ "model",
+ "system",
+ "max_tokens",
+ "stop_sequences",
+ "temperature",
+ "top_p",
+ "top_k",
+ "tools",
+ "tool_choice",
+ "thinking",
+ # TODO: Add Anthropic `metadata` support
+ # "metadata",
+ ]
+
+ def get_complete_url(self, api_base: Optional[str], model: str) -> str:
+ api_base = api_base or DEFAULT_ANTHROPIC_API_BASE
+ if not api_base.endswith("/v1/messages"):
+ api_base = f"{api_base}/v1/messages"
+ return api_base
+
+ def validate_environment(
+ self,
+ headers: dict,
+ model: str,
+ api_key: Optional[str] = None,
+ ) -> dict:
+ if "x-api-key" not in headers:
+ headers["x-api-key"] = api_key
+ if "anthropic-version" not in headers:
+ headers["anthropic-version"] = DEFAULT_ANTHROPIC_API_VERSION
+ if "content-type" not in headers:
+ headers["content-type"] = "application/json"
+ return headers
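+
+
+# Illustrative behavior of the two helpers above (values hypothetical):
+#
+#   cfg = AnthropicMessagesConfig()
+#   cfg.get_complete_url(api_base=None, model="claude-3-5-sonnet-20240620")
+#   # -> "https://api.anthropic.com/v1/messages"
+#
+#   cfg.validate_environment(headers={}, model="claude-3-5-sonnet-20240620", api_key="sk-ant-...")
+#   # -> {"x-api-key": "sk-ant-...",
+#   #     "anthropic-version": "2023-06-01",
+#   #     "content-type": "application/json"}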