import asyncio import json import logging import re from abc import ABCMeta from typing import AsyncGenerator, Optional, Tuple from core.base import AsyncSyncMeta, LLMChatCompletion, Message, syncable from core.base.agent import Agent, Conversation from core.utils import ( CitationTracker, SearchResultsCollector, SSEFormatter, convert_nonserializable_objects, dump_obj, find_new_citation_spans, ) logger = logging.getLogger() class CombinedMeta(AsyncSyncMeta, ABCMeta): pass def sync_wrapper(async_gen): loop = asyncio.get_event_loop() def wrapper(): try: while True: try: yield loop.run_until_complete(async_gen.__anext__()) except StopAsyncIteration: break finally: loop.run_until_complete(async_gen.aclose()) return wrapper() class R2RAgent(Agent, metaclass=CombinedMeta): def __init__(self, *args, **kwargs): self.search_results_collector = SearchResultsCollector() super().__init__(*args, **kwargs) self._reset() async def _generate_llm_summary(self, iterations_count: int) -> str: """ Generate a summary of the conversation using the LLM when max iterations are exceeded. Args: iterations_count: The number of iterations that were completed Returns: A string containing the LLM-generated summary """ try: # Get all messages in the conversation all_messages = await self.conversation.get_messages() # Create a prompt for the LLM to summarize summary_prompt = { "role": "user", "content": ( f"The conversation has reached the maximum limit of {iterations_count} iterations " f"without completing the task. Please provide a concise summary of: " f"1) The key information you've gathered that's relevant to the original query, " f"2) What you've attempted so far and why it's incomplete, and " f"3) A specific recommendation for how to proceed. " f"Keep your summary brief (3-4 sentences total) and focused on the most valuable insights. If it is possible to answer the original user query, then do so now instead." f"Start with '⚠️ **Maximum iterations exceeded**'" ), } # Create a new message list with just the conversation history and summary request summary_messages = all_messages + [summary_prompt] # Get a completion for the summary generation_config = self.get_generation_config(summary_prompt) response = await self.llm_provider.aget_completion( summary_messages, generation_config, ) return response.choices[0].message.content except Exception as e: logger.error(f"Error generating LLM summary: {str(e)}") # Fall back to basic summary if LLM generation fails return ( "⚠️ **Maximum iterations exceeded**\n\n" "The agent reached the maximum iteration limit without completing the task. " "Consider breaking your request into smaller steps or refining your query." ) def _reset(self): self._completed = False self.conversation = Conversation() @syncable async def arun( self, messages: list[Message], system_instruction: Optional[str] = None, *args, **kwargs, ) -> list[dict]: self._reset() await self._setup(system_instruction) if messages: for message in messages: await self.conversation.add_message(message) iterations_count = 0 while ( not self._completed and iterations_count < self.config.max_iterations ): iterations_count += 1 messages_list = await self.conversation.get_messages() generation_config = self.get_generation_config(messages_list[-1]) response = await self.llm_provider.aget_completion( messages_list, generation_config, ) logger.debug(f"R2RAgent response: {response}") await self.process_llm_response(response, *args, **kwargs) if not self._completed: # Generate a summary of the conversation using the LLM summary = await self._generate_llm_summary(iterations_count) await self.conversation.add_message( Message(role="assistant", content=summary) ) # Return final content all_messages: list[dict] = await self.conversation.get_messages() all_messages.reverse() output_messages = [] for message_2 in all_messages: if ( # message_2.get("content") message_2.get("content") != messages[-1].content ): output_messages.append(message_2) else: break output_messages.reverse() return output_messages async def process_llm_response( self, response: LLMChatCompletion, *args, **kwargs ) -> None: if not self._completed: message = response.choices[0].message finish_reason = response.choices[0].finish_reason if finish_reason == "stop": self._completed = True # Determine which provider we're using using_anthropic = ( "anthropic" in self.rag_generation_config.model.lower() ) # OPENAI HANDLING if not using_anthropic: if message.tool_calls: assistant_msg = Message( role="assistant", content="", tool_calls=[msg.dict() for msg in message.tool_calls], ) await self.conversation.add_message(assistant_msg) # If there are multiple tool_calls, call them sequentially here for tool_call in message.tool_calls: await self.handle_function_or_tool_call( tool_call.function.name, tool_call.function.arguments, tool_id=tool_call.id, *args, **kwargs, ) else: await self.conversation.add_message( Message(role="assistant", content=message.content) ) self._completed = True else: # First handle thinking blocks if present if ( hasattr(message, "structured_content") and message.structured_content ): # Check if structured_content contains any tool_use blocks has_tool_use = any( block.get("type") == "tool_use" for block in message.structured_content ) if not has_tool_use and message.tool_calls: # If it has thinking but no tool_use, add a separate message with structured_content assistant_msg = Message( role="assistant", structured_content=message.structured_content, # Use structured_content field ) await self.conversation.add_message(assistant_msg) # Add explicit tool_use blocks in a separate message tool_uses = [] for tool_call in message.tool_calls: # Safely parse arguments if they're a string try: if isinstance( tool_call.function.arguments, str ): input_args = json.loads( tool_call.function.arguments ) else: input_args = tool_call.function.arguments except json.JSONDecodeError: logger.error( f"Failed to parse tool arguments: {tool_call.function.arguments}" ) input_args = { "_raw": tool_call.function.arguments } tool_uses.append( { "type": "tool_use", "id": tool_call.id, "name": tool_call.function.name, "input": input_args, } ) # Add tool_use blocks as a separate assistant message with structured content if tool_uses: await self.conversation.add_message( Message( role="assistant", structured_content=tool_uses, content="", ) ) else: # If it already has tool_use or no tool_calls, preserve original structure assistant_msg = Message( role="assistant", structured_content=message.structured_content, ) await self.conversation.add_message(assistant_msg) elif message.content: # For regular text content await self.conversation.add_message( Message(role="assistant", content=message.content) ) # If there are tool calls, add them as structured content if message.tool_calls: tool_uses = [] for tool_call in message.tool_calls: # Same safe parsing as above try: if isinstance( tool_call.function.arguments, str ): input_args = json.loads( tool_call.function.arguments ) else: input_args = tool_call.function.arguments except json.JSONDecodeError: logger.error( f"Failed to parse tool arguments: {tool_call.function.arguments}" ) input_args = { "_raw": tool_call.function.arguments } tool_uses.append( { "type": "tool_use", "id": tool_call.id, "name": tool_call.function.name, "input": input_args, } ) await self.conversation.add_message( Message( role="assistant", structured_content=tool_uses ) ) # NEW CASE: Handle tool_calls with no content or structured_content elif message.tool_calls: # Create tool_uses for the message with only tool_calls tool_uses = [] for tool_call in message.tool_calls: try: if isinstance(tool_call.function.arguments, str): input_args = json.loads( tool_call.function.arguments ) else: input_args = tool_call.function.arguments except json.JSONDecodeError: logger.error( f"Failed to parse tool arguments: {tool_call.function.arguments}" ) input_args = {"_raw": tool_call.function.arguments} tool_uses.append( { "type": "tool_use", "id": tool_call.id, "name": tool_call.function.name, "input": input_args, } ) # Add tool_use blocks as a message before processing tools if tool_uses: await self.conversation.add_message( Message( role="assistant", structured_content=tool_uses, ) ) # Process the tool calls if message.tool_calls: for tool_call in message.tool_calls: await self.handle_function_or_tool_call( tool_call.function.name, tool_call.function.arguments, tool_id=tool_call.id, *args, **kwargs, ) class R2RStreamingAgent(R2RAgent): """ Base class for all streaming agents with core streaming functionality. Supports emitting messages, tool calls, and results as SSE events. """ # These two regexes will detect bracket references and then find short IDs. BRACKET_PATTERN = re.compile(r"\[([^\]]+)\]") SHORT_ID_PATTERN = re.compile( r"[A-Za-z0-9]{7,8}" ) # 7-8 chars, for example def __init__(self, *args, **kwargs): # Force streaming on if hasattr(kwargs.get("config", {}), "stream"): kwargs["config"].stream = True super().__init__(*args, **kwargs) async def arun( self, system_instruction: str | None = None, messages: list[Message] | None = None, *args, **kwargs, ) -> AsyncGenerator[str, None]: """ Main streaming entrypoint: returns an async generator of SSE lines. """ self._reset() await self._setup(system_instruction) if messages: for m in messages: await self.conversation.add_message(m) # Initialize citation tracker for this run citation_tracker = CitationTracker() # Dictionary to store citation payloads by ID citation_payloads = {} # Track all citations emitted during streaming for final persistence self.streaming_citations: list[dict] = [] async def sse_generator() -> AsyncGenerator[str, None]: pending_tool_calls = {} partial_text_buffer = "" iterations_count = 0 try: # Keep streaming until we complete while ( not self._completed and iterations_count < self.config.max_iterations ): iterations_count += 1 # 1) Get current messages msg_list = await self.conversation.get_messages() gen_cfg = self.get_generation_config( msg_list[-1], stream=True ) accumulated_thinking = "" thinking_signatures = {} # Map thinking content to signatures # 2) Start streaming from LLM llm_stream = self.llm_provider.aget_completion_stream( msg_list, gen_cfg ) async for chunk in llm_stream: delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason if hasattr(delta, "thinking") and delta.thinking: # Accumulate thinking for later use in messages accumulated_thinking += delta.thinking # Emit SSE "thinking" event async for ( line ) in SSEFormatter.yield_thinking_event( delta.thinking ): yield line # Add this new handler for thinking signatures if hasattr(delta, "thinking_signature"): thinking_signatures[accumulated_thinking] = ( delta.thinking_signature ) accumulated_thinking = "" # 3) If new text, accumulate it if delta.content: partial_text_buffer += delta.content # (a) Now emit the newly streamed text as a "message" event async for line in SSEFormatter.yield_message_event( delta.content ): yield line # (b) Find new citation spans in the accumulated text new_citation_spans = find_new_citation_spans( partial_text_buffer, citation_tracker ) # Process each new citation span for cid, spans in new_citation_spans.items(): for span in spans: # Check if this is the first time we've seen this citation ID is_new_citation = ( citation_tracker.is_new_citation(cid) ) # Get payload if it's a new citation payload = None if is_new_citation: source_obj = self.search_results_collector.find_by_short_id( cid ) if source_obj: # Store payload for reuse payload = dump_obj(source_obj) citation_payloads[cid] = payload # Create citation event payload citation_data = { "id": cid, "object": "citation", "is_new": is_new_citation, "span": { "start": span[0], "end": span[1], }, } # Only include full payload for new citations if is_new_citation and payload: citation_data["payload"] = payload # Add to streaming citations for final answer self.streaming_citations.append( citation_data ) # Emit the citation event async for ( line ) in SSEFormatter.yield_citation_event( citation_data ): yield line if delta.tool_calls: for tc in delta.tool_calls: idx = tc.index if idx not in pending_tool_calls: pending_tool_calls[idx] = { "id": tc.id, "name": tc.function.name or "", "arguments": tc.function.arguments or "", } else: # Accumulate partial name/arguments if tc.function.name: pending_tool_calls[idx]["name"] = ( tc.function.name ) if tc.function.arguments: pending_tool_calls[idx][ "arguments" ] += tc.function.arguments # 5) If the stream signals we should handle "tool_calls" if finish_reason == "tool_calls": # Handle thinking if present await self._handle_thinking( thinking_signatures, accumulated_thinking ) calls_list = [] for idx in sorted(pending_tool_calls.keys()): cinfo = pending_tool_calls[idx] calls_list.append( { "tool_call_id": cinfo["id"] or f"call_{idx}", "name": cinfo["name"], "arguments": cinfo["arguments"], } ) # (a) Emit SSE "tool_call" events for c in calls_list: tc_data = self._create_tool_call_data(c) async for ( line ) in SSEFormatter.yield_tool_call_event( tc_data ): yield line # (b) Add an assistant message capturing these calls await self._add_tool_calls_message( calls_list, partial_text_buffer ) # (c) Execute each tool call in parallel await asyncio.gather( *[ self.handle_function_or_tool_call( c["name"], c["arguments"], tool_id=c["tool_call_id"], ) for c in calls_list ] ) # Reset buffer & calls pending_tool_calls.clear() partial_text_buffer = "" elif finish_reason == "stop": # Handle thinking if present await self._handle_thinking( thinking_signatures, accumulated_thinking ) # 6) The LLM is done. If we have any leftover partial text, # finalize it in the conversation if partial_text_buffer: # Create the final message with metadata including citations final_message = Message( role="assistant", content=partial_text_buffer, metadata={ "citations": self.streaming_citations }, ) # Add it to the conversation await self.conversation.add_message( final_message ) # (a) Prepare final answer with optimized citations consolidated_citations = [] # Group citations by ID with all their spans for ( cid, spans, ) in citation_tracker.get_all_spans().items(): if cid in citation_payloads: consolidated_citations.append( { "id": cid, "object": "citation", "spans": [ {"start": s[0], "end": s[1]} for s in spans ], "payload": citation_payloads[cid], } ) # Create final answer payload final_evt_payload = { "id": "msg_final", "object": "agent.final_answer", "generated_answer": partial_text_buffer, "citations": consolidated_citations, } # Emit final answer event async for ( line ) in SSEFormatter.yield_final_answer_event( final_evt_payload ): yield line # (b) Signal the end of the SSE stream yield SSEFormatter.yield_done_event() self._completed = True break # If we exit the while loop due to hitting max iterations if not self._completed: # Generate a summary using the LLM summary = await self._generate_llm_summary( iterations_count ) # Send the summary as a message event async for line in SSEFormatter.yield_message_event( summary ): yield line # Add summary to conversation with citations metadata await self.conversation.add_message( Message( role="assistant", content=summary, metadata={"citations": self.streaming_citations}, ) ) # Create and emit a final answer payload with the summary final_evt_payload = { "id": "msg_final", "object": "agent.final_answer", "generated_answer": summary, "citations": consolidated_citations, } async for line in SSEFormatter.yield_final_answer_event( final_evt_payload ): yield line # Signal the end of the SSE stream yield SSEFormatter.yield_done_event() self._completed = True except Exception as e: logger.error(f"Error in streaming agent: {str(e)}") # Emit error event for client async for line in SSEFormatter.yield_error_event( f"Agent error: {str(e)}" ): yield line # Send done event to close the stream yield SSEFormatter.yield_done_event() # Finally, we return the async generator async for line in sse_generator(): yield line async def _handle_thinking( self, thinking_signatures, accumulated_thinking ): """Process any accumulated thinking content""" if accumulated_thinking: structured_content = [ { "type": "thinking", "thinking": accumulated_thinking, # Anthropic will validate this in their API "signature": "placeholder_signature", } ] assistant_msg = Message( role="assistant", structured_content=structured_content, ) await self.conversation.add_message(assistant_msg) elif thinking_signatures: for ( accumulated_thinking, thinking_signature, ) in thinking_signatures.items(): structured_content = [ { "type": "thinking", "thinking": accumulated_thinking, # Anthropic will validate this in their API "signature": thinking_signature, } ] assistant_msg = Message( role="assistant", structured_content=structured_content, ) await self.conversation.add_message(assistant_msg) async def _add_tool_calls_message(self, calls_list, partial_text_buffer): """Add a message with tool calls to the conversation""" assistant_msg = Message( role="assistant", content=partial_text_buffer or "", tool_calls=[ { "id": c["tool_call_id"], "type": "function", "function": { "name": c["name"], "arguments": c["arguments"], }, } for c in calls_list ], ) await self.conversation.add_message(assistant_msg) def _create_tool_call_data(self, call_info): """Create tool call data structure from call info""" return { "tool_call_id": call_info["tool_call_id"], "name": call_info["name"], "arguments": call_info["arguments"], } def _create_citation_payload(self, short_id, payload): """Create citation payload for a short ID""" # This will be overridden in RAG subclasses # check if as_dict is on payload if hasattr(payload, "as_dict"): payload = payload.as_dict() if hasattr(payload, "dict"): payload = payload.dict if hasattr(payload, "to_dict"): payload = payload.to_dict() return { "id": f"{short_id}", "object": "citation", "payload": dump_obj(payload), # Will be populated in RAG agents } def _create_final_answer_payload(self, answer_text, citations): """Create the final answer payload""" # This will be extended in RAG subclasses return { "id": "msg_final", "object": "agent.final_answer", "generated_answer": answer_text, "citations": citations, } class R2RXMLStreamingAgent(R2RStreamingAgent): """ A streaming agent that parses XML-formatted responses with special handling for: - or blocks for chain-of-thought reasoning - , , blocks for tool execution """ # We treat or as the same token boundaries THOUGHT_OPEN = re.compile(r"<(Thought|think)>", re.IGNORECASE) THOUGHT_CLOSE = re.compile(r"", re.IGNORECASE) # Regexes to parse out , , , , , ACTION_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) TOOLCALLS_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) TOOLCALL_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) NAME_PATTERN = re.compile(r"(.*?)", re.IGNORECASE | re.DOTALL) PARAMS_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) RESPONSE_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) async def arun( self, system_instruction: str | None = None, messages: list[Message] | None = None, *args, **kwargs, ) -> AsyncGenerator[str, None]: """ Main streaming entrypoint: returns an async generator of SSE lines. """ self._reset() await self._setup(system_instruction) if messages: for m in messages: await self.conversation.add_message(m) # Initialize citation tracker for this run citation_tracker = CitationTracker() # Dictionary to store citation payloads by ID citation_payloads = {} # Track all citations emitted during streaming for final persistence self.streaming_citations: list[dict] = [] async def sse_generator() -> AsyncGenerator[str, None]: iterations_count = 0 try: # Keep streaming until we complete while ( not self._completed and iterations_count < self.config.max_iterations ): iterations_count += 1 # 1) Get current messages msg_list = await self.conversation.get_messages() gen_cfg = self.get_generation_config( msg_list[-1], stream=True ) # 2) Start streaming from LLM llm_stream = self.llm_provider.aget_completion_stream( msg_list, gen_cfg ) # Create state variables for each iteration iteration_buffer = "" yielded_first_event = False in_action_block = False is_thinking = False accumulated_thinking = "" thinking_signatures = {} async for chunk in llm_stream: delta = chunk.choices[0].delta finish_reason = chunk.choices[0].finish_reason # Handle thinking if present if hasattr(delta, "thinking") and delta.thinking: # Accumulate thinking for later use in messages accumulated_thinking += delta.thinking # Emit SSE "thinking" event async for ( line ) in SSEFormatter.yield_thinking_event( delta.thinking ): yield line # Add this new handler for thinking signatures if hasattr(delta, "thinking_signature"): thinking_signatures[accumulated_thinking] = ( delta.thinking_signature ) accumulated_thinking = "" # 3) If new text, accumulate it if delta.content: iteration_buffer += delta.content # Check if we have accumulated enough text for a `` block if len(iteration_buffer) < len(""): continue # Check if we have yielded the first event if not yielded_first_event: # Emit the first chunk if self.THOUGHT_OPEN.findall(iteration_buffer): is_thinking = True async for ( line ) in SSEFormatter.yield_thinking_event( iteration_buffer ): yield line else: async for ( line ) in SSEFormatter.yield_message_event( iteration_buffer ): yield line # Mark as yielded yielded_first_event = True continue # Check if we are in a thinking block if is_thinking: # Still thinking, so keep yielding thinking events if not self.THOUGHT_CLOSE.findall( iteration_buffer ): # Emit SSE "thinking" event async for ( line ) in SSEFormatter.yield_thinking_event( delta.content ): yield line continue # Done thinking, so emit the last thinking event else: is_thinking = False thought_text = delta.content.split( "" )[0].split("")[0] async for ( line ) in SSEFormatter.yield_thinking_event( thought_text ): yield line post_thought_text = delta.content.split( "" )[-1].split("")[-1] delta.content = post_thought_text # (b) Find new citation spans in the accumulated text new_citation_spans = find_new_citation_spans( iteration_buffer, citation_tracker ) # Process each new citation span for cid, spans in new_citation_spans.items(): for span in spans: # Check if this is the first time we've seen this citation ID is_new_citation = ( citation_tracker.is_new_citation(cid) ) # Get payload if it's a new citation payload = None if is_new_citation: source_obj = self.search_results_collector.find_by_short_id( cid ) if source_obj: # Store payload for reuse payload = dump_obj(source_obj) citation_payloads[cid] = payload # Create citation event payload citation_data = { "id": cid, "object": "citation", "is_new": is_new_citation, "span": { "start": span[0], "end": span[1], }, } # Only include full payload for new citations if is_new_citation and payload: citation_data["payload"] = payload # Add to streaming citations for final answer self.streaming_citations.append( citation_data ) # Emit the citation event async for ( line ) in SSEFormatter.yield_citation_event( citation_data ): yield line # Now prepare to emit the newly streamed text as a "message" event if ( iteration_buffer.count("<") and not in_action_block ): in_action_block = True if ( in_action_block and len( self.ACTION_PATTERN.findall( iteration_buffer ) ) < 2 ): continue elif in_action_block: in_action_block = False # Emit the post action block text, if it is there post_action_text = iteration_buffer.split( "" )[-1] if post_action_text: async for ( line ) in SSEFormatter.yield_message_event( post_action_text ): yield line else: async for ( line ) in SSEFormatter.yield_message_event( delta.content ): yield line elif finish_reason == "stop": break # Process any accumulated thinking await self._handle_thinking( thinking_signatures, accumulated_thinking ) # 6) The LLM is done. If we have any leftover partial text, # finalize it in the conversation if iteration_buffer: # Create the final message with metadata including citations final_message = Message( role="assistant", content=iteration_buffer, metadata={"citations": self.streaming_citations}, ) # Add it to the conversation await self.conversation.add_message(final_message) # --- 4) Process any / blocks, or mark completed action_matches = self.ACTION_PATTERN.findall( iteration_buffer ) if len(action_matches) > 0: # Process each ToolCall xml_toolcalls = "" for action_block in action_matches: tool_calls_text = [] # Look for ToolCalls wrapper, or use the raw action block calls_wrapper = self.TOOLCALLS_PATTERN.findall( action_block ) if calls_wrapper: for tw in calls_wrapper: tool_calls_text.append(tw) else: tool_calls_text.append(action_block) for calls_region in tool_calls_text: calls_found = self.TOOLCALL_PATTERN.findall( calls_region ) for tc_block in calls_found: tool_name, tool_params = ( self._parse_single_tool_call(tc_block) ) if tool_name: # Emit SSE event for tool call tool_call_id = ( f"call_{abs(hash(tc_block))}" ) call_evt_data = { "tool_call_id": tool_call_id, "name": tool_name, "arguments": json.dumps( tool_params ), } async for line in ( SSEFormatter.yield_tool_call_event( call_evt_data ) ): yield line try: tool_result = await self.handle_function_or_tool_call( tool_name, json.dumps(tool_params), tool_id=tool_call_id, save_messages=False, ) result_content = tool_result.llm_formatted_result except Exception as e: result_content = f"Error in tool '{tool_name}': {str(e)}" xml_toolcalls += ( f"" f"{tool_name}" f"{json.dumps(tool_params)}" f"{result_content}" f"" ) # Emit SSE tool result for non-result tools result_data = { "tool_call_id": tool_call_id, "role": "tool", "content": json.dumps( convert_nonserializable_objects( result_content ) ), } async for line in SSEFormatter.yield_tool_result_event( result_data ): yield line xml_toolcalls += "" pre_action_text = iteration_buffer[ : iteration_buffer.find(action_block) ] post_action_text = iteration_buffer[ iteration_buffer.find(action_block) + len(action_block) : ] iteration_text = ( pre_action_text + xml_toolcalls + post_action_text ) # Update the conversation with tool results await self.conversation.add_message( Message( role="assistant", content=iteration_text, metadata={ "citations": self.streaming_citations }, ) ) else: # (a) Prepare final answer with optimized citations consolidated_citations = [] # Group citations by ID with all their spans for ( cid, spans, ) in citation_tracker.get_all_spans().items(): if cid in citation_payloads: consolidated_citations.append( { "id": cid, "object": "citation", "spans": [ {"start": s[0], "end": s[1]} for s in spans ], "payload": citation_payloads[cid], } ) # Create final answer payload final_evt_payload = { "id": "msg_final", "object": "agent.final_answer", "generated_answer": iteration_buffer, "citations": consolidated_citations, } # Emit final answer event async for ( line ) in SSEFormatter.yield_final_answer_event( final_evt_payload ): yield line # (b) Signal the end of the SSE stream yield SSEFormatter.yield_done_event() self._completed = True # If we exit the while loop due to hitting max iterations if not self._completed: # Generate a summary using the LLM summary = await self._generate_llm_summary( iterations_count ) # Send the summary as a message event async for line in SSEFormatter.yield_message_event( summary ): yield line # Add summary to conversation with citations metadata await self.conversation.add_message( Message( role="assistant", content=summary, metadata={"citations": self.streaming_citations}, ) ) # Create and emit a final answer payload with the summary final_evt_payload = { "id": "msg_final", "object": "agent.final_answer", "generated_answer": summary, "citations": consolidated_citations, } async for line in SSEFormatter.yield_final_answer_event( final_evt_payload ): yield line # Signal the end of the SSE stream yield SSEFormatter.yield_done_event() self._completed = True except Exception as e: logger.error(f"Error in streaming agent: {str(e)}") # Emit error event for client async for line in SSEFormatter.yield_error_event( f"Agent error: {str(e)}" ): yield line # Send done event to close the stream yield SSEFormatter.yield_done_event() # Finally, we return the async generator async for line in sse_generator(): yield line def _parse_single_tool_call( self, toolcall_text: str ) -> Tuple[Optional[str], dict]: """ Parse a ToolCall block to extract the name and parameters. Args: toolcall_text: The text content of a ToolCall block Returns: Tuple of (tool_name, tool_parameters) """ name_match = self.NAME_PATTERN.search(toolcall_text) if not name_match: return None, {} tool_name = name_match.group(1).strip() params_match = self.PARAMS_PATTERN.search(toolcall_text) if not params_match: return tool_name, {} raw_params = params_match.group(1).strip() try: # Handle potential JSON parsing issues # First try direct parsing tool_params = json.loads(raw_params) except json.JSONDecodeError: # If that fails, try to clean up the JSON string try: # Replace escaped quotes that might cause issues cleaned_params = raw_params.replace('\\"', '"') # Try again with the cleaned string tool_params = json.loads(cleaned_params) except json.JSONDecodeError: # If all else fails, treat as a plain string value tool_params = {"value": raw_params} return tool_name, tool_params class R2RXMLToolsAgent(R2RAgent): """ A non-streaming agent that: - parses or blocks as chain-of-thought - filters out XML tags related to tool calls and actions - processes blocks - properly extracts citations when they appear in the text """ # We treat or as the same token boundaries THOUGHT_OPEN = re.compile(r"<(Thought|think)>", re.IGNORECASE) THOUGHT_CLOSE = re.compile(r"", re.IGNORECASE) # Regexes to parse out , , , , , ACTION_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) TOOLCALLS_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) TOOLCALL_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) NAME_PATTERN = re.compile(r"(.*?)", re.IGNORECASE | re.DOTALL) PARAMS_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) RESPONSE_PATTERN = re.compile( r"(.*?)", re.IGNORECASE | re.DOTALL ) async def process_llm_response(self, response, *args, **kwargs): """ Override the base process_llm_response to handle XML structured responses including thoughts and tool calls. """ if self._completed: return message = response.choices[0].message finish_reason = response.choices[0].finish_reason if not message.content: # If there's no content, let the parent class handle the normal tool_calls flow return await super().process_llm_response( response, *args, **kwargs ) # Get the response content content = message.content # HACK for gemini content = content.replace("```action", "") content = content.replace("```tool_code", "") content = content.replace("```", "") if ( not content.startswith("<") and "deepseek" in self.rag_generation_config.model ): # HACK - fix issues with adding `` to the beginning content = "" + content # Process any tool calls in the content action_matches = self.ACTION_PATTERN.findall(content) if action_matches: xml_toolcalls = "" for action_block in action_matches: tool_calls_text = [] # Look for ToolCalls wrapper, or use the raw action block calls_wrapper = self.TOOLCALLS_PATTERN.findall(action_block) if calls_wrapper: for tw in calls_wrapper: tool_calls_text.append(tw) else: tool_calls_text.append(action_block) # Process each ToolCall for calls_region in tool_calls_text: calls_found = self.TOOLCALL_PATTERN.findall(calls_region) for tc_block in calls_found: tool_name, tool_params = self._parse_single_tool_call( tc_block ) if tool_name: tool_call_id = f"call_{abs(hash(tc_block))}" try: tool_result = ( await self.handle_function_or_tool_call( tool_name, json.dumps(tool_params), tool_id=tool_call_id, save_messages=False, ) ) # Add tool result to XML xml_toolcalls += ( f"" f"{tool_name}" f"{json.dumps(tool_params)}" f"{tool_result.llm_formatted_result}" f"" ) except Exception as e: logger.error(f"Error in tool call: {str(e)}") # Add error to XML xml_toolcalls += ( f"" f"{tool_name}" f"{json.dumps(tool_params)}" f"Error: {str(e)}" f"" ) xml_toolcalls += "" pre_action_text = content[: content.find(action_block)] post_action_text = content[ content.find(action_block) + len(action_block) : ] iteration_text = pre_action_text + xml_toolcalls + post_action_text # Create the assistant message await self.conversation.add_message( Message(role="assistant", content=iteration_text) ) else: # Create an assistant message with the content as-is await self.conversation.add_message( Message(role="assistant", content=content) ) # Only mark as completed if the finish_reason is "stop" or there are no action calls # This allows the agent to continue the conversation when tool calls are processed if finish_reason == "stop": self._completed = True def _parse_single_tool_call( self, toolcall_text: str ) -> Tuple[Optional[str], dict]: """ Parse a ToolCall block to extract the name and parameters. Args: toolcall_text: The text content of a ToolCall block Returns: Tuple of (tool_name, tool_parameters) """ name_match = self.NAME_PATTERN.search(toolcall_text) if not name_match: return None, {} tool_name = name_match.group(1).strip() params_match = self.PARAMS_PATTERN.search(toolcall_text) if not params_match: return tool_name, {} raw_params = params_match.group(1).strip() try: # Handle potential JSON parsing issues # First try direct parsing tool_params = json.loads(raw_params) except json.JSONDecodeError: # If that fails, try to clean up the JSON string try: # Replace escaped quotes that might cause issues cleaned_params = raw_params.replace('\\"', '"') # Try again with the cleaned string tool_params = json.loads(cleaned_params) except json.JSONDecodeError: # If all else fails, treat as a plain string value tool_params = {"value": raw_params} return tool_name, tool_params