Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/llm')
 .venv/lib/python3.12/site-packages/core/providers/llm/__init__.py      |  11 +
 .venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py     | 925 +
 .venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py | 110 +
 .venv/lib/python3.12/site-packages/core/providers/llm/litellm.py       |  80 +
 .venv/lib/python3.12/site-packages/core/providers/llm/openai.py        | 522 +
 .venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py       |  96 +
 .venv/lib/python3.12/site-packages/core/providers/llm/utils.py         | 106 +
7 files changed, 1850 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py new file mode 100644 index 00000000..8132e11c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/__init__.py @@ -0,0 +1,11 @@ +from .anthropic import AnthropicCompletionProvider +from .litellm import LiteLLMCompletionProvider +from .openai import OpenAICompletionProvider +from .r2r_llm import R2RCompletionProvider + +__all__ = [ + "AnthropicCompletionProvider", + "LiteLLMCompletionProvider", + "OpenAICompletionProvider", + "R2RCompletionProvider", +] diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py b/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py new file mode 100644 index 00000000..0089a207 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/anthropic.py @@ -0,0 +1,925 @@ +import copy +import json +import logging +import os +import time +import uuid +from typing import ( + Any, + AsyncGenerator, + Generator, + Optional, +) + +from anthropic import Anthropic, AsyncAnthropic +from anthropic.types import ( + ContentBlockStopEvent, + Message, + MessageStopEvent, + RawContentBlockDeltaEvent, + RawContentBlockStartEvent, + RawMessageStartEvent, + ToolUseBlock, +) + +from core.base.abstractions import GenerationConfig, LLMChatCompletion +from core.base.providers.llm import CompletionConfig, CompletionProvider + +from .utils import resize_base64_image + +logger = logging.getLogger(__name__) + + +def generate_tool_id() -> str: + """Generate a unique tool ID using UUID4.""" + return f"tool_{uuid.uuid4().hex[:12]}" + + +def process_images_in_message(message: dict) -> dict: + """ + Process all images in a message to ensure they're within Anthropic's recommended limits. + """ + if not message or not isinstance(message, dict): + return message + + # Handle nested image_data (old format) + if ( + message.get("role") + and message.get("image_data") + and isinstance(message["image_data"], dict) + ): + if message["image_data"].get("data") and message["image_data"].get( + "media_type" + ): + message["image_data"]["data"] = resize_base64_image( + message["image_data"]["data"] + ) + return message + + # Handle standard content list format + if message.get("content") and isinstance(message["content"], list): + for i, block in enumerate(message["content"]): + if isinstance(block, dict) and block.get("type") == "image": + if block.get("source", {}).get("type") == "base64" and block[ + "source" + ].get("data"): + message["content"][i]["source"]["data"] = ( + resize_base64_image(block["source"]["data"]) + ) + + # Handle string content with base64 image detection (less common) + elif ( + message.get("content") + and isinstance(message["content"], str) + and ";base64," in message["content"] + ): + # This is a basic detection for base64 images in text - might need more robust handling + logger.warning( + "Detected potential base64 image in string content - not auto-resizing" + ) + + return message + + +def openai_message_to_anthropic_block(msg: dict) -> dict: + """Converts a single OpenAI-style message (including function/tool calls) + into one Anthropic-style message. 
+ + Expected keys in `msg` can include: + - role: "system" | "assistant" | "user" | "function" | "tool" + - content: str (possibly JSON arguments or the final text) + - name: str (tool/function name) + - tool_call_id or function_call arguments + - function_call: {"name": ..., "arguments": "..."} + """ + role = msg.get("role", "") + content = msg.get("content", "") + tool_call_id = msg.get("tool_call_id") + + # Handle old-style image_data field + image_data = msg.get("image_data") + # Handle nested image_url (less common) + image_url = msg.get("image_url") + + if role == "system": + # System messages should not have images, extract any image to a separate user message + if image_url or image_data: + logger.warning( + "Found image in system message - images should be in user messages only" + ) + return msg + + if role in ["user", "assistant"]: + # If content is already a list, assume it's properly formatted + if isinstance(content, list): + return {"role": role, "content": content} + + # Process old-style image_data or image_url + if image_url or image_data: + formatted_content = [] + + # Add image content first (as recommended by Anthropic) + if image_url: + formatted_content.append( + { + "type": "image", + "source": {"type": "url", "url": image_url}, + } + ) + elif image_data: + # Resize the image data if needed + resized_data = image_data.get("data", "") + if resized_data: + resized_data = resize_base64_image(resized_data) + + formatted_content.append( + { + "type": "image", + "source": { + "type": "base64", + "media_type": image_data.get( + "media_type", "image/jpeg" + ), + "data": resized_data, + }, + } + ) + + # Add text content after the image + if content: + if isinstance(content, str): + formatted_content.append({"type": "text", "text": content}) + elif isinstance(content, list): + # If it's already a list, extend with it + formatted_content.extend(content) + + return {"role": role, "content": formatted_content} + + if role in ["function", "tool"]: + return { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": tool_call_id, + "content": content, + } + ], + } + + return {"role": role, "content": content} + + +class AnthropicCompletionProvider(CompletionProvider): + def __init__(self, config: CompletionConfig, *args, **kwargs) -> None: + super().__init__(config) + self.client = Anthropic() + self.async_client = AsyncAnthropic() + logger.debug("AnthropicCompletionProvider initialized successfully") + + def _get_base_args( + self, generation_config: GenerationConfig + ) -> dict[str, Any]: + """Build the arguments dictionary for Anthropic's messages.create(). + + Handles tool configuration according to Anthropic's schema: + { + "type": "function", # Use 'function' type for custom tools + "name": "tool_name", + "description": "tool description", + "parameters": { # Note: Anthropic expects 'parameters', not 'input_schema' + "type": "object", + "properties": {...}, + "required": [...] 
+ } + } + """ + model_str = generation_config.model or "" + model_name = ( + model_str.split("anthropic/")[-1] + if model_str + else "claude-3-opus-20240229" + ) + + args: dict[str, Any] = { + "model": model_name, + "temperature": generation_config.temperature, + "max_tokens": generation_config.max_tokens_to_sample, + "stream": generation_config.stream, + } + if generation_config.top_p: + args["top_p"] = generation_config.top_p + + if generation_config.tools is not None: + # Convert tools to Anthropic's format + anthropic_tools: list[dict[str, Any]] = [] + for tool in generation_config.tools: + tool_def = { + "name": tool["function"]["name"], + "description": tool["function"]["description"], + "input_schema": tool["function"]["parameters"], + } + anthropic_tools.append(tool_def) + args["tools"] = anthropic_tools + + if hasattr(generation_config, "tool_choice"): + tool_choice = generation_config.tool_choice + if isinstance(tool_choice, str): + if tool_choice == "auto": + args["tool_choice"] = {"type": "auto"} + elif tool_choice == "any": + args["tool_choice"] = {"type": "any"} + elif isinstance(tool_choice, dict): + if tool_choice.get("type") == "function": + args["tool_choice"] = { + "type": "function", + "name": tool_choice.get("name"), + } + if hasattr(generation_config, "disable_parallel_tool_use"): + args["tool_choice"] = args.get("tool_choice", {}) + args["tool_choice"]["disable_parallel_tool_use"] = ( + generation_config.disable_parallel_tool_use + ) + + # --- Extended Thinking Support --- + if getattr(generation_config, "extended_thinking", False): + if ( + not hasattr(generation_config, "thinking_budget") + or generation_config.thinking_budget is None + ): + raise ValueError( + "Extended thinking is enabled but no thinking_budget is provided." + ) + if ( + generation_config.thinking_budget + >= generation_config.max_tokens_to_sample + ): + raise ValueError( + "thinking_budget must be less than max_tokens_to_sample." + ) + args["thinking"] = { + "type": "enabled", + "budget_tokens": generation_config.thinking_budget, + } + return args + + def _preprocess_messages(self, messages: list[dict]) -> list[dict]: + """ + Preprocess all messages to optimize images before sending to Anthropic API. + """ + if not messages or not isinstance(messages, list): + return messages + + processed_messages = [] + for message in messages: + processed_message = process_images_in_message(message) + processed_messages.append(processed_message) + + return processed_messages + + def _create_openai_style_message(self, content_blocks, tool_calls=None): + """ + Create an OpenAI-style message from Anthropic content blocks + while preserving the original structure. 
+ """ + display_content = "" + structured_content: list[Any] = [] + + for block in content_blocks: + if block.type == "text": + display_content += block.text + elif block.type == "thinking" and hasattr(block, "thinking"): + # Store the complete thinking block + structured_content.append( + { + "type": "thinking", + "thinking": block.thinking, + "signature": block.signature, + } + ) + # For display/logging + # display_content += f"<think>{block.thinking}</think>" + elif block.type == "redacted_thinking" and hasattr(block, "data"): + # Store the complete redacted thinking block + structured_content.append( + {"type": "redacted_thinking", "data": block.data} + ) + # Add a placeholder for display/logging + display_content += "<redacted thinking block>" + elif block.type == "tool_use": + # Tool use blocks are handled separately via tool_calls + pass + + # If we have structured content (thinking blocks), use that + if structured_content: + # Add any text block at the end if needed + for block in content_blocks: + if block.type == "text": + structured_content.append( + {"type": "text", "text": block.text} + ) + + return { + "content": display_content or None, + "structured_content": structured_content, + } + else: + # If no structured content, just return the display content + return {"content": display_content or None} + + def _convert_to_chat_completion(self, anthropic_msg: Message) -> dict: + """ + Convert a non-streaming Anthropic Message into an OpenAI-style dict. + Preserves thinking blocks for proper handling. + """ + tool_calls: list[Any] = [] + message_data: dict[str, Any] = {"role": anthropic_msg.role} + + if anthropic_msg.content: + # First, extract any tool use blocks + for block in anthropic_msg.content: + if hasattr(block, "type") and block.type == "tool_use": + tool_calls.append( + { + "index": len(tool_calls), + "id": block.id, + "type": "function", + "function": { + "name": block.name, + "arguments": json.dumps(block.input), + }, + } + ) + + # Then create the message with appropriate content + message_data.update( + self._create_openai_style_message( + anthropic_msg.content, tool_calls + ) + ) + + # If we have tool calls, add them + if tool_calls: + message_data["tool_calls"] = tool_calls + + finish_reason = ( + "stop" + if anthropic_msg.stop_reason == "end_turn" + else anthropic_msg.stop_reason + ) + finish_reason = ( + "tool_calls" + if anthropic_msg.stop_reason == "tool_use" + else finish_reason + ) + + model_str = anthropic_msg.model or "" + model_name = model_str.split("anthropic/")[-1] if model_str else "" + + return { + "id": anthropic_msg.id, + "object": "chat.completion", + "created": int(time.time()), + "model": model_name, + "usage": { + "prompt_tokens": ( + anthropic_msg.usage.input_tokens + if anthropic_msg.usage + else 0 + ), + "completion_tokens": ( + anthropic_msg.usage.output_tokens + if anthropic_msg.usage + else 0 + ), + "total_tokens": ( + ( + anthropic_msg.usage.input_tokens + if anthropic_msg.usage + else 0 + ) + + ( + anthropic_msg.usage.output_tokens + if anthropic_msg.usage + else 0 + ) + ), + }, + "choices": [ + { + "index": 0, + "message": message_data, + "finish_reason": finish_reason, + } + ], + } + + def _split_system_messages( + self, messages: list[dict] + ) -> tuple[list[dict], Optional[str]]: + """ + Process messages for Anthropic API, ensuring proper format for tool use and thinking blocks. + Now with image optimization. 
+ """ + # First preprocess to resize any images + messages = self._preprocess_messages(messages) + + system_msg = None + filtered: list[dict[str, Any]] = [] + pending_tool_results: list[dict[str, Any]] = [] + + # Look for pairs of tool_use and tool_result + i = 0 + while i < len(messages): + m = copy.deepcopy(messages[i]) + + # Handle system message + if m["role"] == "system" and system_msg is None: + system_msg = m["content"] + i += 1 + continue + + # Case 1: Message with list format content (thinking blocks or tool blocks) + if ( + isinstance(m.get("content"), list) + and len(m["content"]) > 0 + and isinstance(m["content"][0], dict) + ): + filtered.append({"role": m["role"], "content": m["content"]}) + i += 1 + continue + + # Case 2: Message with structured_content field + elif m.get("structured_content") and m["role"] == "assistant": + filtered.append( + {"role": "assistant", "content": m["structured_content"]} + ) + i += 1 + continue + + # Case 3: Tool calls in an assistant message + elif m.get("tool_calls") and m["role"] == "assistant": + # Add content if it exists + if m.get("content") and not isinstance(m["content"], list): + content_to_add = m["content"] + # Handle content with thinking tags + if "<think>" in content_to_add: + thinking_start = content_to_add.find("<think>") + thinking_end = content_to_add.find("</think>") + if ( + thinking_start >= 0 + and thinking_end > thinking_start + ): + thinking_content = content_to_add[ + thinking_start + 7 : thinking_end + ] + text_content = content_to_add[ + thinking_end + 8 : + ].strip() + filtered.append( + { + "role": "assistant", + "content": [ + { + "type": "thinking", + "thinking": thinking_content, + "signature": "placeholder_signature", # This is a placeholder + }, + {"type": "text", "text": text_content}, + ], + } + ) + else: + filtered.append( + { + "role": "assistant", + "content": content_to_add, + } + ) + else: + filtered.append( + {"role": "assistant", "content": content_to_add} + ) + + # Add tool use blocks + tool_uses = [] + for call in m["tool_calls"]: + tool_uses.append( + { + "type": "tool_use", + "id": call["id"], + "name": call["function"]["name"], + "input": json.loads(call["function"]["arguments"]), + } + ) + + filtered.append({"role": "assistant", "content": tool_uses}) + + # Check if next message is a tool result for this tool call + if i + 1 < len(messages) and messages[i + 1]["role"] in [ + "function", + "tool", + ]: + next_m = copy.deepcopy(messages[i + 1]) + + # Make sure this is a tool result for the current tool use + if next_m.get("tool_call_id") in [ + call["id"] for call in m["tool_calls"] + ]: + # Add tool result as a user message + filtered.append( + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": next_m["tool_call_id"], + "content": next_m["content"], + } + ], + } + ) + i += 2 # Skip both the tool call and result + continue + + i += 1 + continue + + # Case 4: Direct tool result (might be missing its paired tool call) + elif m["role"] in ["function", "tool"] and m.get("tool_call_id"): + # Add a user message with the tool result + filtered.append( + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": m["tool_call_id"], + "content": m["content"], + } + ], + } + ) + i += 1 + continue + + # Default case: normal message + elif m["role"] in ["function", "tool"]: + # Collect tool results to combine them + pending_tool_results.append( + { + "type": "tool_result", + "tool_use_id": m.get("tool_call_id"), + "content": m["content"], + } + ) + + # If we 
have all expected results, add them as one message + if len(filtered) > 0 and len( + filtered[-1].get("content", []) + ) == len(pending_tool_results): + filtered.append( + {"role": "user", "content": pending_tool_results} + ) + pending_tool_results = [] + else: + filtered.append(openai_message_to_anthropic_block(m)) + i += 1 + + # Final validation: ensure no tool_use is at the end without a tool_result + if filtered and len(filtered) > 1: + last_msg = filtered[-1] + if ( + last_msg["role"] == "assistant" + and isinstance(last_msg.get("content"), list) + and any( + block.get("type") == "tool_use" + for block in last_msg["content"] + ) + ): + logger.warning( + "Found tool_use at end of conversation without tool_result - removing it" + ) + filtered.pop() # Remove problematic message + + return filtered, system_msg + + async def _execute_task(self, task: dict[str, Any]): + """Async entry point. + + Decide if streaming or not, then call the appropriate helper. + """ + api_key = os.getenv("ANTHROPIC_API_KEY") + if not api_key: + logger.error("Missing ANTHROPIC_API_KEY in environment.") + raise ValueError( + "Anthropic API key not found. Set ANTHROPIC_API_KEY env var." + ) + + messages = task["messages"] + generation_config = task["generation_config"] + extra_kwargs = task["kwargs"] + base_args = self._get_base_args(generation_config) + filtered_messages, system_msg = self._split_system_messages(messages) + base_args["messages"] = filtered_messages + if system_msg: + base_args["system"] = system_msg + + args = {**base_args, **extra_kwargs} + logger.debug(f"Anthropic async call with args={args}") + + if generation_config.stream: + return self._execute_task_async_streaming(args) + else: + return await self._execute_task_async_nonstreaming(args) + + async def _execute_task_async_nonstreaming( + self, args: dict[str, Any] + ) -> LLMChatCompletion: + api_key = os.getenv("ANTHROPIC_API_KEY") + if not api_key: + logger.error("Missing ANTHROPIC_API_KEY in environment.") + raise ValueError( + "Anthropic API key not found. Set ANTHROPIC_API_KEY env var." + ) + + try: + logger.debug(f"Anthropic API request: {args}") + response = await self.async_client.messages.create(**args) + logger.debug(f"Anthropic API response: {response}") + + return LLMChatCompletion( + **self._convert_to_chat_completion(response) + ) + except Exception as e: + logger.error(f"Anthropic async non-stream call failed: {e}") + logger.error("message payload = ", args) + raise + + async def _execute_task_async_streaming( + self, args: dict + ) -> AsyncGenerator[dict[str, Any], None]: + """Streaming call (async): yields partial tokens in OpenAI-like SSE + format.""" + # The `stream=True` is typically handled by Anthropics from the original args, + # but we remove it to avoid conflicts and rely on `messages.stream()`. 
+ args.pop("stream", None) + try: + async with self.async_client.messages.stream(**args) as stream: + # We'll track partial JSON for function calls in buffer_data + buffer_data: dict[str, Any] = { + "tool_json_buffer": "", + "tool_name": None, + "tool_id": None, + "is_collecting_tool": False, + "thinking_buffer": "", + "is_collecting_thinking": False, + "thinking_signature": None, + "message_id": f"chatcmpl-{int(time.time())}", + } + model_name = args.get("model", "claude-2") + if isinstance(model_name, str): + model_name = model_name.split("anthropic/")[-1] + + async for event in stream: + chunks = self._process_stream_event( + event=event, + buffer_data=buffer_data, + model_name=model_name, + ) + for chunk in chunks: + yield chunk + except Exception as e: + logger.error(f"Failed to execute streaming Anthropic task: {e}") + logger.error("message payload = ", args) + + raise + + def _execute_task_sync(self, task: dict[str, Any]): + """Synchronous entry point.""" + messages = task["messages"] + generation_config = task["generation_config"] + extra_kwargs = task["kwargs"] + + base_args = self._get_base_args(generation_config) + filtered_messages, system_msg = self._split_system_messages(messages) + base_args["messages"] = filtered_messages + if system_msg: + base_args["system"] = system_msg + + args = {**base_args, **extra_kwargs} + logger.debug(f"Anthropic sync call with args={args}") + + if generation_config.stream: + return self._execute_task_sync_streaming(args) + else: + return self._execute_task_sync_nonstreaming(args) + + def _execute_task_sync_nonstreaming( + self, args: dict[str, Any] + ): # -> LLMChatCompletion: # FIXME: LLMChatCompletion is an object from the OpenAI API, which causes a validation error + """Non-streaming synchronous call.""" + try: + response = self.client.messages.create(**args) + logger.debug("Anthropic sync non-stream call succeeded.") + return LLMChatCompletion( + **self._convert_to_chat_completion(response) + ) + except Exception as e: + logger.error(f"Anthropic sync call failed: {e}") + raise + + def _execute_task_sync_streaming( + self, args: dict[str, Any] + ) -> Generator[dict[str, Any], None, None]: + """ + Synchronous streaming call: yields partial tokens in a generator. 
+ """ + args.pop("stream", None) + try: + with self.client.messages.stream(**args) as stream: + buffer_data: dict[str, Any] = { + "tool_json_buffer": "", + "tool_name": None, + "tool_id": None, + "is_collecting_tool": False, + "thinking_buffer": "", + "is_collecting_thinking": False, + "thinking_signature": None, + "message_id": f"chatcmpl-{int(time.time())}", + } + model_name = args.get("model", "anthropic/claude-2") + if isinstance(model_name, str): + model_name = model_name.split("anthropic/")[-1] + + for event in stream: + yield from self._process_stream_event( + event=event, + buffer_data=buffer_data, + model_name=model_name.split("anthropic/")[-1], + ) + except Exception as e: + logger.error(f"Anthropic sync streaming call failed: {e}") + raise + + def _process_stream_event( + self, event: Any, buffer_data: dict[str, Any], model_name: str + ) -> list[dict[str, Any]]: + chunks: list[dict[str, Any]] = [] + + def make_base_chunk() -> dict[str, Any]: + return { + "id": buffer_data["message_id"], + "object": "chat.completion.chunk", + "created": int(time.time()), + "model": model_name, + "choices": [{"index": 0, "delta": {}, "finish_reason": None}], + } + + if isinstance(event, RawMessageStartEvent): + buffer_data["message_id"] = event.message.id + chunk = make_base_chunk() + input_tokens = ( + event.message.usage.input_tokens if event.message.usage else 0 + ) + chunk["usage"] = { + "prompt_tokens": input_tokens, + "completion_tokens": 0, + "total_tokens": input_tokens, + } + chunks.append(chunk) + + elif isinstance(event, RawContentBlockStartEvent): + if hasattr(event.content_block, "type"): + block_type = event.content_block.type + if block_type == "thinking": + buffer_data["is_collecting_thinking"] = True + buffer_data["thinking_buffer"] = "" + # Don't emit anything yet + elif block_type == "tool_use" or isinstance( + event.content_block, ToolUseBlock + ): + buffer_data["tool_name"] = event.content_block.name # type: ignore + buffer_data["tool_id"] = event.content_block.id # type: ignore + buffer_data["tool_json_buffer"] = "" + buffer_data["is_collecting_tool"] = True + + elif isinstance(event, RawContentBlockDeltaEvent): + delta_obj = getattr(event, "delta", None) + delta_type = getattr(delta_obj, "type", None) + + # Handle thinking deltas + if delta_type == "thinking_delta" and hasattr( + delta_obj, "thinking" + ): + thinking_chunk = delta_obj.thinking # type: ignore + if buffer_data["is_collecting_thinking"]: + buffer_data["thinking_buffer"] += thinking_chunk + # Stream thinking chunks as they come in + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = {"thinking": thinking_chunk} + chunks.append(chunk) + + # Handle signature deltas for thinking blocks + elif delta_type == "signature_delta" and hasattr( + delta_obj, "signature" + ): + if buffer_data["is_collecting_thinking"]: + buffer_data["thinking_signature"] = delta_obj.signature # type: ignore + # No need to emit anything for the signature + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = { + "thinking_signature": delta_obj.signature # type: ignore + } + chunks.append(chunk) + + # Handle text deltas + elif delta_type == "text_delta" and hasattr(delta_obj, "text"): + text_chunk = delta_obj.text # type: ignore + if not buffer_data["is_collecting_tool"] and text_chunk: + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = {"content": text_chunk} + chunks.append(chunk) + + # Handle partial JSON for tools + elif hasattr(delta_obj, "partial_json"): + if buffer_data["is_collecting_tool"]: + 
buffer_data["tool_json_buffer"] += delta_obj.partial_json # type: ignore + + elif isinstance(event, ContentBlockStopEvent): + # Handle the end of a thinking block + if buffer_data.get("is_collecting_thinking"): + # Emit a special "structured_content_delta" with the complete thinking block + if ( + buffer_data["thinking_buffer"] + and buffer_data["thinking_signature"] + ): + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = { + "structured_content": [ + { + "type": "thinking", + "thinking": buffer_data["thinking_buffer"], + "signature": buffer_data["thinking_signature"], + } + ] + } + chunks.append(chunk) + + # Reset thinking collection + buffer_data["is_collecting_thinking"] = False + buffer_data["thinking_buffer"] = "" + buffer_data["thinking_signature"] = None + + # Handle the end of a tool use block + elif buffer_data.get("is_collecting_tool"): + try: + json.loads(buffer_data["tool_json_buffer"]) + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = { + "tool_calls": [ + { + "index": 0, + "type": "function", + "id": buffer_data["tool_id"] + or f"call_{generate_tool_id()}", + "function": { + "name": buffer_data["tool_name"], + "arguments": buffer_data[ + "tool_json_buffer" + ], + }, + } + ] + } + chunks.append(chunk) + buffer_data["is_collecting_tool"] = False + buffer_data["tool_json_buffer"] = "" + buffer_data["tool_name"] = None + buffer_data["tool_id"] = None + except json.JSONDecodeError: + logger.warning( + "Incomplete JSON in tool call, skipping chunk" + ) + + elif isinstance(event, MessageStopEvent): + # Check if the event has a message attribute before accessing it + stop_reason = getattr(event, "message", None) + if stop_reason and hasattr(stop_reason, "stop_reason"): + stop_reason = stop_reason.stop_reason + chunk = make_base_chunk() + if stop_reason == "tool_use": + chunk["choices"][0]["delta"] = {} + chunk["choices"][0]["finish_reason"] = "tool_calls" + else: + chunk["choices"][0]["delta"] = {} + chunk["choices"][0]["finish_reason"] = "stop" + chunks.append(chunk) + else: + # Handle the case where message is not available + chunk = make_base_chunk() + chunk["choices"][0]["delta"] = {} + chunk["choices"][0]["finish_reason"] = "stop" + chunks.append(chunk) + + return chunks diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py b/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py new file mode 100644 index 00000000..863e44ec --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/azure_foundry.py @@ -0,0 +1,110 @@ +import logging +import os +from typing import Any, Optional + +from azure.ai.inference import ( + ChatCompletionsClient as AzureChatCompletionsClient, +) +from azure.ai.inference.aio import ( + ChatCompletionsClient as AsyncAzureChatCompletionsClient, +) +from azure.core.credentials import AzureKeyCredential + +from core.base.abstractions import GenerationConfig +from core.base.providers.llm import CompletionConfig, CompletionProvider + +logger = logging.getLogger(__name__) + + +class AzureFoundryCompletionProvider(CompletionProvider): + def __init__(self, config: CompletionConfig, *args, **kwargs) -> None: + super().__init__(config) + self.azure_foundry_client: Optional[AzureChatCompletionsClient] = None + self.async_azure_foundry_client: Optional[ + AsyncAzureChatCompletionsClient + ] = None + + # Initialize Azure Foundry clients if credentials exist. 
+ azure_foundry_api_key = os.getenv("AZURE_FOUNDRY_API_KEY") + azure_foundry_api_endpoint = os.getenv("AZURE_FOUNDRY_API_ENDPOINT") + + if azure_foundry_api_key and azure_foundry_api_endpoint: + self.azure_foundry_client = AzureChatCompletionsClient( + endpoint=azure_foundry_api_endpoint, + credential=AzureKeyCredential(azure_foundry_api_key), + api_version=os.getenv( + "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview" + ), + ) + self.async_azure_foundry_client = AsyncAzureChatCompletionsClient( + endpoint=azure_foundry_api_endpoint, + credential=AzureKeyCredential(azure_foundry_api_key), + api_version=os.getenv( + "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview" + ), + ) + logger.debug("Azure Foundry clients initialized successfully") + + def _get_base_args( + self, generation_config: GenerationConfig + ) -> dict[str, Any]: + # Construct arguments similar to the other providers. + args: dict[str, Any] = { + "top_p": generation_config.top_p, + "stream": generation_config.stream, + "max_tokens": generation_config.max_tokens_to_sample, + "temperature": generation_config.temperature, + } + + if generation_config.functions is not None: + args["functions"] = generation_config.functions + if generation_config.tools is not None: + args["tools"] = generation_config.tools + if generation_config.response_format is not None: + args["response_format"] = generation_config.response_format + return args + + async def _execute_task(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + args = self._get_base_args(generation_config) + # Azure Foundry does not require a "model" argument; the endpoint is fixed. + args["messages"] = messages + args = {**args, **kwargs} + logger.debug(f"Executing async Azure Foundry task with args: {args}") + + try: + if self.async_azure_foundry_client is None: + raise ValueError("Azure Foundry client is not initialized") + + response = await self.async_azure_foundry_client.complete(**args) + logger.debug("Async Azure Foundry task executed successfully") + return response + except Exception as e: + logger.error( + f"Async Azure Foundry task execution failed: {str(e)}" + ) + raise + + def _execute_task_sync(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + args = self._get_base_args(generation_config) + args["messages"] = messages + args = {**args, **kwargs} + logger.debug(f"Executing sync Azure Foundry task with args: {args}") + + try: + if self.azure_foundry_client is None: + raise ValueError("Azure Foundry client is not initialized") + + response = self.azure_foundry_client.complete(**args) + logger.debug("Sync Azure Foundry task executed successfully") + return response + except Exception as e: + logger.error(f"Sync Azure Foundry task execution failed: {str(e)}") + raise diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py b/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py new file mode 100644 index 00000000..44d467c2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/litellm.py @@ -0,0 +1,80 @@ +import logging +from typing import Any + +import litellm +from litellm import acompletion, completion + +from core.base.abstractions import GenerationConfig +from core.base.providers.llm import CompletionConfig, CompletionProvider + +logger = logging.getLogger() + + +class LiteLLMCompletionProvider(CompletionProvider): + def __init__(self, config: 
CompletionConfig, *args, **kwargs) -> None: + super().__init__(config) + litellm.modify_params = True + self.acompletion = acompletion + self.completion = completion + + # if config.provider != "litellm": + # logger.error(f"Invalid provider: {config.provider}") + # raise ValueError( + # "LiteLLMCompletionProvider must be initialized with config with `litellm` provider." + # ) + + def _get_base_args( + self, generation_config: GenerationConfig + ) -> dict[str, Any]: + args: dict[str, Any] = { + "model": generation_config.model, + "temperature": generation_config.temperature, + "top_p": generation_config.top_p, + "stream": generation_config.stream, + "max_tokens": generation_config.max_tokens_to_sample, + "api_base": generation_config.api_base, + } + + # Fix the type errors by properly typing these assignments + if generation_config.functions is not None: + args["functions"] = generation_config.functions + if generation_config.tools is not None: + args["tools"] = generation_config.tools + if generation_config.response_format is not None: + args["response_format"] = generation_config.response_format + + return args + + async def _execute_task(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + args = self._get_base_args(generation_config) + args["messages"] = messages + args = {**args, **kwargs} + + logger.debug( + f"Executing LiteLLM task with generation_config={generation_config}" + ) + + return await self.acompletion(**args) + + def _execute_task_sync(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + args = self._get_base_args(generation_config) + args["messages"] = messages + args = {**args, **kwargs} + + logger.debug( + f"Executing LiteLLM task with generation_config={generation_config}" + ) + + try: + return self.completion(**args) + except Exception as e: + logger.error(f"Sync LiteLLM task execution failed: {str(e)}") + raise diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py b/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py new file mode 100644 index 00000000..30ef37ab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/openai.py @@ -0,0 +1,522 @@ +import logging +import os +from typing import Any + +from openai import AsyncAzureOpenAI, AsyncOpenAI, OpenAI + +from core.base.abstractions import GenerationConfig +from core.base.providers.llm import CompletionConfig, CompletionProvider + +from .utils import resize_base64_image + +logger = logging.getLogger() + + +class OpenAICompletionProvider(CompletionProvider): + def __init__(self, config: CompletionConfig, *args, **kwargs) -> None: + super().__init__(config) + self.openai_client = None + self.async_openai_client = None + self.azure_client = None + self.async_azure_client = None + self.deepseek_client = None + self.async_deepseek_client = None + self.ollama_client = None + self.async_ollama_client = None + self.lmstudio_client = None + self.async_lmstudio_client = None + # NEW: Azure Foundry clients using the Azure Inference API + self.azure_foundry_client = None + self.async_azure_foundry_client = None + + # Initialize OpenAI clients if credentials exist + if os.getenv("OPENAI_API_KEY"): + self.openai_client = OpenAI() + self.async_openai_client = AsyncOpenAI() + logger.debug("OpenAI clients initialized successfully") + + # Initialize Azure OpenAI clients if credentials exist + azure_api_key = 
os.getenv("AZURE_API_KEY") + azure_api_base = os.getenv("AZURE_API_BASE") + if azure_api_key and azure_api_base: + self.azure_client = AsyncAzureOpenAI( + api_key=azure_api_key, + api_version=os.getenv( + "AZURE_API_VERSION", "2024-02-15-preview" + ), + azure_endpoint=azure_api_base, + ) + self.async_azure_client = AsyncAzureOpenAI( + api_key=azure_api_key, + api_version=os.getenv( + "AZURE_API_VERSION", "2024-02-15-preview" + ), + azure_endpoint=azure_api_base, + ) + logger.debug("Azure OpenAI clients initialized successfully") + + # Initialize Deepseek clients if credentials exist + deepseek_api_key = os.getenv("DEEPSEEK_API_KEY") + deepseek_api_base = os.getenv( + "DEEPSEEK_API_BASE", "https://api.deepseek.com" + ) + if deepseek_api_key and deepseek_api_base: + self.deepseek_client = OpenAI( + api_key=deepseek_api_key, + base_url=deepseek_api_base, + ) + self.async_deepseek_client = AsyncOpenAI( + api_key=deepseek_api_key, + base_url=deepseek_api_base, + ) + logger.debug("Deepseek OpenAI clients initialized successfully") + + # Initialize Ollama clients with default API key + ollama_api_base = os.getenv( + "OLLAMA_API_BASE", "http://localhost:11434/v1" + ) + if ollama_api_base: + self.ollama_client = OpenAI( + api_key=os.getenv("OLLAMA_API_KEY", "dummy"), + base_url=ollama_api_base, + ) + self.async_ollama_client = AsyncOpenAI( + api_key=os.getenv("OLLAMA_API_KEY", "dummy"), + base_url=ollama_api_base, + ) + logger.debug("Ollama OpenAI clients initialized successfully") + + # Initialize LMStudio clients + lmstudio_api_base = os.getenv( + "LMSTUDIO_API_BASE", "http://localhost:1234/v1" + ) + if lmstudio_api_base: + self.lmstudio_client = OpenAI( + api_key=os.getenv("LMSTUDIO_API_KEY", "lm-studio"), + base_url=lmstudio_api_base, + ) + self.async_lmstudio_client = AsyncOpenAI( + api_key=os.getenv("LMSTUDIO_API_KEY", "lm-studio"), + base_url=lmstudio_api_base, + ) + logger.debug("LMStudio OpenAI clients initialized successfully") + + # Initialize Azure Foundry clients if credentials exist. + # These use the Azure Inference API (currently pasted into this handler). + azure_foundry_api_key = os.getenv("AZURE_FOUNDRY_API_KEY") + azure_foundry_api_endpoint = os.getenv("AZURE_FOUNDRY_API_ENDPOINT") + if azure_foundry_api_key and azure_foundry_api_endpoint: + from azure.ai.inference import ( + ChatCompletionsClient as AzureChatCompletionsClient, + ) + from azure.ai.inference.aio import ( + ChatCompletionsClient as AsyncAzureChatCompletionsClient, + ) + from azure.core.credentials import AzureKeyCredential + + self.azure_foundry_client = AzureChatCompletionsClient( + endpoint=azure_foundry_api_endpoint, + credential=AzureKeyCredential(azure_foundry_api_key), + api_version=os.getenv( + "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview" + ), + ) + self.async_azure_foundry_client = AsyncAzureChatCompletionsClient( + endpoint=azure_foundry_api_endpoint, + credential=AzureKeyCredential(azure_foundry_api_key), + api_version=os.getenv( + "AZURE_FOUNDRY_API_VERSION", "2024-05-01-preview" + ), + ) + logger.debug("Azure Foundry clients initialized successfully") + + if not any( + [ + self.openai_client, + self.azure_client, + self.ollama_client, + self.lmstudio_client, + self.azure_foundry_client, + ] + ): + raise ValueError( + "No valid client credentials found. Please set either OPENAI_API_KEY, " + "both AZURE_API_KEY and AZURE_API_BASE environment variables, " + "OLLAMA_API_BASE, LMSTUDIO_API_BASE, or AZURE_FOUNDRY_API_KEY and AZURE_FOUNDRY_API_ENDPOINT." 
+ ) + + def _get_client_and_model(self, model: str): + """Determine which client to use based on model prefix and return the + appropriate client and model name.""" + if model.startswith("azure/"): + if not self.azure_client: + raise ValueError( + "Azure OpenAI credentials not configured but azure/ model prefix used" + ) + return self.azure_client, model[6:] # Strip 'azure/' prefix + elif model.startswith("openai/"): + if not self.openai_client: + raise ValueError( + "OpenAI credentials not configured but openai/ model prefix used" + ) + return self.openai_client, model[7:] # Strip 'openai/' prefix + elif model.startswith("deepseek/"): + if not self.deepseek_client: + raise ValueError( + "Deepseek OpenAI credentials not configured but deepseek/ model prefix used" + ) + return self.deepseek_client, model[9:] # Strip 'deepseek/' prefix + elif model.startswith("ollama/"): + if not self.ollama_client: + raise ValueError( + "Ollama OpenAI credentials not configured but ollama/ model prefix used" + ) + return self.ollama_client, model[7:] # Strip 'ollama/' prefix + elif model.startswith("lmstudio/"): + if not self.lmstudio_client: + raise ValueError( + "LMStudio credentials not configured but lmstudio/ model prefix used" + ) + return self.lmstudio_client, model[9:] # Strip 'lmstudio/' prefix + elif model.startswith("azure-foundry/"): + if not self.azure_foundry_client: + raise ValueError( + "Azure Foundry credentials not configured but azure-foundry/ model prefix used" + ) + return ( + self.azure_foundry_client, + model[14:], + ) # Strip 'azure-foundry/' prefix + else: + # Default to OpenAI if no prefix is provided. + if self.openai_client: + return self.openai_client, model + elif self.azure_client: + return self.azure_client, model + elif self.ollama_client: + return self.ollama_client, model + elif self.lmstudio_client: + return self.lmstudio_client, model + elif self.azure_foundry_client: + return self.azure_foundry_client, model + else: + raise ValueError("No valid client available for model prefix") + + def _get_async_client_and_model(self, model: str): + """Get async client and model name based on prefix.""" + if model.startswith("azure/"): + if not self.async_azure_client: + raise ValueError( + "Azure OpenAI credentials not configured but azure/ model prefix used" + ) + return self.async_azure_client, model[6:] + elif model.startswith("openai/"): + if not self.async_openai_client: + raise ValueError( + "OpenAI credentials not configured but openai/ model prefix used" + ) + return self.async_openai_client, model[7:] + elif model.startswith("deepseek/"): + if not self.async_deepseek_client: + raise ValueError( + "Deepseek OpenAI credentials not configured but deepseek/ model prefix used" + ) + return self.async_deepseek_client, model[9:].strip() + elif model.startswith("ollama/"): + if not self.async_ollama_client: + raise ValueError( + "Ollama OpenAI credentials not configured but ollama/ model prefix used" + ) + return self.async_ollama_client, model[7:] + elif model.startswith("lmstudio/"): + if not self.async_lmstudio_client: + raise ValueError( + "LMStudio credentials not configured but lmstudio/ model prefix used" + ) + return self.async_lmstudio_client, model[9:] + elif model.startswith("azure-foundry/"): + if not self.async_azure_foundry_client: + raise ValueError( + "Azure Foundry credentials not configured but azure-foundry/ model prefix used" + ) + return self.async_azure_foundry_client, model[14:] + else: + if self.async_openai_client: + return self.async_openai_client, model + 
elif self.async_azure_client: + return self.async_azure_client, model + elif self.async_ollama_client: + return self.async_ollama_client, model + elif self.async_lmstudio_client: + return self.async_lmstudio_client, model + elif self.async_azure_foundry_client: + return self.async_azure_foundry_client, model + else: + raise ValueError( + "No valid async client available for model prefix" + ) + + def _process_messages_with_images( + self, messages: list[dict] + ) -> list[dict]: + """ + Process messages that may contain image_url or image_data fields. + Now includes aggressive image resizing similar to Anthropic provider. + """ + processed_messages = [] + + for msg in messages: + if msg.get("role") == "system": + # System messages don't support content arrays in OpenAI + processed_messages.append(msg) + continue + + # Check if the message contains image data + image_url = msg.pop("image_url", None) + image_data = msg.pop("image_data", None) + content = msg.get("content") + + if image_url or image_data: + # Convert to content array format + new_content = [] + + # Add image content + if image_url: + new_content.append( + {"type": "image_url", "image_url": {"url": image_url}} + ) + elif image_data: + # Resize the base64 image data if available + media_type = image_data.get("media_type", "image/jpeg") + data = image_data.get("data", "") + + # Apply image resizing if PIL is available + if data: + data = resize_base64_image(data) + logger.debug( + f"Image resized, new size: {len(data)} chars" + ) + + # OpenAI expects base64 images in data URL format + data_url = f"data:{media_type};base64,{data}" + new_content.append( + {"type": "image_url", "image_url": {"url": data_url}} + ) + + # Add text content if present + if content: + new_content.append({"type": "text", "text": content}) + + # Update the message + new_msg = dict(msg) + new_msg["content"] = new_content + processed_messages.append(new_msg) + else: + processed_messages.append(msg) + + return processed_messages + + def _process_array_content_with_images(self, content: list) -> list: + """ + Process content array that may contain image_url items. + Used for messages that already have content in array format. + """ + if not content or not isinstance(content, list): + return content + + processed_content = [] + + for item in content: + if isinstance(item, dict): + if item.get("type") == "image_url": + # Process image URL if needed + processed_content.append(item) + elif item.get("type") == "image" and item.get("source"): + # Convert Anthropic-style to OpenAI-style + source = item.get("source", {}) + if source.get("type") == "base64" and source.get("data"): + # Resize the base64 image data + resized_data = resize_base64_image(source.get("data")) + + media_type = source.get("media_type", "image/jpeg") + data_url = f"data:{media_type};base64,{resized_data}" + + processed_content.append( + { + "type": "image_url", + "image_url": {"url": data_url}, + } + ) + elif source.get("type") == "url" and source.get("url"): + processed_content.append( + { + "type": "image_url", + "image_url": {"url": source.get("url")}, + } + ) + else: + # Pass through other types + processed_content.append(item) + else: + processed_content.append(item) + + return processed_content + + def _preprocess_messages(self, messages: list[dict]) -> list[dict]: + """ + Preprocess all messages to optimize images before sending to OpenAI API. 
+ """ + if not messages or not isinstance(messages, list): + return messages + + processed_messages = [] + + for msg in messages: + # Skip system messages as they're handled separately + if msg.get("role") == "system": + processed_messages.append(msg) + continue + + # Process array-format content (might contain images) + if isinstance(msg.get("content"), list): + new_msg = dict(msg) + new_msg["content"] = self._process_array_content_with_images( + msg["content"] + ) + processed_messages.append(new_msg) + else: + # Standard processing for non-array content + processed_messages.append(msg) + + return processed_messages + + def _get_base_args(self, generation_config: GenerationConfig) -> dict: + # Keep existing implementation... + args: dict[str, Any] = { + "model": generation_config.model, + "stream": generation_config.stream, + } + + model_str = generation_config.model or "" + + if "o1" not in model_str and "o3" not in model_str: + args["max_tokens"] = generation_config.max_tokens_to_sample + args["temperature"] = generation_config.temperature + args["top_p"] = generation_config.top_p + else: + args["max_completion_tokens"] = ( + generation_config.max_tokens_to_sample + ) + + if generation_config.reasoning_effort is not None: + args["reasoning_effort"] = generation_config.reasoning_effort + if generation_config.functions is not None: + args["functions"] = generation_config.functions + if generation_config.tools is not None: + args["tools"] = generation_config.tools + if generation_config.response_format is not None: + args["response_format"] = generation_config.response_format + return args + + async def _execute_task(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + # First preprocess to handle any images in array format + messages = self._preprocess_messages(messages) + + # Then process messages with direct image_url or image_data fields + processed_messages = self._process_messages_with_images(messages) + + args = self._get_base_args(generation_config) + client, model_name = self._get_async_client_and_model(args["model"]) + args["model"] = model_name + args["messages"] = processed_messages + args = {**args, **kwargs} + + # Check if we're using a vision-capable model when images are present + contains_images = any( + isinstance(msg.get("content"), list) + and any( + item.get("type") == "image_url" + for item in msg.get("content", []) + ) + for msg in processed_messages + ) + + if contains_images: + vision_models = ["gpt-4-vision", "gpt-4o"] + if all( + vision_model in model_name for vision_model in vision_models + ): + logger.warning( + f"Using model {model_name} with images, but it may not support vision" + ) + + logger.debug(f"Executing async task with args: {args}") + try: + # Same as before... 
+ if client == self.async_azure_foundry_client: + model_value = args.pop( + "model" + ) # Remove model before passing args + response = await client.complete(**args) + else: + response = await client.chat.completions.create(**args) + logger.debug("Async task executed successfully") + return response + except Exception as e: + logger.error(f"Async task execution failed: {str(e)}") + # HACK: print the exception to the console for debugging + raise + + def _execute_task_sync(self, task: dict[str, Any]): + messages = task["messages"] + generation_config = task["generation_config"] + kwargs = task["kwargs"] + + # First preprocess to handle any images in array format + messages = self._preprocess_messages(messages) + + # Then process messages with direct image_url or image_data fields + processed_messages = self._process_messages_with_images(messages) + + args = self._get_base_args(generation_config) + client, model_name = self._get_client_and_model(args["model"]) + args["model"] = model_name + args["messages"] = processed_messages + args = {**args, **kwargs} + + # Same vision model check as in async version + contains_images = any( + isinstance(msg.get("content"), list) + and any( + item.get("type") == "image_url" + for item in msg.get("content", []) + ) + for msg in processed_messages + ) + + if contains_images: + vision_models = ["gpt-4-vision", "gpt-4o"] + if all( + vision_model in model_name for vision_model in vision_models + ): + logger.warning( + f"Using model {model_name} with images, but it may not support vision" + ) + + logger.debug(f"Executing sync OpenAI task with args: {args}") + try: + # Same as before... + if client == self.azure_foundry_client: + args.pop("model") + response = client.complete(**args) + else: + response = client.chat.completions.create(**args) + logger.debug("Sync task executed successfully") + return response + except Exception as e: + logger.error(f"Sync task execution failed: {str(e)}") + raise diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py b/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py new file mode 100644 index 00000000..b95b310a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/r2r_llm.py @@ -0,0 +1,96 @@ +import logging +from typing import Any + +from core.base.abstractions import GenerationConfig +from core.base.providers.llm import CompletionConfig, CompletionProvider + +from .anthropic import AnthropicCompletionProvider +from .azure_foundry import AzureFoundryCompletionProvider +from .litellm import LiteLLMCompletionProvider +from .openai import OpenAICompletionProvider + +logger = logging.getLogger(__name__) + + +class R2RCompletionProvider(CompletionProvider): + """A provider that routes to the right LLM provider (R2R): + + - If `generation_config.model` starts with "anthropic/", call AnthropicCompletionProvider. + - If it starts with "azure-foundry/", call AzureFoundryCompletionProvider. + - If it starts with one of the other OpenAI-like prefixes ("openai/", "azure/", "deepseek/", "ollama/", "lmstudio/") + or has no prefix (e.g. "gpt-4", "gpt-3.5"), call OpenAICompletionProvider. + - Otherwise, fallback to LiteLLMCompletionProvider. 
+ """ + + def __init__(self, config: CompletionConfig, *args, **kwargs) -> None: + """Initialize sub-providers for OpenAI, Anthropic, LiteLLM, and Azure + Foundry.""" + super().__init__(config) + self.config = config + + logger.info("Initializing R2RCompletionProvider...") + self._openai_provider = OpenAICompletionProvider( + self.config, *args, **kwargs + ) + self._anthropic_provider = AnthropicCompletionProvider( + self.config, *args, **kwargs + ) + self._litellm_provider = LiteLLMCompletionProvider( + self.config, *args, **kwargs + ) + self._azure_foundry_provider = AzureFoundryCompletionProvider( + self.config, *args, **kwargs + ) # New provider + + logger.debug( + "R2RCompletionProvider initialized with OpenAI, Anthropic, LiteLLM, and Azure Foundry sub-providers." + ) + + def _choose_subprovider_by_model( + self, model_name: str, is_streaming: bool = False + ) -> CompletionProvider: + """Decide which underlying sub-provider to call based on the model name + (prefix).""" + # Route to Anthropic if appropriate. + if model_name.startswith("anthropic/"): + return self._anthropic_provider + + # Route to Azure Foundry explicitly. + if model_name.startswith("azure-foundry/"): + return self._azure_foundry_provider + + # OpenAI-like prefixes. + openai_like_prefixes = [ + "openai/", + "azure/", + "deepseek/", + "ollama/", + "lmstudio/", + ] + if ( + any( + model_name.startswith(prefix) + for prefix in openai_like_prefixes + ) + or "/" not in model_name + ): + return self._openai_provider + + # Fallback to LiteLLM. + return self._litellm_provider + + async def _execute_task(self, task: dict[str, Any]): + """Pick the sub-provider based on model name and forward the async + call.""" + generation_config: GenerationConfig = task["generation_config"] + model_name = generation_config.model + sub_provider = self._choose_subprovider_by_model(model_name or "") + return await sub_provider._execute_task(task) + + def _execute_task_sync(self, task: dict[str, Any]): + """Pick the sub-provider based on model name and forward the sync + call.""" + generation_config: GenerationConfig = task["generation_config"] + model_name = generation_config.model + sub_provider = self._choose_subprovider_by_model(model_name or "") + return sub_provider._execute_task_sync(task) diff --git a/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py b/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py new file mode 100644 index 00000000..619b2e73 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/providers/llm/utils.py @@ -0,0 +1,106 @@ +import base64 +import io +import logging +from typing import Tuple + +from PIL import Image + +logger = logging.getLogger() + + +def resize_base64_image( + base64_string: str, + max_size: Tuple[int, int] = (512, 512), + max_megapixels: float = 0.25, +) -> str: + """Aggressively resize images with better error handling and debug output""" + logger.debug( + f"RESIZING NOW!!! 
Original length: {len(base64_string)} chars" + ) + + # Decode base64 string to bytes + try: + image_data = base64.b64decode(base64_string) + image = Image.open(io.BytesIO(image_data)) + logger.debug(f"Image opened successfully: {image.format} {image.size}") + except Exception as e: + logger.debug(f"Failed to decode/open image: {e}") + # Emergency fallback - truncate the base64 string to reduce tokens + if len(base64_string) > 50000: + return base64_string[:50000] + return base64_string + + try: + width, height = image.size + current_megapixels = (width * height) / 1_000_000 + logger.debug( + f"Original dimensions: {width}x{height} ({current_megapixels:.2f} MP)" + ) + + # MUCH more aggressive resizing for large images + if current_megapixels > 0.5: + max_size = (384, 384) + max_megapixels = 0.15 + logger.debug("Large image detected! Using more aggressive limits") + + # Calculate new dimensions with strict enforcement + # Always resize if the image is larger than we want + scale_factor = min( + max_size[0] / width, + max_size[1] / height, + (max_megapixels / current_megapixels) ** 0.5, + ) + + if scale_factor >= 1.0: + # No resize needed, but still compress + new_width, new_height = width, height + else: + # Apply scaling + new_width = max(int(width * scale_factor), 64) # Min width + new_height = max(int(height * scale_factor), 64) # Min height + + # Always resize/recompress the image + logger.debug(f"Resizing to: {new_width}x{new_height}") + resized_image = image.resize((new_width, new_height), Image.LANCZOS) # type: ignore + + # Convert back to base64 with strong compression + buffer = io.BytesIO() + if image.format == "JPEG" or image.format is None: + # Apply very aggressive JPEG compression + quality = 50 # Very low quality to reduce size + resized_image.save( + buffer, format="JPEG", quality=quality, optimize=True + ) + else: + # For other formats + resized_image.save( + buffer, format=image.format or "PNG", optimize=True + ) + + resized_base64 = base64.b64encode(buffer.getvalue()).decode("utf-8") + + logger.debug( + f"Resized base64 length: {len(resized_base64)} chars (reduction: {100 * (1 - len(resized_base64) / len(base64_string)):.1f}%)" + ) + return resized_base64 + + except Exception as e: + logger.debug(f"Error during resize: {e}") + # If anything goes wrong, truncate the base64 to a reasonable size + if len(base64_string) > 50000: + return base64_string[:50000] + return base64_string + + +def estimate_image_tokens(width: int, height: int) -> int: + """ + Estimate the number of tokens an image will use based on Anthropic's formula. + + Args: + width: Image width in pixels + height: Image height in pixels + + Returns: + Estimated number of tokens + """ + return int((width * height) / 750) |
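
For reference, the `openai_message_to_anthropic_block` helper in `anthropic.py` above turns an OpenAI-style `tool`/`function` message into an Anthropic `tool_result` block carried inside a user message. A minimal before/after sketch of that mapping (the ID and result value are made up for illustration):

# Hypothetical input: an OpenAI-style tool result message.
openai_tool_msg = {
    "role": "tool",
    "tool_call_id": "tool_1a2b3c4d5e6f",  # made-up ID in the generate_tool_id() style
    "content": "42",
}

# Expected output shape, per openai_message_to_anthropic_block() above.
anthropic_msg = {
    "role": "user",
    "content": [
        {
            "type": "tool_result",
            "tool_use_id": openai_tool_msg["tool_call_id"],
            "content": openai_tool_msg["content"],
        }
    ],
}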
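
The `R2RCompletionProvider` docstring in `r2r_llm.py` spells out the prefix-based routing. The sketch below restates those rules as a standalone function that returns provider labels rather than the actual sub-provider objects; the model names in the asserts are illustrative only:

def route_model(model_name: str) -> str:
    """Standalone restatement of the prefix routing in r2r_llm.py (labels only)."""
    if model_name.startswith("anthropic/"):
        return "anthropic"
    if model_name.startswith("azure-foundry/"):
        return "azure_foundry"
    openai_like = ("openai/", "azure/", "deepseek/", "ollama/", "lmstudio/")
    # Bare model names such as "gpt-4" also go to the OpenAI provider.
    if model_name.startswith(openai_like) or "/" not in model_name:
        return "openai"
    # Any other prefixed name falls through to LiteLLM.
    return "litellm"


assert route_model("anthropic/claude-3-opus-20240229") == "anthropic"
assert route_model("azure-foundry/some-deployment") == "azure_foundry"
assert route_model("gpt-4o") == "openai"
assert route_model("ollama/llama3") == "openai"
assert route_model("groq/llama3") == "litellm"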
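
Finally, `resize_base64_image` in `utils.py` caps images by taking the smallest of the per-axis scale limits and the megapixel limit, and `estimate_image_tokens` approximates Anthropic's cost as width * height / 750. A short sketch of that arithmetic for a hypothetical 3024 x 4032 photo, assuming the aggressive limits that kick in above 0.5 MP (applying the token estimate to the resized dimensions is purely illustrative):

width, height = 3024, 4032            # hypothetical ~12.2 MP source image
max_size = (384, 384)                 # aggressive cap used for images over 0.5 MP
max_megapixels = 0.15

current_mp = (width * height) / 1_000_000
scale = min(
    max_size[0] / width,
    max_size[1] / height,
    (max_megapixels / current_mp) ** 0.5,
)
new_width = max(int(width * scale), 64)
new_height = max(int(height * scale), 64)

# Rough Anthropic token cost for the resized image, per estimate_image_tokens().
estimated_tokens = int((new_width * new_height) / 750)
print(new_width, new_height, estimated_tokens)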