commit 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (HEAD, master)
tree   ee3dc5af3b6313e921cd920906356f5d4febc4ed
parent cc961e04ba734dd72309fb548a2f97d67d578813
Author:    S. Solomon Darnell  2025-03-28 21:52:21 -0500
Committer: S. Solomon Darnell  2025-03-28 21:52:21 -0500

    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/llm')
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/__init__.py        11
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py      925
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py  110
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/litellm.py         80
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/openai.py         522
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py         96
-rw-r--r--  .venv/lib/python3.12/site-packages/core/providers/llm/utils.py          106
7 files changed, 1850 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py
new file mode 100644
index 00000000..8132e11c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py
@@ -0,0 +1,11 @@
+from .anthropic import AnthropicCompletionProvider
+from .litellm import LiteLLMCompletionProvider
+from .openai import OpenAICompletionProvider
+from .r2r_llm import R2RCompletionProvider
+
+__all__ = [
+ "AnthropicCompletionProvider",
+ "LiteLLMCompletionProvider",
+ "OpenAICompletionProvider",
+ "R2RCompletionProvider",
+]
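
For reference, the four providers re-exported above make up the package's public surface; a minimal import sketch (package path taken from the diff):

# Importing the providers re-exported by core.providers.llm
from core.providers.llm import (
    AnthropicCompletionProvider,
    LiteLLMCompletionProvider,
    OpenAICompletionProvider,
    R2RCompletionProvider,
)
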
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py b/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py
new file mode 100644
index 00000000..0089a207
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py
@@ -0,0 +1,925 @@
+import copy
+import json
+import logging
+import os
+import time
+import uuid
+from typing import (
+ Any,
+ AsyncGenerator,
+ Generator,
+ Optional,
+)
+
+from anthropic import Anthropic, AsyncAnthropic
+from anthropic.types import (
+ ContentBlockStopEvent,
+ Message,
+ MessageStopEvent,
+ RawContentBlockDeltaEvent,
+ RawContentBlockStartEvent,
+ RawMessageStartEvent,
+ ToolUseBlock,
+)
+
+from core.base.abstractions import GenerationConfig, LLMChatCompletion
+from core.base.providers.llm import CompletionConfig, CompletionProvider
+
+from .utils import resize_base64_image
+
+logger = logging.getLogger(__name__)
+
+
+def generate_tool_id() -> str:
+ """Generate a unique tool ID using UUID4."""
+ return f"tool_{uuid.uuid4().hex[:12]}"
+
+
+def process_images_in_message(message: dict) -> dict:
+ """
+ Process all images in a message to ensure they're within Anthropic's recommended limits.
+ """
+ if not message or not isinstance(message, dict):
+ return message
+
+ # Handle nested image_data (old format)
+ if (
+ message.get("role")
+ and message.get("image_data")
+ and isinstance(message["image_data"], dict)
+ ):
+ if message["image_data"].get("data") and message["image_data"].get(
+ "media_type"
+ ):
+ message["image_data"]["data"] = resize_base64_image(
+ message["image_data"]["data"]
+ )
+ return message
+
+ # Handle standard content list format
+ if message.get("content") and isinstance(message["content"], list):
+ for i, block in enumerate(message["content"]):
+ if isinstance(block, dict) and block.get("type") == "image":
+ if block.get("source", {}).get("type") == "base64" and block[
+ "source"
+ ].get("data"):
+ message["content"][i]["source"]["data"] = (
+ resize_base64_image(block["source"]["data"])
+ )
+
+ # Handle string content with base64 image detection (less common)
+ elif (
+ message.get("content")
+ and isinstance(message["content"], str)
+ and ";base64," in message["content"]
+ ):
+ # This is a basic detection for base64 images in text - might need more robust handling
+ logger.warning(
+ "Detected potential base64 image in string content - not auto-resizing"
+ )
+
+ return message
+
+
+def openai_message_to_anthropic_block(msg: dict) -> dict:
+ """Converts a single OpenAI-style message (including function/tool calls)
+ into one Anthropic-style message.
+
+ Expected keys in `msg` can include:
+ - role: "system" | "assistant" | "user" | "function" | "tool"
+ - content: str (possibly JSON arguments or the final text)
+ - name: str (tool/function name)
+ - tool_call_id or function_call arguments
+ - function_call: {"name": ..., "arguments": "..."}
+ """
+ role = msg.get("role", "")
+ content = msg.get("content", "")
+ tool_call_id = msg.get("tool_call_id")
+
+ # Handle old-style image_data field
+ image_data = msg.get("image_data")
+ # Handle nested image_url (less common)
+ image_url = msg.get("image_url")
+
+ if role == "system":
+        # System messages should not contain images; warn and pass the message through unchanged
+ if image_url or image_data:
+ logger.warning(
+ "Found image in system message - images should be in user messages only"
+ )
+ return msg
+
+ if role in ["user", "assistant"]:
+ # If content is already a list, assume it's properly formatted
+ if isinstance(content, list):
+ return {"role": role, "content": content}
+
+ # Process old-style image_data or image_url
+ if image_url or image_data:
+ formatted_content = []
+
+ # Add image content first (as recommended by Anthropic)
+ if image_url:
+ formatted_content.append(
+ {
+ "type": "image",
+ "source": {"type": "url", "url": image_url},
+ }
+ )
+ elif image_data:
+ # Resize the image data if needed
+ resized_data = image_data.get("data", "")
+ if resized_data:
+ resized_data = resize_base64_image(resized_data)
+
+ formatted_content.append(
+ {
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": image_data.get(
+ "media_type", "image/jpeg"
+ ),
+ "data": resized_data,
+ },
+ }
+ )
+
+ # Add text content after the image
+ if content:
+ if isinstance(content, str):
+ formatted_content.append({"type": "text", "text": content})
+ elif isinstance(content, list):
+ # If it's already a list, extend with it
+ formatted_content.extend(content)
+
+ return {"role": role, "content": formatted_content}
+
+ if role in ["function", "tool"]:
+ return {
+ "role": "user",
+ "content": [
+ {
+ "type": "tool_result",
+ "tool_use_id": tool_call_id,
+ "content": content,
+ }
+ ],
+ }
+
+ return {"role": role, "content": content}
+
+
+class AnthropicCompletionProvider(CompletionProvider):
+ def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
+ super().__init__(config)
+ self.client = Anthropic()
+ self.async_client = AsyncAnthropic()
+ logger.debug("AnthropicCompletionProvider initialized successfully")
+
+ def _get_base_args(
+ self, generation_config: GenerationConfig
+ ) -> dict[str, Any]:
+        """Build the arguments dictionary for Anthropic's messages.create().
+
+        Tools are converted to Anthropic's schema:
+            {
+                "name": "tool_name",
+                "description": "tool description",
+                "input_schema": {   # Anthropic uses 'input_schema', not OpenAI's 'parameters'
+                    "type": "object",
+                    "properties": {...},
+                    "required": [...]
+                }
+            }
+        """
+ model_str = generation_config.model or ""
+ model_name = (
+ model_str.split("anthropic/")[-1]
+ if model_str
+ else "claude-3-opus-20240229"
+ )
+
+ args: dict[str, Any] = {
+ "model": model_name,
+ "temperature": generation_config.temperature,
+ "max_tokens": generation_config.max_tokens_to_sample,
+ "stream": generation_config.stream,
+ }
+ if generation_config.top_p:
+ args["top_p"] = generation_config.top_p
+
+ if generation_config.tools is not None:
+ # Convert tools to Anthropic's format
+ anthropic_tools: list[dict[str, Any]] = []
+ for tool in generation_config.tools:
+ tool_def = {
+ "name": tool["function"]["name"],
+ "description": tool["function"]["description"],
+ "input_schema": tool["function"]["parameters"],
+ }
+ anthropic_tools.append(tool_def)
+ args["tools"] = anthropic_tools
+
+ if hasattr(generation_config, "tool_choice"):
+ tool_choice = generation_config.tool_choice
+ if isinstance(tool_choice, str):
+ if tool_choice == "auto":
+ args["tool_choice"] = {"type": "auto"}
+ elif tool_choice == "any":
+ args["tool_choice"] = {"type": "any"}
+                elif isinstance(tool_choice, dict):
+                    if tool_choice.get("type") == "function":
+                        # Anthropic expects {"type": "tool", "name": ...} when forcing a specific tool
+                        args["tool_choice"] = {
+                            "type": "tool",
+                            "name": tool_choice.get("name"),
+                        }
+ if hasattr(generation_config, "disable_parallel_tool_use"):
+ args["tool_choice"] = args.get("tool_choice", {})
+ args["tool_choice"]["disable_parallel_tool_use"] = (
+ generation_config.disable_parallel_tool_use
+ )
+
+ # --- Extended Thinking Support ---
+ if getattr(generation_config, "extended_thinking", False):
+ if (
+ not hasattr(generation_config, "thinking_budget")
+ or generation_config.thinking_budget is None
+ ):
+ raise ValueError(
+ "Extended thinking is enabled but no thinking_budget is provided."
+ )
+ if (
+ generation_config.thinking_budget
+ >= generation_config.max_tokens_to_sample
+ ):
+ raise ValueError(
+ "thinking_budget must be less than max_tokens_to_sample."
+ )
+ args["thinking"] = {
+ "type": "enabled",
+ "budget_tokens": generation_config.thinking_budget,
+ }
+ return args
+
+ def _preprocess_messages(self, messages: list[dict]) -> list[dict]:
+ """
+ Preprocess all messages to optimize images before sending to Anthropic API.
+ """
+ if not messages or not isinstance(messages, list):
+ return messages
+
+ processed_messages = []
+ for message in messages:
+ processed_message = process_images_in_message(message)
+ processed_messages.append(processed_message)
+
+ return processed_messages
+
+ def _create_openai_style_message(self, content_blocks, tool_calls=None):
+ """
+ Create an OpenAI-style message from Anthropic content blocks
+ while preserving the original structure.
+ """
+ display_content = ""
+ structured_content: list[Any] = []
+
+ for block in content_blocks:
+ if block.type == "text":
+ display_content += block.text
+ elif block.type == "thinking" and hasattr(block, "thinking"):
+ # Store the complete thinking block
+ structured_content.append(
+ {
+ "type": "thinking",
+ "thinking": block.thinking,
+ "signature": block.signature,
+ }
+ )
+ # For display/logging
+ # display_content += f"<think>{block.thinking}</think>"
+ elif block.type == "redacted_thinking" and hasattr(block, "data"):
+ # Store the complete redacted thinking block
+ structured_content.append(
+ {"type": "redacted_thinking", "data": block.data}
+ )
+ # Add a placeholder for display/logging
+ display_content += "<redacted thinking block>"
+ elif block.type == "tool_use":
+ # Tool use blocks are handled separately via tool_calls
+ pass
+
+ # If we have structured content (thinking blocks), use that
+ if structured_content:
+ # Add any text block at the end if needed
+ for block in content_blocks:
+ if block.type == "text":
+ structured_content.append(
+ {"type": "text", "text": block.text}
+ )
+
+ return {
+ "content": display_content or None,
+ "structured_content": structured_content,
+ }
+ else:
+ # If no structured content, just return the display content
+ return {"content": display_content or None}
+
+ def _convert_to_chat_completion(self, anthropic_msg: Message) -> dict:
+ """
+ Convert a non-streaming Anthropic Message into an OpenAI-style dict.
+ Preserves thinking blocks for proper handling.
+ """
+ tool_calls: list[Any] = []
+ message_data: dict[str, Any] = {"role": anthropic_msg.role}
+
+ if anthropic_msg.content:
+ # First, extract any tool use blocks
+ for block in anthropic_msg.content:
+ if hasattr(block, "type") and block.type == "tool_use":
+ tool_calls.append(
+ {
+ "index": len(tool_calls),
+ "id": block.id,
+ "type": "function",
+ "function": {
+ "name": block.name,
+ "arguments": json.dumps(block.input),
+ },
+ }
+ )
+
+ # Then create the message with appropriate content
+ message_data.update(
+ self._create_openai_style_message(
+ anthropic_msg.content, tool_calls
+ )
+ )
+
+ # If we have tool calls, add them
+ if tool_calls:
+ message_data["tool_calls"] = tool_calls
+
+ finish_reason = (
+ "stop"
+ if anthropic_msg.stop_reason == "end_turn"
+ else anthropic_msg.stop_reason
+ )
+ finish_reason = (
+ "tool_calls"
+ if anthropic_msg.stop_reason == "tool_use"
+ else finish_reason
+ )
+
+ model_str = anthropic_msg.model or ""
+ model_name = model_str.split("anthropic/")[-1] if model_str else ""
+
+ return {
+ "id": anthropic_msg.id,
+ "object": "chat.completion",
+ "created": int(time.time()),
+ "model": model_name,
+ "usage": {
+ "prompt_tokens": (
+ anthropic_msg.usage.input_tokens
+ if anthropic_msg.usage
+ else 0
+ ),
+ "completion_tokens": (
+ anthropic_msg.usage.output_tokens
+ if anthropic_msg.usage
+ else 0
+ ),
+ "total_tokens": (
+ (
+ anthropic_msg.usage.input_tokens
+ if anthropic_msg.usage
+ else 0
+ )
+ + (
+ anthropic_msg.usage.output_tokens
+ if anthropic_msg.usage
+ else 0
+ )
+ ),
+ },
+ "choices": [
+ {
+ "index": 0,
+ "message": message_data,
+ "finish_reason": finish_reason,
+ }
+ ],
+ }
+
+ def _split_system_messages(
+ self, messages: list[dict]
+ ) -> tuple[list[dict], Optional[str]]:
+        """
+        Split out the system message and convert the remaining messages into
+        Anthropic's expected format, handling tool use blocks, thinking blocks,
+        and image resizing.
+        """
+ # First preprocess to resize any images
+ messages = self._preprocess_messages(messages)
+
+ system_msg = None
+ filtered: list[dict[str, Any]] = []
+ pending_tool_results: list[dict[str, Any]] = []
+
+ # Look for pairs of tool_use and tool_result
+ i = 0
+ while i < len(messages):
+ m = copy.deepcopy(messages[i])
+
+ # Handle system message
+ if m["role"] == "system" and system_msg is None:
+ system_msg = m["content"]
+ i += 1
+ continue
+
+ # Case 1: Message with list format content (thinking blocks or tool blocks)
+ if (
+ isinstance(m.get("content"), list)
+ and len(m["content"]) > 0
+ and isinstance(m["content"][0], dict)
+ ):
+ filtered.append({"role": m["role"], "content": m["content"]})
+ i += 1
+ continue
+
+ # Case 2: Message with structured_content field
+ elif m.get("structured_content") and m["role"] == "assistant":
+ filtered.append(
+ {"role": "assistant", "content": m["structured_content"]}
+ )
+ i += 1
+ continue
+
+ # Case 3: Tool calls in an assistant message
+ elif m.get("tool_calls") and m["role"] == "assistant":
+ # Add content if it exists
+ if m.get("content") and not isinstance(m["content"], list):
+ content_to_add = m["content"]
+ # Handle content with thinking tags
+ if "<think>" in content_to_add:
+ thinking_start = content_to_add.find("<think>")
+ thinking_end = content_to_add.find("</think>")
+ if (
+ thinking_start >= 0
+ and thinking_end > thinking_start
+ ):
+ thinking_content = content_to_add[
+ thinking_start + 7 : thinking_end
+ ]
+ text_content = content_to_add[
+ thinking_end + 8 :
+ ].strip()
+ filtered.append(
+ {
+ "role": "assistant",
+ "content": [
+ {
+ "type": "thinking",
+ "thinking": thinking_content,
+ "signature": "placeholder_signature", # This is a placeholder
+ },
+ {"type": "text", "text": text_content},
+ ],
+ }
+ )
+ else:
+ filtered.append(
+ {
+ "role": "assistant",
+ "content": content_to_add,
+ }
+ )
+ else:
+ filtered.append(
+ {"role": "assistant", "content": content_to_add}
+ )
+
+ # Add tool use blocks
+ tool_uses = []
+ for call in m["tool_calls"]:
+ tool_uses.append(
+ {
+ "type": "tool_use",
+ "id": call["id"],
+ "name": call["function"]["name"],
+ "input": json.loads(call["function"]["arguments"]),
+ }
+ )
+
+ filtered.append({"role": "assistant", "content": tool_uses})
+
+ # Check if next message is a tool result for this tool call
+ if i + 1 < len(messages) and messages[i + 1]["role"] in [
+ "function",
+ "tool",
+ ]:
+ next_m = copy.deepcopy(messages[i + 1])
+
+ # Make sure this is a tool result for the current tool use
+ if next_m.get("tool_call_id") in [
+ call["id"] for call in m["tool_calls"]
+ ]:
+ # Add tool result as a user message
+ filtered.append(
+ {
+ "role": "user",
+ "content": [
+ {
+ "type": "tool_result",
+ "tool_use_id": next_m["tool_call_id"],
+ "content": next_m["content"],
+ }
+ ],
+ }
+ )
+ i += 2 # Skip both the tool call and result
+ continue
+
+ i += 1
+ continue
+
+ # Case 4: Direct tool result (might be missing its paired tool call)
+ elif m["role"] in ["function", "tool"] and m.get("tool_call_id"):
+ # Add a user message with the tool result
+ filtered.append(
+ {
+ "role": "user",
+ "content": [
+ {
+ "type": "tool_result",
+ "tool_use_id": m["tool_call_id"],
+ "content": m["content"],
+ }
+ ],
+ }
+ )
+ i += 1
+ continue
+
+            # Case 5: remaining tool/function results - collect them so they can be combined
+ elif m["role"] in ["function", "tool"]:
+ # Collect tool results to combine them
+ pending_tool_results.append(
+ {
+ "type": "tool_result",
+ "tool_use_id": m.get("tool_call_id"),
+ "content": m["content"],
+ }
+ )
+
+ # If we have all expected results, add them as one message
+ if len(filtered) > 0 and len(
+ filtered[-1].get("content", [])
+ ) == len(pending_tool_results):
+ filtered.append(
+ {"role": "user", "content": pending_tool_results}
+ )
+ pending_tool_results = []
+ else:
+ filtered.append(openai_message_to_anthropic_block(m))
+ i += 1
+
+ # Final validation: ensure no tool_use is at the end without a tool_result
+ if filtered and len(filtered) > 1:
+ last_msg = filtered[-1]
+ if (
+ last_msg["role"] == "assistant"
+ and isinstance(last_msg.get("content"), list)
+ and any(
+ block.get("type") == "tool_use"
+ for block in last_msg["content"]
+ )
+ ):
+ logger.warning(
+ "Found tool_use at end of conversation without tool_result - removing it"
+ )
+ filtered.pop() # Remove problematic message
+
+ return filtered, system_msg
+
+ async def _execute_task(self, task: dict[str, Any]):
+ """Async entry point.
+
+ Decide if streaming or not, then call the appropriate helper.
+ """
+ api_key = os.getenv("ANTHROPIC_API_KEY")
+ if not api_key:
+ logger.error("Missing ANTHROPIC_API_KEY in environment.")
+ raise ValueError(
+ "Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
+ )
+
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ extra_kwargs = task["kwargs"]
+ base_args = self._get_base_args(generation_config)
+ filtered_messages, system_msg = self._split_system_messages(messages)
+ base_args["messages"] = filtered_messages
+ if system_msg:
+ base_args["system"] = system_msg
+
+ args = {**base_args, **extra_kwargs}
+ logger.debug(f"Anthropic async call with args={args}")
+
+ if generation_config.stream:
+ return self._execute_task_async_streaming(args)
+ else:
+ return await self._execute_task_async_nonstreaming(args)
+
+ async def _execute_task_async_nonstreaming(
+ self, args: dict[str, Any]
+ ) -> LLMChatCompletion:
+ api_key = os.getenv("ANTHROPIC_API_KEY")
+ if not api_key:
+ logger.error("Missing ANTHROPIC_API_KEY in environment.")
+ raise ValueError(
+ "Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
+ )
+
+ try:
+ logger.debug(f"Anthropic API request: {args}")
+ response = await self.async_client.messages.create(**args)
+ logger.debug(f"Anthropic API response: {response}")
+
+ return LLMChatCompletion(
+ **self._convert_to_chat_completion(response)
+ )
+ except Exception as e:
+ logger.error(f"Anthropic async non-stream call failed: {e}")
+            logger.error("message payload = %s", args)
+ raise
+
+ async def _execute_task_async_streaming(
+ self, args: dict
+ ) -> AsyncGenerator[dict[str, Any], None]:
+ """Streaming call (async): yields partial tokens in OpenAI-like SSE
+ format."""
+        # `stream=True` may still be present in the original args; remove it here to
+        # avoid conflicts, since `messages.stream()` handles streaming itself.
+ args.pop("stream", None)
+ try:
+ async with self.async_client.messages.stream(**args) as stream:
+ # We'll track partial JSON for function calls in buffer_data
+ buffer_data: dict[str, Any] = {
+ "tool_json_buffer": "",
+ "tool_name": None,
+ "tool_id": None,
+ "is_collecting_tool": False,
+ "thinking_buffer": "",
+ "is_collecting_thinking": False,
+ "thinking_signature": None,
+ "message_id": f"chatcmpl-{int(time.time())}",
+ }
+ model_name = args.get("model", "claude-2")
+ if isinstance(model_name, str):
+ model_name = model_name.split("anthropic/")[-1]
+
+ async for event in stream:
+ chunks = self._process_stream_event(
+ event=event,
+ buffer_data=buffer_data,
+ model_name=model_name,
+ )
+ for chunk in chunks:
+ yield chunk
+ except Exception as e:
+ logger.error(f"Failed to execute streaming Anthropic task: {e}")
+            logger.error("message payload = %s", args)
+
+ raise
+
+ def _execute_task_sync(self, task: dict[str, Any]):
+ """Synchronous entry point."""
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ extra_kwargs = task["kwargs"]
+
+ base_args = self._get_base_args(generation_config)
+ filtered_messages, system_msg = self._split_system_messages(messages)
+ base_args["messages"] = filtered_messages
+ if system_msg:
+ base_args["system"] = system_msg
+
+ args = {**base_args, **extra_kwargs}
+ logger.debug(f"Anthropic sync call with args={args}")
+
+ if generation_config.stream:
+ return self._execute_task_sync_streaming(args)
+ else:
+ return self._execute_task_sync_nonstreaming(args)
+
+ def _execute_task_sync_nonstreaming(
+ self, args: dict[str, Any]
+ ): # -> LLMChatCompletion: # FIXME: LLMChatCompletion is an object from the OpenAI API, which causes a validation error
+ """Non-streaming synchronous call."""
+ try:
+ response = self.client.messages.create(**args)
+ logger.debug("Anthropic sync non-stream call succeeded.")
+ return LLMChatCompletion(
+ **self._convert_to_chat_completion(response)
+ )
+ except Exception as e:
+ logger.error(f"Anthropic sync call failed: {e}")
+ raise
+
+ def _execute_task_sync_streaming(
+ self, args: dict[str, Any]
+ ) -> Generator[dict[str, Any], None, None]:
+ """
+ Synchronous streaming call: yields partial tokens in a generator.
+ """
+ args.pop("stream", None)
+ try:
+ with self.client.messages.stream(**args) as stream:
+ buffer_data: dict[str, Any] = {
+ "tool_json_buffer": "",
+ "tool_name": None,
+ "tool_id": None,
+ "is_collecting_tool": False,
+ "thinking_buffer": "",
+ "is_collecting_thinking": False,
+ "thinking_signature": None,
+ "message_id": f"chatcmpl-{int(time.time())}",
+ }
+ model_name = args.get("model", "anthropic/claude-2")
+ if isinstance(model_name, str):
+ model_name = model_name.split("anthropic/")[-1]
+
+ for event in stream:
+ yield from self._process_stream_event(
+ event=event,
+ buffer_data=buffer_data,
+                        model_name=model_name,
+ )
+ except Exception as e:
+ logger.error(f"Anthropic sync streaming call failed: {e}")
+ raise
+
+ def _process_stream_event(
+ self, event: Any, buffer_data: dict[str, Any], model_name: str
+ ) -> list[dict[str, Any]]:
+ chunks: list[dict[str, Any]] = []
+
+ def make_base_chunk() -> dict[str, Any]:
+ return {
+ "id": buffer_data["message_id"],
+ "object": "chat.completion.chunk",
+ "created": int(time.time()),
+ "model": model_name,
+ "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
+ }
+
+ if isinstance(event, RawMessageStartEvent):
+ buffer_data["message_id"] = event.message.id
+ chunk = make_base_chunk()
+ input_tokens = (
+ event.message.usage.input_tokens if event.message.usage else 0
+ )
+ chunk["usage"] = {
+ "prompt_tokens": input_tokens,
+ "completion_tokens": 0,
+ "total_tokens": input_tokens,
+ }
+ chunks.append(chunk)
+
+ elif isinstance(event, RawContentBlockStartEvent):
+ if hasattr(event.content_block, "type"):
+ block_type = event.content_block.type
+ if block_type == "thinking":
+ buffer_data["is_collecting_thinking"] = True
+ buffer_data["thinking_buffer"] = ""
+ # Don't emit anything yet
+ elif block_type == "tool_use" or isinstance(
+ event.content_block, ToolUseBlock
+ ):
+ buffer_data["tool_name"] = event.content_block.name # type: ignore
+ buffer_data["tool_id"] = event.content_block.id # type: ignore
+ buffer_data["tool_json_buffer"] = ""
+ buffer_data["is_collecting_tool"] = True
+
+ elif isinstance(event, RawContentBlockDeltaEvent):
+ delta_obj = getattr(event, "delta", None)
+ delta_type = getattr(delta_obj, "type", None)
+
+ # Handle thinking deltas
+ if delta_type == "thinking_delta" and hasattr(
+ delta_obj, "thinking"
+ ):
+ thinking_chunk = delta_obj.thinking # type: ignore
+ if buffer_data["is_collecting_thinking"]:
+ buffer_data["thinking_buffer"] += thinking_chunk
+ # Stream thinking chunks as they come in
+ chunk = make_base_chunk()
+ chunk["choices"][0]["delta"] = {"thinking": thinking_chunk}
+ chunks.append(chunk)
+
+ # Handle signature deltas for thinking blocks
+ elif delta_type == "signature_delta" and hasattr(
+ delta_obj, "signature"
+ ):
+ if buffer_data["is_collecting_thinking"]:
+ buffer_data["thinking_signature"] = delta_obj.signature # type: ignore
+                    # Emit the signature so downstream consumers can reconstruct the block
+ chunk = make_base_chunk()
+ chunk["choices"][0]["delta"] = {
+ "thinking_signature": delta_obj.signature # type: ignore
+ }
+ chunks.append(chunk)
+
+ # Handle text deltas
+ elif delta_type == "text_delta" and hasattr(delta_obj, "text"):
+ text_chunk = delta_obj.text # type: ignore
+ if not buffer_data["is_collecting_tool"] and text_chunk:
+ chunk = make_base_chunk()
+ chunk["choices"][0]["delta"] = {"content": text_chunk}
+ chunks.append(chunk)
+
+ # Handle partial JSON for tools
+ elif hasattr(delta_obj, "partial_json"):
+ if buffer_data["is_collecting_tool"]:
+ buffer_data["tool_json_buffer"] += delta_obj.partial_json # type: ignore
+
+ elif isinstance(event, ContentBlockStopEvent):
+ # Handle the end of a thinking block
+ if buffer_data.get("is_collecting_thinking"):
+ # Emit a special "structured_content_delta" with the complete thinking block
+ if (
+ buffer_data["thinking_buffer"]
+ and buffer_data["thinking_signature"]
+ ):
+ chunk = make_base_chunk()
+ chunk["choices"][0]["delta"] = {
+ "structured_content": [
+ {
+ "type": "thinking",
+ "thinking": buffer_data["thinking_buffer"],
+ "signature": buffer_data["thinking_signature"],
+ }
+ ]
+ }
+ chunks.append(chunk)
+
+ # Reset thinking collection
+ buffer_data["is_collecting_thinking"] = False
+ buffer_data["thinking_buffer"] = ""
+ buffer_data["thinking_signature"] = None
+
+ # Handle the end of a tool use block
+ elif buffer_data.get("is_collecting_tool"):
+ try:
+ json.loads(buffer_data["tool_json_buffer"])
+ chunk = make_base_chunk()
+ chunk["choices"][0]["delta"] = {
+ "tool_calls": [
+ {
+ "index": 0,
+ "type": "function",
+ "id": buffer_data["tool_id"]
+ or f"call_{generate_tool_id()}",
+ "function": {
+ "name": buffer_data["tool_name"],
+ "arguments": buffer_data[
+ "tool_json_buffer"
+ ],
+ },
+ }
+ ]
+ }
+ chunks.append(chunk)
+ buffer_data["is_collecting_tool"] = False
+ buffer_data["tool_json_buffer"] = ""
+ buffer_data["tool_name"] = None
+ buffer_data["tool_id"] = None
+ except json.JSONDecodeError:
+ logger.warning(
+ "Incomplete JSON in tool call, skipping chunk"
+ )
+
+        elif isinstance(event, MessageStopEvent):
+            # The stop reason lives on the event's message attribute when present
+            message = getattr(event, "message", None)
+            stop_reason = getattr(message, "stop_reason", None)
+            chunk = make_base_chunk()
+            chunk["choices"][0]["delta"] = {}
+            chunk["choices"][0]["finish_reason"] = (
+                "tool_calls" if stop_reason == "tool_use" else "stop"
+            )
+            chunks.append(chunk)
+
+ return chunks
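
To make the message conversion above concrete, a tool/function message in OpenAI style maps to a user message carrying a tool_result block. A minimal sketch of openai_message_to_anthropic_block on hypothetical values:

# Hypothetical input for openai_message_to_anthropic_block (id and content are illustrative)
openai_msg = {
    "role": "tool",
    "tool_call_id": "toolu_abc123",
    "content": '{"temperature": 72}',
}
anthropic_msg = openai_message_to_anthropic_block(openai_msg)
# anthropic_msg == {
#     "role": "user",
#     "content": [
#         {
#             "type": "tool_result",
#             "tool_use_id": "toolu_abc123",
#             "content": '{"temperature": 72}',
#         }
#     ],
# }
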
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py b/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py
new file mode 100644
index 00000000..863e44ec
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py
@@ -0,0 +1,110 @@
+import logging
+import os
+from typing import Any, Optional
+
+from azure.ai.inference import (
+ ChatCompletionsClient as AzureChatCompletionsClient,
+)
+from azure.ai.inference.aio import (
+ ChatCompletionsClient as AsyncAzureChatCompletionsClient,
+)
+from azure.core.credentials import AzureKeyCredential
+
+from core.base.abstractions import GenerationConfig
+from core.base.providers.llm import CompletionConfig, CompletionProvider
+
+logger = logging.getLogger(__name__)
+
+
+class AzureFoundryCompletionProvider(CompletionProvider):
+ def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
+ super().__init__(config)
+ self.azure_foundry_client: Optional[AzureChatCompletionsClient] = None
+ self.async_azure_foundry_client: Optional[
+ AsyncAzureChatCompletionsClient
+ ] = None
+
+ # Initialize Azure Foundry clients if credentials exist.
+ azure_foundry_api_key = os.getenv("AZURE_FOUNDRY_API_KEY")
+ azure_foundry_api_endpoint = os.getenv("AZURE_FOUNDRY_API_ENDPOINT")
+
+ if azure_foundry_api_key and azure_foundry_api_endpoint:
+ self.azure_foundry_client = AzureChatCompletionsClient(
+ endpoint=azure_foundry_api_endpoint,
+ credential=AzureKeyCredential(azure_foundry_api_key),
+ api_version=os.getenv(
+ "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview"
+ ),
+ )
+ self.async_azure_foundry_client = AsyncAzureChatCompletionsClient(
+ endpoint=azure_foundry_api_endpoint,
+ credential=AzureKeyCredential(azure_foundry_api_key),
+ api_version=os.getenv(
+ "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview"
+ ),
+ )
+ logger.debug("Azure Foundry clients initialized successfully")
+
+ def _get_base_args(
+ self, generation_config: GenerationConfig
+ ) -> dict[str, Any]:
+ # Construct arguments similar to the other providers.
+ args: dict[str, Any] = {
+ "top_p": generation_config.top_p,
+ "stream": generation_config.stream,
+ "max_tokens": generation_config.max_tokens_to_sample,
+ "temperature": generation_config.temperature,
+ }
+
+ if generation_config.functions is not None:
+ args["functions"] = generation_config.functions
+ if generation_config.tools is not None:
+ args["tools"] = generation_config.tools
+ if generation_config.response_format is not None:
+ args["response_format"] = generation_config.response_format
+ return args
+
+ async def _execute_task(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ args = self._get_base_args(generation_config)
+ # Azure Foundry does not require a "model" argument; the endpoint is fixed.
+ args["messages"] = messages
+ args = {**args, **kwargs}
+ logger.debug(f"Executing async Azure Foundry task with args: {args}")
+
+ try:
+ if self.async_azure_foundry_client is None:
+ raise ValueError("Azure Foundry client is not initialized")
+
+ response = await self.async_azure_foundry_client.complete(**args)
+ logger.debug("Async Azure Foundry task executed successfully")
+ return response
+ except Exception as e:
+ logger.error(
+ f"Async Azure Foundry task execution failed: {str(e)}"
+ )
+ raise
+
+ def _execute_task_sync(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ args = self._get_base_args(generation_config)
+ args["messages"] = messages
+ args = {**args, **kwargs}
+ logger.debug(f"Executing sync Azure Foundry task with args: {args}")
+
+ try:
+ if self.azure_foundry_client is None:
+ raise ValueError("Azure Foundry client is not initialized")
+
+ response = self.azure_foundry_client.complete(**args)
+ logger.debug("Sync Azure Foundry task executed successfully")
+ return response
+ except Exception as e:
+ logger.error(f"Sync Azure Foundry task execution failed: {str(e)}")
+ raise
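
The clients above are only created when both credentials are present in the environment; a minimal sketch of the expected configuration (the endpoint value is a placeholder):

# Environment expected by AzureFoundryCompletionProvider (values are illustrative)
import os

os.environ["AZURE_FOUNDRY_API_KEY"] = "<your-api-key>"
os.environ["AZURE_FOUNDRY_API_ENDPOINT"] = "https://<your-endpoint>.inference.ai.azure.com"
# Optional; __init__ above falls back to "2024-05-01-preview"
os.environ.setdefault("AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview")
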
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py b/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py
new file mode 100644
index 00000000..44d467c2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py
@@ -0,0 +1,80 @@
+import logging
+from typing import Any
+
+import litellm
+from litellm import acompletion, completion
+
+from core.base.abstractions import GenerationConfig
+from core.base.providers.llm import CompletionConfig, CompletionProvider
+
+logger = logging.getLogger()
+
+
+class LiteLLMCompletionProvider(CompletionProvider):
+ def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
+ super().__init__(config)
+ litellm.modify_params = True
+ self.acompletion = acompletion
+ self.completion = completion
+
+ # if config.provider != "litellm":
+ # logger.error(f"Invalid provider: {config.provider}")
+ # raise ValueError(
+ # "LiteLLMCompletionProvider must be initialized with config with `litellm` provider."
+ # )
+
+ def _get_base_args(
+ self, generation_config: GenerationConfig
+ ) -> dict[str, Any]:
+ args: dict[str, Any] = {
+ "model": generation_config.model,
+ "temperature": generation_config.temperature,
+ "top_p": generation_config.top_p,
+ "stream": generation_config.stream,
+ "max_tokens": generation_config.max_tokens_to_sample,
+ "api_base": generation_config.api_base,
+ }
+
+        # Only include optional fields when they are explicitly set
+ if generation_config.functions is not None:
+ args["functions"] = generation_config.functions
+ if generation_config.tools is not None:
+ args["tools"] = generation_config.tools
+ if generation_config.response_format is not None:
+ args["response_format"] = generation_config.response_format
+
+ return args
+
+ async def _execute_task(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ args = self._get_base_args(generation_config)
+ args["messages"] = messages
+ args = {**args, **kwargs}
+
+ logger.debug(
+ f"Executing LiteLLM task with generation_config={generation_config}"
+ )
+
+ return await self.acompletion(**args)
+
+ def _execute_task_sync(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ args = self._get_base_args(generation_config)
+ args["messages"] = messages
+ args = {**args, **kwargs}
+
+ logger.debug(
+ f"Executing LiteLLM task with generation_config={generation_config}"
+ )
+
+ try:
+ return self.completion(**args)
+ except Exception as e:
+ logger.error(f"Sync LiteLLM task execution failed: {str(e)}")
+ raise
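
Both execution paths above take the same task shape; a minimal sketch of a task dict, assuming GenerationConfig is constructed with the same field names that _get_base_args reads (the model string is illustrative):

# Hypothetical task for LiteLLMCompletionProvider._execute_task_sync
from core.base.abstractions import GenerationConfig

task = {
    "messages": [{"role": "user", "content": "Hello"}],
    "generation_config": GenerationConfig(
        model="openai/gpt-4o-mini",
        temperature=0.2,
        top_p=1.0,
        max_tokens_to_sample=256,
        stream=False,
    ),
    "kwargs": {},
}
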
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py b/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py
new file mode 100644
index 00000000..30ef37ab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py
@@ -0,0 +1,522 @@
+import logging
+import os
+from typing import Any
+
+from openai import AsyncAzureOpenAI, AsyncOpenAI, AzureOpenAI, OpenAI
+
+from core.base.abstractions import GenerationConfig
+from core.base.providers.llm import CompletionConfig, CompletionProvider
+
+from .utils import resize_base64_image
+
+logger = logging.getLogger()
+
+
+class OpenAICompletionProvider(CompletionProvider):
+ def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
+ super().__init__(config)
+ self.openai_client = None
+ self.async_openai_client = None
+ self.azure_client = None
+ self.async_azure_client = None
+ self.deepseek_client = None
+ self.async_deepseek_client = None
+ self.ollama_client = None
+ self.async_ollama_client = None
+ self.lmstudio_client = None
+ self.async_lmstudio_client = None
+        # Azure Foundry clients using the Azure AI Inference API
+ self.azure_foundry_client = None
+ self.async_azure_foundry_client = None
+
+ # Initialize OpenAI clients if credentials exist
+ if os.getenv("OPENAI_API_KEY"):
+ self.openai_client = OpenAI()
+ self.async_openai_client = AsyncOpenAI()
+ logger.debug("OpenAI clients initialized successfully")
+
+ # Initialize Azure OpenAI clients if credentials exist
+ azure_api_key = os.getenv("AZURE_API_KEY")
+ azure_api_base = os.getenv("AZURE_API_BASE")
+ if azure_api_key and azure_api_base:
+            self.azure_client = AzureOpenAI(
+ api_key=azure_api_key,
+ api_version=os.getenv(
+ "AZURE_API_VERSION", "2024-02-15-preview"
+ ),
+ azure_endpoint=azure_api_base,
+ )
+ self.async_azure_client = AsyncAzureOpenAI(
+ api_key=azure_api_key,
+ api_version=os.getenv(
+ "AZURE_API_VERSION", "2024-02-15-preview"
+ ),
+ azure_endpoint=azure_api_base,
+ )
+ logger.debug("Azure OpenAI clients initialized successfully")
+
+ # Initialize Deepseek clients if credentials exist
+ deepseek_api_key = os.getenv("DEEPSEEK_API_KEY")
+ deepseek_api_base = os.getenv(
+ "DEEPSEEK_API_BASE", "https://api.deepseek.com"
+ )
+ if deepseek_api_key and deepseek_api_base:
+ self.deepseek_client = OpenAI(
+ api_key=deepseek_api_key,
+ base_url=deepseek_api_base,
+ )
+ self.async_deepseek_client = AsyncOpenAI(
+ api_key=deepseek_api_key,
+ base_url=deepseek_api_base,
+ )
+ logger.debug("Deepseek OpenAI clients initialized successfully")
+
+ # Initialize Ollama clients with default API key
+ ollama_api_base = os.getenv(
+ "OLLAMA_API_BASE", "http://localhost:11434/v1"
+ )
+ if ollama_api_base:
+ self.ollama_client = OpenAI(
+ api_key=os.getenv("OLLAMA_API_KEY", "dummy"),
+ base_url=ollama_api_base,
+ )
+ self.async_ollama_client = AsyncOpenAI(
+ api_key=os.getenv("OLLAMA_API_KEY", "dummy"),
+ base_url=ollama_api_base,
+ )
+ logger.debug("Ollama OpenAI clients initialized successfully")
+
+ # Initialize LMStudio clients
+ lmstudio_api_base = os.getenv(
+ "LMSTUDIO_API_BASE", "http://localhost:1234/v1"
+ )
+ if lmstudio_api_base:
+ self.lmstudio_client = OpenAI(
+ api_key=os.getenv("LMSTUDIO_API_KEY", "lm-studio"),
+ base_url=lmstudio_api_base,
+ )
+ self.async_lmstudio_client = AsyncOpenAI(
+ api_key=os.getenv("LMSTUDIO_API_KEY", "lm-studio"),
+ base_url=lmstudio_api_base,
+ )
+ logger.debug("LMStudio OpenAI clients initialized successfully")
+
+ # Initialize Azure Foundry clients if credentials exist.
+        # These use the Azure AI Inference API; the SDK is imported lazily below.
+ azure_foundry_api_key = os.getenv("AZURE_FOUNDRY_API_KEY")
+ azure_foundry_api_endpoint = os.getenv("AZURE_FOUNDRY_API_ENDPOINT")
+ if azure_foundry_api_key and azure_foundry_api_endpoint:
+ from azure.ai.inference import (
+ ChatCompletionsClient as AzureChatCompletionsClient,
+ )
+ from azure.ai.inference.aio import (
+ ChatCompletionsClient as AsyncAzureChatCompletionsClient,
+ )
+ from azure.core.credentials import AzureKeyCredential
+
+ self.azure_foundry_client = AzureChatCompletionsClient(
+ endpoint=azure_foundry_api_endpoint,
+ credential=AzureKeyCredential(azure_foundry_api_key),
+ api_version=os.getenv(
+ "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview"
+ ),
+ )
+ self.async_azure_foundry_client = AsyncAzureChatCompletionsClient(
+ endpoint=azure_foundry_api_endpoint,
+ credential=AzureKeyCredential(azure_foundry_api_key),
+ api_version=os.getenv(
+ "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview"
+ ),
+ )
+ logger.debug("Azure Foundry clients initialized successfully")
+
+ if not any(
+ [
+ self.openai_client,
+ self.azure_client,
+ self.ollama_client,
+ self.lmstudio_client,
+ self.azure_foundry_client,
+ ]
+ ):
+ raise ValueError(
+ "No valid client credentials found. Please set either OPENAI_API_KEY, "
+ "both AZURE_API_KEY and AZURE_API_BASE environment variables, "
+ "OLLAMA_API_BASE, LMSTUDIO_API_BASE, or AZURE_FOUNDRY_API_KEY and AZURE_FOUNDRY_API_ENDPOINT."
+ )
+
+ def _get_client_and_model(self, model: str):
+ """Determine which client to use based on model prefix and return the
+ appropriate client and model name."""
+ if model.startswith("azure/"):
+ if not self.azure_client:
+ raise ValueError(
+ "Azure OpenAI credentials not configured but azure/ model prefix used"
+ )
+ return self.azure_client, model[6:] # Strip 'azure/' prefix
+ elif model.startswith("openai/"):
+ if not self.openai_client:
+ raise ValueError(
+ "OpenAI credentials not configured but openai/ model prefix used"
+ )
+ return self.openai_client, model[7:] # Strip 'openai/' prefix
+ elif model.startswith("deepseek/"):
+ if not self.deepseek_client:
+ raise ValueError(
+ "Deepseek OpenAI credentials not configured but deepseek/ model prefix used"
+ )
+ return self.deepseek_client, model[9:] # Strip 'deepseek/' prefix
+ elif model.startswith("ollama/"):
+ if not self.ollama_client:
+ raise ValueError(
+ "Ollama OpenAI credentials not configured but ollama/ model prefix used"
+ )
+ return self.ollama_client, model[7:] # Strip 'ollama/' prefix
+ elif model.startswith("lmstudio/"):
+ if not self.lmstudio_client:
+ raise ValueError(
+ "LMStudio credentials not configured but lmstudio/ model prefix used"
+ )
+ return self.lmstudio_client, model[9:] # Strip 'lmstudio/' prefix
+ elif model.startswith("azure-foundry/"):
+ if not self.azure_foundry_client:
+ raise ValueError(
+ "Azure Foundry credentials not configured but azure-foundry/ model prefix used"
+ )
+ return (
+ self.azure_foundry_client,
+ model[14:],
+ ) # Strip 'azure-foundry/' prefix
+ else:
+ # Default to OpenAI if no prefix is provided.
+ if self.openai_client:
+ return self.openai_client, model
+ elif self.azure_client:
+ return self.azure_client, model
+ elif self.ollama_client:
+ return self.ollama_client, model
+ elif self.lmstudio_client:
+ return self.lmstudio_client, model
+ elif self.azure_foundry_client:
+ return self.azure_foundry_client, model
+ else:
+ raise ValueError("No valid client available for model prefix")
+
+ def _get_async_client_and_model(self, model: str):
+ """Get async client and model name based on prefix."""
+ if model.startswith("azure/"):
+ if not self.async_azure_client:
+ raise ValueError(
+ "Azure OpenAI credentials not configured but azure/ model prefix used"
+ )
+ return self.async_azure_client, model[6:]
+ elif model.startswith("openai/"):
+ if not self.async_openai_client:
+ raise ValueError(
+ "OpenAI credentials not configured but openai/ model prefix used"
+ )
+ return self.async_openai_client, model[7:]
+ elif model.startswith("deepseek/"):
+ if not self.async_deepseek_client:
+ raise ValueError(
+ "Deepseek OpenAI credentials not configured but deepseek/ model prefix used"
+ )
+ return self.async_deepseek_client, model[9:].strip()
+ elif model.startswith("ollama/"):
+ if not self.async_ollama_client:
+ raise ValueError(
+ "Ollama OpenAI credentials not configured but ollama/ model prefix used"
+ )
+ return self.async_ollama_client, model[7:]
+ elif model.startswith("lmstudio/"):
+ if not self.async_lmstudio_client:
+ raise ValueError(
+ "LMStudio credentials not configured but lmstudio/ model prefix used"
+ )
+ return self.async_lmstudio_client, model[9:]
+ elif model.startswith("azure-foundry/"):
+ if not self.async_azure_foundry_client:
+ raise ValueError(
+ "Azure Foundry credentials not configured but azure-foundry/ model prefix used"
+ )
+ return self.async_azure_foundry_client, model[14:]
+ else:
+ if self.async_openai_client:
+ return self.async_openai_client, model
+ elif self.async_azure_client:
+ return self.async_azure_client, model
+ elif self.async_ollama_client:
+ return self.async_ollama_client, model
+ elif self.async_lmstudio_client:
+ return self.async_lmstudio_client, model
+ elif self.async_azure_foundry_client:
+ return self.async_azure_foundry_client, model
+ else:
+ raise ValueError(
+ "No valid async client available for model prefix"
+ )
+
+ def _process_messages_with_images(
+ self, messages: list[dict]
+ ) -> list[dict]:
+ """
+ Process messages that may contain image_url or image_data fields.
+ Now includes aggressive image resizing similar to Anthropic provider.
+ """
+ processed_messages = []
+
+ for msg in messages:
+ if msg.get("role") == "system":
+ # System messages don't support content arrays in OpenAI
+ processed_messages.append(msg)
+ continue
+
+ # Check if the message contains image data
+ image_url = msg.pop("image_url", None)
+ image_data = msg.pop("image_data", None)
+ content = msg.get("content")
+
+ if image_url or image_data:
+ # Convert to content array format
+ new_content = []
+
+ # Add image content
+ if image_url:
+ new_content.append(
+ {"type": "image_url", "image_url": {"url": image_url}}
+ )
+ elif image_data:
+ # Resize the base64 image data if available
+ media_type = image_data.get("media_type", "image/jpeg")
+ data = image_data.get("data", "")
+
+ # Apply image resizing if PIL is available
+ if data:
+ data = resize_base64_image(data)
+ logger.debug(
+ f"Image resized, new size: {len(data)} chars"
+ )
+
+ # OpenAI expects base64 images in data URL format
+ data_url = f"data:{media_type};base64,{data}"
+ new_content.append(
+ {"type": "image_url", "image_url": {"url": data_url}}
+ )
+
+ # Add text content if present
+ if content:
+ new_content.append({"type": "text", "text": content})
+
+ # Update the message
+ new_msg = dict(msg)
+ new_msg["content"] = new_content
+ processed_messages.append(new_msg)
+ else:
+ processed_messages.append(msg)
+
+ return processed_messages
+
+ def _process_array_content_with_images(self, content: list) -> list:
+ """
+ Process content array that may contain image_url items.
+ Used for messages that already have content in array format.
+ """
+ if not content or not isinstance(content, list):
+ return content
+
+ processed_content = []
+
+ for item in content:
+ if isinstance(item, dict):
+ if item.get("type") == "image_url":
+ # Process image URL if needed
+ processed_content.append(item)
+ elif item.get("type") == "image" and item.get("source"):
+ # Convert Anthropic-style to OpenAI-style
+ source = item.get("source", {})
+ if source.get("type") == "base64" and source.get("data"):
+ # Resize the base64 image data
+ resized_data = resize_base64_image(source.get("data"))
+
+ media_type = source.get("media_type", "image/jpeg")
+ data_url = f"data:{media_type};base64,{resized_data}"
+
+ processed_content.append(
+ {
+ "type": "image_url",
+ "image_url": {"url": data_url},
+ }
+ )
+ elif source.get("type") == "url" and source.get("url"):
+ processed_content.append(
+ {
+ "type": "image_url",
+ "image_url": {"url": source.get("url")},
+ }
+ )
+ else:
+ # Pass through other types
+ processed_content.append(item)
+ else:
+ processed_content.append(item)
+
+ return processed_content
+
+ def _preprocess_messages(self, messages: list[dict]) -> list[dict]:
+ """
+ Preprocess all messages to optimize images before sending to OpenAI API.
+ """
+ if not messages or not isinstance(messages, list):
+ return messages
+
+ processed_messages = []
+
+ for msg in messages:
+ # Skip system messages as they're handled separately
+ if msg.get("role") == "system":
+ processed_messages.append(msg)
+ continue
+
+ # Process array-format content (might contain images)
+ if isinstance(msg.get("content"), list):
+ new_msg = dict(msg)
+ new_msg["content"] = self._process_array_content_with_images(
+ msg["content"]
+ )
+ processed_messages.append(new_msg)
+ else:
+ # Standard processing for non-array content
+ processed_messages.append(msg)
+
+ return processed_messages
+
+ def _get_base_args(self, generation_config: GenerationConfig) -> dict:
+        # Build the base arguments shared by all OpenAI-compatible clients.
+ args: dict[str, Any] = {
+ "model": generation_config.model,
+ "stream": generation_config.stream,
+ }
+
+ model_str = generation_config.model or ""
+
+ if "o1" not in model_str and "o3" not in model_str:
+ args["max_tokens"] = generation_config.max_tokens_to_sample
+ args["temperature"] = generation_config.temperature
+ args["top_p"] = generation_config.top_p
+ else:
+ args["max_completion_tokens"] = (
+ generation_config.max_tokens_to_sample
+ )
+
+ if generation_config.reasoning_effort is not None:
+ args["reasoning_effort"] = generation_config.reasoning_effort
+ if generation_config.functions is not None:
+ args["functions"] = generation_config.functions
+ if generation_config.tools is not None:
+ args["tools"] = generation_config.tools
+ if generation_config.response_format is not None:
+ args["response_format"] = generation_config.response_format
+ return args
+
+ async def _execute_task(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ # First preprocess to handle any images in array format
+ messages = self._preprocess_messages(messages)
+
+ # Then process messages with direct image_url or image_data fields
+ processed_messages = self._process_messages_with_images(messages)
+
+ args = self._get_base_args(generation_config)
+ client, model_name = self._get_async_client_and_model(args["model"])
+ args["model"] = model_name
+ args["messages"] = processed_messages
+ args = {**args, **kwargs}
+
+ # Check if we're using a vision-capable model when images are present
+ contains_images = any(
+ isinstance(msg.get("content"), list)
+ and any(
+ item.get("type") == "image_url"
+ for item in msg.get("content", [])
+ )
+ for msg in processed_messages
+ )
+
+ if contains_images:
+ vision_models = ["gpt-4-vision", "gpt-4o"]
+            if not any(
+                vision_model in model_name for vision_model in vision_models
+            ):
+ logger.warning(
+ f"Using model {model_name} with images, but it may not support vision"
+ )
+
+ logger.debug(f"Executing async task with args: {args}")
+ try:
+            # Azure Foundry clients expose complete() and use a fixed endpoint, so drop the model arg
+            if client == self.async_azure_foundry_client:
+                args.pop("model")
+ response = await client.complete(**args)
+ else:
+ response = await client.chat.completions.create(**args)
+ logger.debug("Async task executed successfully")
+ return response
+ except Exception as e:
+ logger.error(f"Async task execution failed: {str(e)}")
+ raise
+
+ def _execute_task_sync(self, task: dict[str, Any]):
+ messages = task["messages"]
+ generation_config = task["generation_config"]
+ kwargs = task["kwargs"]
+
+ # First preprocess to handle any images in array format
+ messages = self._preprocess_messages(messages)
+
+ # Then process messages with direct image_url or image_data fields
+ processed_messages = self._process_messages_with_images(messages)
+
+ args = self._get_base_args(generation_config)
+ client, model_name = self._get_client_and_model(args["model"])
+ args["model"] = model_name
+ args["messages"] = processed_messages
+ args = {**args, **kwargs}
+
+ # Same vision model check as in async version
+ contains_images = any(
+ isinstance(msg.get("content"), list)
+ and any(
+ item.get("type") == "image_url"
+ for item in msg.get("content", [])
+ )
+ for msg in processed_messages
+ )
+
+ if contains_images:
+ vision_models = ["gpt-4-vision", "gpt-4o"]
+            if not any(
+                vision_model in model_name for vision_model in vision_models
+            ):
+ logger.warning(
+ f"Using model {model_name} with images, but it may not support vision"
+ )
+
+ logger.debug(f"Executing sync OpenAI task with args: {args}")
+ try:
+            # Azure Foundry clients expose complete() and use a fixed endpoint, so drop the model arg
+ if client == self.azure_foundry_client:
+ args.pop("model")
+ response = client.complete(**args)
+ else:
+ response = client.chat.completions.create(**args)
+ logger.debug("Sync task executed successfully")
+ return response
+ except Exception as e:
+ logger.error(f"Sync task execution failed: {str(e)}")
+ raise
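
The prefix routing in _get_client_and_model above can be summarized with a few illustrative model strings (resolution follows the branches shown; the provider instance is hypothetical):

# provider = OpenAICompletionProvider(config)                 # hypothetical instance
# provider._get_client_and_model("azure/gpt-4o")              # -> (azure_client, "gpt-4o")
# provider._get_client_and_model("deepseek/deepseek-chat")    # -> (deepseek_client, "deepseek-chat")
# provider._get_client_and_model("ollama/llama3")             # -> (ollama_client, "llama3")
# provider._get_client_and_model("gpt-4o")                    # -> (first configured client, "gpt-4o")
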
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py b/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py
new file mode 100644
index 00000000..b95b310a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py
@@ -0,0 +1,96 @@
+import logging
+from typing import Any
+
+from core.base.abstractions import GenerationConfig
+from core.base.providers.llm import CompletionConfig, CompletionProvider
+
+from .anthropic import AnthropicCompletionProvider
+from .azure_foundry import AzureFoundryCompletionProvider
+from .litellm import LiteLLMCompletionProvider
+from .openai import OpenAICompletionProvider
+
+logger = logging.getLogger(__name__)
+
+
+class R2RCompletionProvider(CompletionProvider):
+    """A provider that routes each request to the appropriate sub-provider:
+
+    - If `generation_config.model` starts with "anthropic/", call AnthropicCompletionProvider.
+    - If it starts with "azure-foundry/", call AzureFoundryCompletionProvider.
+    - If it starts with one of the other OpenAI-like prefixes ("openai/", "azure/", "deepseek/", "ollama/", "lmstudio/")
+      or has no prefix (e.g. "gpt-4", "gpt-3.5"), call OpenAICompletionProvider.
+    - Otherwise, fall back to LiteLLMCompletionProvider.
+    """
+
+ def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
+ """Initialize sub-providers for OpenAI, Anthropic, LiteLLM, and Azure
+ Foundry."""
+ super().__init__(config)
+ self.config = config
+
+ logger.info("Initializing R2RCompletionProvider...")
+ self._openai_provider = OpenAICompletionProvider(
+ self.config, *args, **kwargs
+ )
+ self._anthropic_provider = AnthropicCompletionProvider(
+ self.config, *args, **kwargs
+ )
+ self._litellm_provider = LiteLLMCompletionProvider(
+ self.config, *args, **kwargs
+ )
+ self._azure_foundry_provider = AzureFoundryCompletionProvider(
+ self.config, *args, **kwargs
+        )
+
+ logger.debug(
+ "R2RCompletionProvider initialized with OpenAI, Anthropic, LiteLLM, and Azure Foundry sub-providers."
+ )
+
+ def _choose_subprovider_by_model(
+ self, model_name: str, is_streaming: bool = False
+ ) -> CompletionProvider:
+ """Decide which underlying sub-provider to call based on the model name
+ (prefix)."""
+ # Route to Anthropic if appropriate.
+ if model_name.startswith("anthropic/"):
+ return self._anthropic_provider
+
+ # Route to Azure Foundry explicitly.
+ if model_name.startswith("azure-foundry/"):
+ return self._azure_foundry_provider
+
+ # OpenAI-like prefixes.
+ openai_like_prefixes = [
+ "openai/",
+ "azure/",
+ "deepseek/",
+ "ollama/",
+ "lmstudio/",
+ ]
+ if (
+ any(
+ model_name.startswith(prefix)
+ for prefix in openai_like_prefixes
+ )
+ or "/" not in model_name
+ ):
+ return self._openai_provider
+
+ # Fallback to LiteLLM.
+ return self._litellm_provider
+
+ async def _execute_task(self, task: dict[str, Any]):
+ """Pick the sub-provider based on model name and forward the async
+ call."""
+ generation_config: GenerationConfig = task["generation_config"]
+ model_name = generation_config.model
+ sub_provider = self._choose_subprovider_by_model(model_name or "")
+ return await sub_provider._execute_task(task)
+
+ def _execute_task_sync(self, task: dict[str, Any]):
+ """Pick the sub-provider based on model name and forward the sync
+ call."""
+ generation_config: GenerationConfig = task["generation_config"]
+ model_name = generation_config.model
+ sub_provider = self._choose_subprovider_by_model(model_name or "")
+ return sub_provider._execute_task_sync(task)
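
A short sketch of how _choose_subprovider_by_model resolves a few model names, following the prefix rules documented above (the router instance and deployment name are hypothetical):

# router = R2RCompletionProvider(config)                                    # hypothetical instance
# router._choose_subprovider_by_model("anthropic/claude-3-opus-20240229")   # -> AnthropicCompletionProvider
# router._choose_subprovider_by_model("azure-foundry/my-deployment")        # -> AzureFoundryCompletionProvider
# router._choose_subprovider_by_model("openai/gpt-4o")                      # -> OpenAICompletionProvider
# router._choose_subprovider_by_model("gpt-4")          # no prefix         # -> OpenAICompletionProvider
# router._choose_subprovider_by_model("groq/llama3-70b-8192")               # -> LiteLLMCompletionProvider
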
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py b/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py
new file mode 100644
index 00000000..619b2e73
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py
@@ -0,0 +1,106 @@
+import base64
+import io
+import logging
+from typing import Tuple
+
+from PIL import Image
+
+logger = logging.getLogger()
+
+
+def resize_base64_image(
+ base64_string: str,
+ max_size: Tuple[int, int] = (512, 512),
+ max_megapixels: float = 0.25,
+) -> str:
+    """Aggressively resize a base64-encoded image so it stays within the given
+    size and megapixel limits, falling back gracefully if decoding fails."""
+    logger.debug(
+        f"Resizing image, original length: {len(base64_string)} chars"
+    )
+
+ # Decode base64 string to bytes
+ try:
+ image_data = base64.b64decode(base64_string)
+ image = Image.open(io.BytesIO(image_data))
+ logger.debug(f"Image opened successfully: {image.format} {image.size}")
+ except Exception as e:
+ logger.debug(f"Failed to decode/open image: {e}")
+ # Emergency fallback - truncate the base64 string to reduce tokens
+ if len(base64_string) > 50000:
+ return base64_string[:50000]
+ return base64_string
+
+ try:
+ width, height = image.size
+ current_megapixels = (width * height) / 1_000_000
+ logger.debug(
+ f"Original dimensions: {width}x{height} ({current_megapixels:.2f} MP)"
+ )
+
+ # MUCH more aggressive resizing for large images
+ if current_megapixels > 0.5:
+ max_size = (384, 384)
+ max_megapixels = 0.15
+ logger.debug("Large image detected! Using more aggressive limits")
+
+ # Calculate new dimensions with strict enforcement
+ # Always resize if the image is larger than we want
+ scale_factor = min(
+ max_size[0] / width,
+ max_size[1] / height,
+ (max_megapixels / current_megapixels) ** 0.5,
+ )
+
+ if scale_factor >= 1.0:
+ # No resize needed, but still compress
+ new_width, new_height = width, height
+ else:
+ # Apply scaling
+ new_width = max(int(width * scale_factor), 64) # Min width
+ new_height = max(int(height * scale_factor), 64) # Min height
+
+ # Always resize/recompress the image
+ logger.debug(f"Resizing to: {new_width}x{new_height}")
+ resized_image = image.resize((new_width, new_height), Image.LANCZOS) # type: ignore
+
+ # Convert back to base64 with strong compression
+ buffer = io.BytesIO()
+ if image.format == "JPEG" or image.format is None:
+ # Apply very aggressive JPEG compression
+ quality = 50 # Very low quality to reduce size
+ resized_image.save(
+ buffer, format="JPEG", quality=quality, optimize=True
+ )
+ else:
+ # For other formats
+ resized_image.save(
+ buffer, format=image.format or "PNG", optimize=True
+ )
+
+ resized_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8")
+
+ logger.debug(
+ f"Resized base64 length: {len(resized_base64)} chars (reduction: {100 * (1 - len(resized_base64) / len(base64_string)):.1f}%)"
+ )
+ return resized_base64
+
+ except Exception as e:
+ logger.debug(f"Error during resize: {e}")
+ # If anything goes wrong, truncate the base64 to a reasonable size
+ if len(base64_string) > 50000:
+ return base64_string[:50000]
+ return base64_string
+
+
+def estimate_image_tokens(width: int, height: int) -> int:
+ """
+ Estimate the number of tokens an image will use based on Anthropic's formula.
+
+ Args:
+ width: Image width in pixels
+ height: Image height in pixels
+
+ Returns:
+ Estimated number of tokens
+ """
+ return int((width * height) / 750)
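
As a quick check of the formula above, a 1024x1024 image works out to roughly 1398 tokens:

# Worked example for estimate_image_tokens
tokens = estimate_image_tokens(1024, 1024)   # (1024 * 1024) / 750 = 1398.1...
print(tokens)                                # 1398 after int() truncation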