diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/r2r/mcp.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/r2r/mcp.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/r2r/mcp.py | 150 |
1 files changed, 150 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/r2r/mcp.py b/.venv/lib/python3.12/site-packages/r2r/mcp.py new file mode 100644 index 00000000..33490ea1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/r2r/mcp.py @@ -0,0 +1,150 @@ +# Add to your local machine with `mcp install r2r/mcp.py -v R2R_API_URL=http://localhost:7272` or so. +from r2r import R2RClient + + +def id_to_shorthand(id: str) -> str: + return str(id)[:7] + + +def format_search_results_for_llm( + results, +) -> str: + """ + Instead of resetting 'source_counter' to 1, we: + - For each chunk / graph / web / doc in `results`, + - Find the aggregator index from the collector, + - Print 'Source [X]:' with that aggregator index. + """ + lines = [] + + # We'll build a quick helper to locate aggregator indices for each object: + # Or you can rely on the fact that we've added them to the collector + # in the same order. But let's do a "lookup aggregator index" approach: + + # 1) Chunk search + if results.chunk_search_results: + lines.append("Vector Search Results:") + for c in results.chunk_search_results: + lines.append(f"Source ID [{id_to_shorthand(c.id)}]:") + lines.append(c.text or "") # or c.text[:200] to truncate + + # 2) Graph search + if results.graph_search_results: + lines.append("Graph Search Results:") + for g in results.graph_search_results: + lines.append(f"Source ID [{id_to_shorthand(g.id)}]:") + if hasattr(g.content, "summary"): + lines.append(f"Community Name: {g.content.name}") + lines.append(f"ID: {g.content.id}") + lines.append(f"Summary: {g.content.summary}") + # etc. ... + elif hasattr(g.content, "name") and hasattr( + g.content, "description" + ): + lines.append(f"Entity Name: {g.content.name}") + lines.append(f"Description: {g.content.description}") + elif ( + hasattr(g.content, "subject") + and hasattr(g.content, "predicate") + and hasattr(g.content, "object") + ): + lines.append( + f"Relationship: {g.content.subject}-{g.content.predicate}-{g.content.object}" + ) + # Add metadata if needed + + # 3) Web search + if results.web_search_results: + lines.append("Web Search Results:") + for w in results.web_search_results: + lines.append(f"Source ID [{id_to_shorthand(w.id)}]:") + lines.append(f"Title: {w.title}") + lines.append(f"Link: {w.link}") + lines.append(f"Snippet: {w.snippet}") + + # 4) Local context docs + if results.document_search_results: + lines.append("Local Context Documents:") + for doc_result in results.document_search_results: + doc_title = doc_result.title or "Untitled Document" + doc_id = doc_result.id + summary = doc_result.summary + + lines.append(f"Full Document ID: {doc_id}") + lines.append(f"Shortened Document ID: {id_to_shorthand(doc_id)}") + lines.append(f"Document Title: {doc_title}") + if summary: + lines.append(f"Summary: {summary}") + + if doc_result.chunks: + # Then each chunk inside: + for chunk in doc_result.chunks: + lines.append( + f"\nChunk ID {id_to_shorthand(chunk['id'])}:\n{chunk['text']}" + ) + + result = "\n".join(lines) + return result + + +# Create a FastMCP server + +try: + from mcp.server.fastmcp import FastMCP + + mcp = FastMCP("R2R Retrieval System") +except Exception as e: + raise ImportError( + "MCP is not installed. Please run `pip install mcp`" + ) from e + +# Pass lifespan to server +mcp = FastMCP("R2R Retrieval System") + + +# RAG query tool +@mcp.tool() +async def search(query: str) -> str: + """ + Performs a + + Args: + query: The question to answer using the knowledge base + + Returns: + A response generated based on relevant context from the knowledge base + """ + client = R2RClient() + + # Call the RAG endpoint + search_response = client.retrieval.search( + query=query, + ) + return format_search_results_for_llm(search_response.results) + + +# RAG query tool +@mcp.tool() +async def rag(query: str) -> str: + """ + Perform a Retrieval-Augmented Generation query + + Args: + query: The question to answer using the knowledge base + + Returns: + A response generated based on relevant context from the knowledge base + """ + client = R2RClient() + + # Call the RAG endpoint + rag_response = client.retrieval.rag( + query=query, + ) + + return rag_response.results.generated_answer # type: ignore + + +# Run the server if executed directly +if __name__ == "__main__": + mcp.run() |