import copy
import json
import logging
import os
import time
import uuid
from typing import (
Any,
AsyncGenerator,
Generator,
Optional,
)
from anthropic import Anthropic, AsyncAnthropic
from anthropic.types import (
ContentBlockStopEvent,
Message,
MessageStopEvent,
RawContentBlockDeltaEvent,
RawContentBlockStartEvent,
RawMessageStartEvent,
ToolUseBlock,
)
from core.base.abstractions import GenerationConfig, LLMChatCompletion
from core.base.providers.llm import CompletionConfig, CompletionProvider
from .utils import resize_base64_image
logger = logging.getLogger(__name__)
def generate_tool_id() -> str:
"""Generate a unique tool ID using UUID4."""
return f"tool_{uuid.uuid4().hex[:12]}"
def process_images_in_message(message: dict) -> dict:
"""
Process all images in a message to ensure they're within Anthropic's recommended limits.
"""
if not message or not isinstance(message, dict):
return message
# Handle nested image_data (old format)
if (
message.get("role")
and message.get("image_data")
and isinstance(message["image_data"], dict)
):
if message["image_data"].get("data") and message["image_data"].get(
"media_type"
):
message["image_data"]["data"] = resize_base64_image(
message["image_data"]["data"]
)
return message
# Handle standard content list format
if message.get("content") and isinstance(message["content"], list):
for i, block in enumerate(message["content"]):
if isinstance(block, dict) and block.get("type") == "image":
if block.get("source", {}).get("type") == "base64" and block[
"source"
].get("data"):
message["content"][i]["source"]["data"] = (
resize_base64_image(block["source"]["data"])
)
# Handle string content with base64 image detection (less common)
elif (
message.get("content")
and isinstance(message["content"], str)
and ";base64," in message["content"]
):
# This is a basic detection for base64 images in text - might need more robust handling
logger.warning(
"Detected potential base64 image in string content - not auto-resizing"
)
return message
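# Illustrative input for process_images_in_message, using a hypothetical
# base64 payload; the content-list layout mirrors the standard Anthropic
# image block:
#
#   message = {
#       "role": "user",
#       "content": [
#           {
#               "type": "image",
#               "source": {
#                   "type": "base64",
#                   "media_type": "image/jpeg",
#                   "data": "<base64 string>",  # resized in place if oversized
#               },
#           },
#           {"type": "text", "text": "Describe this image."},
#       ],
#   }
#   message = process_images_in_message(message)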
def openai_message_to_anthropic_block(msg: dict) -> dict:
"""Converts a single OpenAI-style message (including function/tool calls)
into one Anthropic-style message.
Expected keys in `msg` can include:
- role: "system" | "assistant" | "user" | "function" | "tool"
- content: str (possibly JSON arguments or the final text)
- name: str (tool/function name)
    - tool_call_id: str (pairs a tool result with its tool_use block)
- function_call: {"name": ..., "arguments": "..."}
"""
role = msg.get("role", "")
content = msg.get("content", "")
tool_call_id = msg.get("tool_call_id")
# Handle old-style image_data field
image_data = msg.get("image_data")
# Handle nested image_url (less common)
image_url = msg.get("image_url")
if role == "system":
        # System messages should not contain images; warn and return unchanged
if image_url or image_data:
logger.warning(
"Found image in system message - images should be in user messages only"
)
return msg
if role in ["user", "assistant"]:
# If content is already a list, assume it's properly formatted
if isinstance(content, list):
return {"role": role, "content": content}
# Process old-style image_data or image_url
if image_url or image_data:
formatted_content = []
# Add image content first (as recommended by Anthropic)
if image_url:
formatted_content.append(
{
"type": "image",
"source": {"type": "url", "url": image_url},
}
)
elif image_data:
# Resize the image data if needed
resized_data = image_data.get("data", "")
if resized_data:
resized_data = resize_base64_image(resized_data)
formatted_content.append(
{
"type": "image",
"source": {
"type": "base64",
"media_type": image_data.get(
"media_type", "image/jpeg"
),
"data": resized_data,
},
}
)
# Add text content after the image
if content:
if isinstance(content, str):
formatted_content.append({"type": "text", "text": content})
elif isinstance(content, list):
# If it's already a list, extend with it
formatted_content.extend(content)
return {"role": role, "content": formatted_content}
if role in ["function", "tool"]:
return {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_call_id,
"content": content,
}
],
}
return {"role": role, "content": content}
class AnthropicCompletionProvider(CompletionProvider):
def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
super().__init__(config)
self.client = Anthropic()
self.async_client = AsyncAnthropic()
logger.debug("AnthropicCompletionProvider initialized successfully")
def _get_base_args(
self, generation_config: GenerationConfig
) -> dict[str, Any]:
"""Build the arguments dictionary for Anthropic's messages.create().
        Handles tool configuration according to Anthropic's schema:
        {
            "name": "tool_name",
            "description": "tool description",
            "input_schema": {  # Anthropic uses 'input_schema', not OpenAI's 'parameters'
                "type": "object",
                "properties": {...},
                "required": [...]
            }
        }
"""
model_str = generation_config.model or ""
model_name = (
model_str.split("anthropic/")[-1]
if model_str
else "claude-3-opus-20240229"
)
args: dict[str, Any] = {
"model": model_name,
"temperature": generation_config.temperature,
"max_tokens": generation_config.max_tokens_to_sample,
"stream": generation_config.stream,
}
if generation_config.top_p:
args["top_p"] = generation_config.top_p
if generation_config.tools is not None:
# Convert tools to Anthropic's format
anthropic_tools: list[dict[str, Any]] = []
for tool in generation_config.tools:
tool_def = {
"name": tool["function"]["name"],
"description": tool["function"]["description"],
"input_schema": tool["function"]["parameters"],
}
anthropic_tools.append(tool_def)
args["tools"] = anthropic_tools
if hasattr(generation_config, "tool_choice"):
tool_choice = generation_config.tool_choice
if isinstance(tool_choice, str):
if tool_choice == "auto":
args["tool_choice"] = {"type": "auto"}
elif tool_choice == "any":
args["tool_choice"] = {"type": "any"}
                elif isinstance(tool_choice, dict):
                    if tool_choice.get("type") == "function":
                        # Anthropic expects {"type": "tool", "name": ...} when
                        # forcing a specific tool; accept the name either flat
                        # or nested in OpenAI's "function" field.
                        args["tool_choice"] = {
                            "type": "tool",
                            "name": tool_choice.get("name")
                            or tool_choice.get("function", {}).get("name"),
                        }
            if getattr(generation_config, "disable_parallel_tool_use", None) is not None:
                # disable_parallel_tool_use is carried inside tool_choice;
                # default to {"type": "auto"} if no choice was set above.
                args["tool_choice"] = args.get("tool_choice") or {"type": "auto"}
                args["tool_choice"]["disable_parallel_tool_use"] = (
                    generation_config.disable_parallel_tool_use
                )
# --- Extended Thinking Support ---
if getattr(generation_config, "extended_thinking", False):
if (
not hasattr(generation_config, "thinking_budget")
or generation_config.thinking_budget is None
):
raise ValueError(
"Extended thinking is enabled but no thinking_budget is provided."
)
if (
generation_config.thinking_budget
>= generation_config.max_tokens_to_sample
):
raise ValueError(
"thinking_budget must be less than max_tokens_to_sample."
)
args["thinking"] = {
"type": "enabled",
"budget_tokens": generation_config.thinking_budget,
}
return args
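    # For reference, a config such as the following (field values are
    # illustrative) would yield roughly
    # {"model": "claude-3-opus-20240229", "temperature": 0.7,
    #  "max_tokens": 1024, "stream": False,
    #  "thinking": {"type": "enabled", "budget_tokens": 512}}:
    #
    #   GenerationConfig(
    #       model="anthropic/claude-3-opus-20240229",
    #       temperature=0.7,
    #       max_tokens_to_sample=1024,
    #       stream=False,
    #       extended_thinking=True,
    #       thinking_budget=512,
    #   )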
def _preprocess_messages(self, messages: list[dict]) -> list[dict]:
"""
Preprocess all messages to optimize images before sending to Anthropic API.
"""
if not messages or not isinstance(messages, list):
return messages
processed_messages = []
for message in messages:
processed_message = process_images_in_message(message)
processed_messages.append(processed_message)
return processed_messages
def _create_openai_style_message(self, content_blocks, tool_calls=None):
"""
Create an OpenAI-style message from Anthropic content blocks
while preserving the original structure.
"""
display_content = ""
structured_content: list[Any] = []
for block in content_blocks:
if block.type == "text":
display_content += block.text
elif block.type == "thinking" and hasattr(block, "thinking"):
# Store the complete thinking block
structured_content.append(
{
"type": "thinking",
"thinking": block.thinking,
"signature": block.signature,
}
)
# For display/logging
# display_content += f"{block.thinking}"
elif block.type == "redacted_thinking" and hasattr(block, "data"):
# Store the complete redacted thinking block
structured_content.append(
{"type": "redacted_thinking", "data": block.data}
)
                # Redacted thinking contributes nothing to display content
elif block.type == "tool_use":
# Tool use blocks are handled separately via tool_calls
pass
# If we have structured content (thinking blocks), use that
if structured_content:
# Add any text block at the end if needed
for block in content_blocks:
if block.type == "text":
structured_content.append(
{"type": "text", "text": block.text}
)
return {
"content": display_content or None,
"structured_content": structured_content,
}
else:
# If no structured content, just return the display content
return {"content": display_content or None}
def _convert_to_chat_completion(self, anthropic_msg: Message) -> dict:
"""
Convert a non-streaming Anthropic Message into an OpenAI-style dict.
Preserves thinking blocks for proper handling.
"""
tool_calls: list[Any] = []
message_data: dict[str, Any] = {"role": anthropic_msg.role}
if anthropic_msg.content:
# First, extract any tool use blocks
for block in anthropic_msg.content:
if hasattr(block, "type") and block.type == "tool_use":
tool_calls.append(
{
"index": len(tool_calls),
"id": block.id,
"type": "function",
"function": {
"name": block.name,
"arguments": json.dumps(block.input),
},
}
)
# Then create the message with appropriate content
message_data.update(
self._create_openai_style_message(
anthropic_msg.content, tool_calls
)
)
# If we have tool calls, add them
if tool_calls:
message_data["tool_calls"] = tool_calls
        if anthropic_msg.stop_reason == "tool_use":
            finish_reason = "tool_calls"
        elif anthropic_msg.stop_reason == "end_turn":
            finish_reason = "stop"
        else:
            finish_reason = anthropic_msg.stop_reason
model_str = anthropic_msg.model or ""
model_name = model_str.split("anthropic/")[-1] if model_str else ""
return {
"id": anthropic_msg.id,
"object": "chat.completion",
"created": int(time.time()),
"model": model_name,
"usage": {
"prompt_tokens": (
anthropic_msg.usage.input_tokens
if anthropic_msg.usage
else 0
),
"completion_tokens": (
anthropic_msg.usage.output_tokens
if anthropic_msg.usage
else 0
),
"total_tokens": (
(
anthropic_msg.usage.input_tokens
if anthropic_msg.usage
else 0
)
+ (
anthropic_msg.usage.output_tokens
if anthropic_msg.usage
else 0
)
),
},
"choices": [
{
"index": 0,
"message": message_data,
"finish_reason": finish_reason,
}
],
}
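    # The returned dict follows the OpenAI chat-completion shape, e.g.
    # (abbreviated, values illustrative):
    #
    #   {"id": "msg_01AbC...", "object": "chat.completion",
    #    "model": "claude-3-opus-20240229",
    #    "usage": {"prompt_tokens": 10, "completion_tokens": 5,
    #              "total_tokens": 15},
    #    "choices": [{"index": 0,
    #                 "message": {"role": "assistant", "content": "Hi!"},
    #                 "finish_reason": "stop"}]}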
def _split_system_messages(
self, messages: list[dict]
) -> tuple[list[dict], Optional[str]]:
"""
Process messages for Anthropic API, ensuring proper format for tool use and thinking blocks.
Now with image optimization.
"""
# First preprocess to resize any images
messages = self._preprocess_messages(messages)
system_msg = None
filtered: list[dict[str, Any]] = []
pending_tool_results: list[dict[str, Any]] = []
# Look for pairs of tool_use and tool_result
i = 0
while i < len(messages):
m = copy.deepcopy(messages[i])
# Handle system message
if m["role"] == "system" and system_msg is None:
system_msg = m["content"]
i += 1
continue
# Case 1: Message with list format content (thinking blocks or tool blocks)
if (
isinstance(m.get("content"), list)
and len(m["content"]) > 0
and isinstance(m["content"][0], dict)
):
filtered.append({"role": m["role"], "content": m["content"]})
i += 1
continue
# Case 2: Message with structured_content field
elif m.get("structured_content") and m["role"] == "assistant":
filtered.append(
{"role": "assistant", "content": m["structured_content"]}
)
i += 1
continue
# Case 3: Tool calls in an assistant message
elif m.get("tool_calls") and m["role"] == "assistant":
# Add content if it exists
if m.get("content") and not isinstance(m["content"], list):
content_to_add = m["content"]
                    # Handle content wrapped in <think>...</think> tags
                    if "<think>" in content_to_add:
                        thinking_start = content_to_add.find("<think>")
                        thinking_end = content_to_add.find("</think>")
                        if (
                            thinking_start >= 0
                            and thinking_end > thinking_start
                        ):
                            thinking_content = content_to_add[
                                thinking_start + len("<think>") : thinking_end
                            ]
                            text_content = content_to_add[
                                thinking_end + len("</think>") :
                            ].strip()
filtered.append(
{
"role": "assistant",
"content": [
{
"type": "thinking",
"thinking": thinking_content,
"signature": "placeholder_signature", # This is a placeholder
},
{"type": "text", "text": text_content},
],
}
)
else:
filtered.append(
{
"role": "assistant",
"content": content_to_add,
}
)
else:
filtered.append(
{"role": "assistant", "content": content_to_add}
)
# Add tool use blocks
tool_uses = []
for call in m["tool_calls"]:
tool_uses.append(
{
"type": "tool_use",
"id": call["id"],
"name": call["function"]["name"],
"input": json.loads(call["function"]["arguments"]),
}
)
filtered.append({"role": "assistant", "content": tool_uses})
# Check if next message is a tool result for this tool call
if i + 1 < len(messages) and messages[i + 1]["role"] in [
"function",
"tool",
]:
next_m = copy.deepcopy(messages[i + 1])
# Make sure this is a tool result for the current tool use
if next_m.get("tool_call_id") in [
call["id"] for call in m["tool_calls"]
]:
# Add tool result as a user message
filtered.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": next_m["tool_call_id"],
"content": next_m["content"],
}
],
}
)
i += 2 # Skip both the tool call and result
continue
i += 1
continue
# Case 4: Direct tool result (might be missing its paired tool call)
elif m["role"] in ["function", "tool"] and m.get("tool_call_id"):
# Add a user message with the tool result
filtered.append(
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": m["tool_call_id"],
"content": m["content"],
}
],
}
)
i += 1
continue
            # Case 5: Tool result without a tool_call_id
            elif m["role"] in ["function", "tool"]:
                # Collect tool results so they can be combined into one message
pending_tool_results.append(
{
"type": "tool_result",
"tool_use_id": m.get("tool_call_id"),
"content": m["content"],
}
)
# If we have all expected results, add them as one message
if len(filtered) > 0 and len(
filtered[-1].get("content", [])
) == len(pending_tool_results):
filtered.append(
{"role": "user", "content": pending_tool_results}
)
pending_tool_results = []
            # Default case: normal message
            else:
                filtered.append(openai_message_to_anthropic_block(m))
i += 1
# Final validation: ensure no tool_use is at the end without a tool_result
if filtered and len(filtered) > 1:
last_msg = filtered[-1]
if (
last_msg["role"] == "assistant"
and isinstance(last_msg.get("content"), list)
and any(
block.get("type") == "tool_use"
for block in last_msg["content"]
)
):
logger.warning(
"Found tool_use at end of conversation without tool_result - removing it"
)
filtered.pop() # Remove problematic message
return filtered, system_msg
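    # Sketch of the split, with made-up message content:
    #
    #   filtered, system_msg = self._split_system_messages([
    #       {"role": "system", "content": "You are terse."},
    #       {"role": "user", "content": "Hi"},
    #   ])
    #   # system_msg == "You are terse."
    #   # filtered == [{"role": "user", "content": "Hi"}]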
async def _execute_task(self, task: dict[str, Any]):
"""Async entry point.
Decide if streaming or not, then call the appropriate helper.
"""
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
logger.error("Missing ANTHROPIC_API_KEY in environment.")
raise ValueError(
"Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
)
messages = task["messages"]
generation_config = task["generation_config"]
extra_kwargs = task["kwargs"]
base_args = self._get_base_args(generation_config)
filtered_messages, system_msg = self._split_system_messages(messages)
base_args["messages"] = filtered_messages
if system_msg:
base_args["system"] = system_msg
args = {**base_args, **extra_kwargs}
logger.debug(f"Anthropic async call with args={args}")
if generation_config.stream:
return self._execute_task_async_streaming(args)
else:
return await self._execute_task_async_nonstreaming(args)
async def _execute_task_async_nonstreaming(
self, args: dict[str, Any]
) -> LLMChatCompletion:
api_key = os.getenv("ANTHROPIC_API_KEY")
if not api_key:
logger.error("Missing ANTHROPIC_API_KEY in environment.")
raise ValueError(
"Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
)
try:
logger.debug(f"Anthropic API request: {args}")
response = await self.async_client.messages.create(**args)
logger.debug(f"Anthropic API response: {response}")
return LLMChatCompletion(
**self._convert_to_chat_completion(response)
)
except Exception as e:
logger.error(f"Anthropic async non-stream call failed: {e}")
logger.error("message payload = ", args)
raise
async def _execute_task_async_streaming(
self, args: dict
) -> AsyncGenerator[dict[str, Any], None]:
"""Streaming call (async): yields partial tokens in OpenAI-like SSE
format."""
        # messages.stream() implies streaming, so drop the explicit flag to
        # avoid passing it to the SDK twice.
args.pop("stream", None)
try:
async with self.async_client.messages.stream(**args) as stream:
# We'll track partial JSON for function calls in buffer_data
buffer_data: dict[str, Any] = {
"tool_json_buffer": "",
"tool_name": None,
"tool_id": None,
"is_collecting_tool": False,
"thinking_buffer": "",
"is_collecting_thinking": False,
"thinking_signature": None,
"message_id": f"chatcmpl-{int(time.time())}",
}
model_name = args.get("model", "claude-2")
if isinstance(model_name, str):
model_name = model_name.split("anthropic/")[-1]
async for event in stream:
chunks = self._process_stream_event(
event=event,
buffer_data=buffer_data,
model_name=model_name,
)
for chunk in chunks:
yield chunk
except Exception as e:
logger.error(f"Failed to execute streaming Anthropic task: {e}")
logger.error("message payload = ", args)
raise
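    # Hypothetical driver code for consuming the async stream:
    #
    #   async for chunk in provider._execute_task_async_streaming(args):
    #       delta = chunk["choices"][0]["delta"]
    #       if "content" in delta:
    #           print(delta["content"], end="", flush=True)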
def _execute_task_sync(self, task: dict[str, Any]):
"""Synchronous entry point."""
messages = task["messages"]
generation_config = task["generation_config"]
extra_kwargs = task["kwargs"]
base_args = self._get_base_args(generation_config)
filtered_messages, system_msg = self._split_system_messages(messages)
base_args["messages"] = filtered_messages
if system_msg:
base_args["system"] = system_msg
args = {**base_args, **extra_kwargs}
logger.debug(f"Anthropic sync call with args={args}")
if generation_config.stream:
return self._execute_task_sync_streaming(args)
else:
return self._execute_task_sync_nonstreaming(args)
def _execute_task_sync_nonstreaming(
self, args: dict[str, Any]
): # -> LLMChatCompletion: # FIXME: LLMChatCompletion is an object from the OpenAI API, which causes a validation error
"""Non-streaming synchronous call."""
try:
response = self.client.messages.create(**args)
logger.debug("Anthropic sync non-stream call succeeded.")
return LLMChatCompletion(
**self._convert_to_chat_completion(response)
)
except Exception as e:
logger.error(f"Anthropic sync call failed: {e}")
raise
def _execute_task_sync_streaming(
self, args: dict[str, Any]
) -> Generator[dict[str, Any], None, None]:
"""
Synchronous streaming call: yields partial tokens in a generator.
"""
args.pop("stream", None)
try:
with self.client.messages.stream(**args) as stream:
buffer_data: dict[str, Any] = {
"tool_json_buffer": "",
"tool_name": None,
"tool_id": None,
"is_collecting_tool": False,
"thinking_buffer": "",
"is_collecting_thinking": False,
"thinking_signature": None,
"message_id": f"chatcmpl-{int(time.time())}",
}
                model_name = args.get("model", "claude-2")
                if isinstance(model_name, str):
                    model_name = model_name.split("anthropic/")[-1]
                for event in stream:
                    yield from self._process_stream_event(
                        event=event,
                        buffer_data=buffer_data,
                        model_name=model_name,  # already stripped of the provider prefix
                    )
except Exception as e:
logger.error(f"Anthropic sync streaming call failed: {e}")
raise
def _process_stream_event(
self, event: Any, buffer_data: dict[str, Any], model_name: str
) -> list[dict[str, Any]]:
chunks: list[dict[str, Any]] = []
def make_base_chunk() -> dict[str, Any]:
return {
"id": buffer_data["message_id"],
"object": "chat.completion.chunk",
"created": int(time.time()),
"model": model_name,
"choices": [{"index": 0, "delta": {}, "finish_reason": None}],
}
if isinstance(event, RawMessageStartEvent):
buffer_data["message_id"] = event.message.id
chunk = make_base_chunk()
input_tokens = (
event.message.usage.input_tokens if event.message.usage else 0
)
chunk["usage"] = {
"prompt_tokens": input_tokens,
"completion_tokens": 0,
"total_tokens": input_tokens,
}
chunks.append(chunk)
elif isinstance(event, RawContentBlockStartEvent):
if hasattr(event.content_block, "type"):
block_type = event.content_block.type
if block_type == "thinking":
buffer_data["is_collecting_thinking"] = True
buffer_data["thinking_buffer"] = ""
# Don't emit anything yet
elif block_type == "tool_use" or isinstance(
event.content_block, ToolUseBlock
):
buffer_data["tool_name"] = event.content_block.name # type: ignore
buffer_data["tool_id"] = event.content_block.id # type: ignore
buffer_data["tool_json_buffer"] = ""
buffer_data["is_collecting_tool"] = True
elif isinstance(event, RawContentBlockDeltaEvent):
delta_obj = getattr(event, "delta", None)
delta_type = getattr(delta_obj, "type", None)
# Handle thinking deltas
if delta_type == "thinking_delta" and hasattr(
delta_obj, "thinking"
):
thinking_chunk = delta_obj.thinking # type: ignore
if buffer_data["is_collecting_thinking"]:
buffer_data["thinking_buffer"] += thinking_chunk
# Stream thinking chunks as they come in
chunk = make_base_chunk()
chunk["choices"][0]["delta"] = {"thinking": thinking_chunk}
chunks.append(chunk)
# Handle signature deltas for thinking blocks
elif delta_type == "signature_delta" and hasattr(
delta_obj, "signature"
):
if buffer_data["is_collecting_thinking"]:
buffer_data["thinking_signature"] = delta_obj.signature # type: ignore
                # Emit the signature so consumers can rebuild the thinking block
chunk = make_base_chunk()
chunk["choices"][0]["delta"] = {
"thinking_signature": delta_obj.signature # type: ignore
}
chunks.append(chunk)
# Handle text deltas
elif delta_type == "text_delta" and hasattr(delta_obj, "text"):
text_chunk = delta_obj.text # type: ignore
if not buffer_data["is_collecting_tool"] and text_chunk:
chunk = make_base_chunk()
chunk["choices"][0]["delta"] = {"content": text_chunk}
chunks.append(chunk)
# Handle partial JSON for tools
elif hasattr(delta_obj, "partial_json"):
if buffer_data["is_collecting_tool"]:
buffer_data["tool_json_buffer"] += delta_obj.partial_json # type: ignore
elif isinstance(event, ContentBlockStopEvent):
# Handle the end of a thinking block
if buffer_data.get("is_collecting_thinking"):
# Emit a special "structured_content_delta" with the complete thinking block
if (
buffer_data["thinking_buffer"]
and buffer_data["thinking_signature"]
):
chunk = make_base_chunk()
chunk["choices"][0]["delta"] = {
"structured_content": [
{
"type": "thinking",
"thinking": buffer_data["thinking_buffer"],
"signature": buffer_data["thinking_signature"],
}
]
}
chunks.append(chunk)
# Reset thinking collection
buffer_data["is_collecting_thinking"] = False
buffer_data["thinking_buffer"] = ""
buffer_data["thinking_signature"] = None
# Handle the end of a tool use block
elif buffer_data.get("is_collecting_tool"):
try:
json.loads(buffer_data["tool_json_buffer"])
chunk = make_base_chunk()
chunk["choices"][0]["delta"] = {
"tool_calls": [
{
"index": 0,
"type": "function",
"id": buffer_data["tool_id"]
or f"call_{generate_tool_id()}",
"function": {
"name": buffer_data["tool_name"],
"arguments": buffer_data[
"tool_json_buffer"
],
},
}
]
}
chunks.append(chunk)
buffer_data["is_collecting_tool"] = False
buffer_data["tool_json_buffer"] = ""
buffer_data["tool_name"] = None
buffer_data["tool_id"] = None
except json.JSONDecodeError:
logger.warning(
"Incomplete JSON in tool call, skipping chunk"
)
elif isinstance(event, MessageStopEvent):
            # The message attribute may be absent on some stop events
            message = getattr(event, "message", None)
            stop_reason = getattr(message, "stop_reason", None)
            chunk = make_base_chunk()
            chunk["choices"][0]["delta"] = {}
            chunk["choices"][0]["finish_reason"] = (
                "tool_calls" if stop_reason == "tool_use" else "stop"
            )
            chunks.append(chunk)
return chunks
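# A text delta emitted by _process_stream_event is an OpenAI-style chunk,
# e.g. (id and timestamp illustrative):
#
#   {"id": "chatcmpl-1700000000", "object": "chat.completion.chunk",
#    "created": 1700000000, "model": "claude-3-opus-20240229",
#    "choices": [{"index": 0, "delta": {"content": "Hello"},
#                 "finish_reason": None}]}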