import json
import os
import subprocess
import time
import uuid
import click
from dotenv import load_dotenv
from r2r.main.execution import R2RExecutionWrapper
class JsonParamType(click.ParamType):
name = "json"
def convert(self, value, param, ctx):
try:
return json.loads(value)
except json.JSONDecodeError:
self.fail(f"'{value}' is not a valid JSON string", param, ctx)
JSON = JsonParamType()
@click.group()
@click.option(
"--config-path", default=None, help="Path to the configuration file"
)
@click.option(
"--config-name", default=None, help="Name of the configuration to use"
)
@click.option("--client-mode", default=True, help="Run in client mode")
@click.option(
"--base-url",
default="http://localhost:8000",
help="Base URL for client mode",
)
@click.pass_context
def cli(ctx, config_path, config_name, client_mode, base_url):
"""R2R CLI for all core operations."""
if config_path and config_name:
raise click.UsageError(
"Cannot specify both config_path and config_name"
)
# Convert relative config path to absolute path
if config_path:
config_path = os.path.abspath(config_path)
if ctx.invoked_subcommand != "serve":
ctx.obj = R2RExecutionWrapper(
config_path,
config_name,
client_mode if ctx.invoked_subcommand != "serve" else False,
base_url,
)
else:
ctx.obj = {
"config_path": config_path,
"config_name": config_name,
"base_url": base_url,
}
@cli.command()
@click.option("--host", default="0.0.0.0", help="Host to run the server on")
@click.option("--port", default=8000, help="Port to run the server on")
@click.option("--docker", is_flag=True, help="Run using Docker")
@click.option(
"--docker-ext-neo4j",
is_flag=True,
help="Run using Docker with external Neo4j",
)
@click.option("--project-name", default="r2r", help="Project name for Docker")
@click.pass_obj
def serve(obj, host, port, docker, docker_ext_neo4j, project_name):
"""Start the R2R server."""
# Load environment variables from .env file if it exists
load_dotenv()
if docker:
if x := obj.get("config_path", None):
os.environ["CONFIG_PATH"] = x
else:
os.environ["CONFIG_NAME"] = (
obj.get("config_name", None) or "default"
)
os.environ["OLLAMA_API_BASE"] = "http://host.docker.internal:11434"
# Check if compose files exist in the package directory
package_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", ".."
)
compose_yaml = os.path.join(package_dir, "compose.yaml")
compose_neo4j_yaml = os.path.join(package_dir, "compose.neo4j.yaml")
if not os.path.exists(compose_yaml) or not os.path.exists(
compose_neo4j_yaml
):
click.echo(
"Error: Docker Compose files not found in the package directory."
)
return
# Build the docker-compose command with the specified host and port
docker_command = f"docker-compose -f {compose_yaml}"
if docker_ext_neo4j:
docker_command += f" -f {compose_neo4j_yaml}"
if host != "0.0.0.0" or port != 8000:
docker_command += (
f" --build-arg HOST={host} --build-arg PORT={port}"
)
docker_command += f" --project-name {project_name}"
docker_command += " up -d"
os.system(docker_command)
else:
wrapper = R2RExecutionWrapper(**obj, client_mode=False)
wrapper.serve(host, port)
@cli.command()
@click.option(
"--volumes",
is_flag=True,
help="Remove named volumes declared in the `volumes` section of the Compose file",
)
@click.option(
"--remove-orphans",
is_flag=True,
help="Remove containers for services not defined in the Compose file",
)
@click.option("--project-name", default="r2r", help="Project name for Docker")
@click.pass_context
def docker_down(ctx, volumes, remove_orphans, project_name):
"""Bring down the Docker Compose setup and attempt to remove the network if necessary."""
package_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "..", ".."
)
compose_yaml = os.path.join(package_dir, "compose.yaml")
compose_neo4j_yaml = os.path.join(package_dir, "compose.neo4j.yaml")
if not os.path.exists(compose_yaml) or not os.path.exists(
compose_neo4j_yaml
):
click.echo(
"Error: Docker Compose files not found in the package directory."
)
return
docker_command = (
f"docker-compose -f {compose_yaml} -f {compose_neo4j_yaml}"
)
docker_command += f" --project-name {project_name}"
if volumes:
docker_command += " --volumes"
if remove_orphans:
docker_command += " --remove-orphans"
docker_command += " down"
click.echo("Bringing down Docker Compose setup...")
result = os.system(docker_command)
if result != 0:
click.echo(
"An error occurred while bringing down the Docker Compose setup. Attempting to remove the network..."
)
# Get the list of networks
networks = (
subprocess.check_output(
["docker", "network", "ls", "--format", "{{.Name}}"]
)
.decode()
.split()
)
# Find the r2r network
r2r_network = next(
(
network
for network in networks
if network.startswith("r2r_") and "network" in network
),
None,
)
if r2r_network:
# Try to remove the network
for _ in range(1): # Try 1 extra times
remove_command = f"docker network rm {r2r_network}"
remove_result = os.system(remove_command)
if remove_result == 0:
click.echo(f"Successfully removed network: {r2r_network}")
return
else:
click.echo(
f"Failed to remove network: {r2r_network}. Retrying in 5 seconds..."
)
time.sleep(5)
click.echo(
"Failed to remove the network after multiple attempts. Please try the following steps:"
)
click.echo(
"1. Run 'docker ps' to check for any running containers using this network."
)
click.echo(
"2. Stop any running containers with 'docker stop <container_id>'."
)
click.echo(
f"3. Try removing the network manually with 'docker network rm {r2r_network}'."
)
click.echo(
"4. If the above steps don't work, you may need to restart the Docker daemon."
)
else:
click.echo("Could not find the r2r network to remove.")
else:
click.echo("Docker Compose setup has been successfully brought down.")
@cli.command()
@click.argument("file-paths", nargs=-1)
@click.option(
"--document-ids", multiple=True, help="Document IDs for ingestion"
)
@click.option("--metadatas", multiple=True, help="Metadatas for ingestion")
@click.option(
"--versions",
multiple=True,
help="Starting version for ingested files (e.g. `v1`)",
)
@click.pass_obj
def ingest_files(obj, file_paths, document_ids, metadatas, versions):
"""Ingest files into R2R."""
t0 = time.time()
# Default to None if empty tuples are provided
document_ids = None if not document_ids else list(document_ids)
metadatas = None if not metadatas else list(metadatas)
versions = None if not versions else list(versions)
response = obj.ingest_files(
list(file_paths), document_ids, metadatas, versions
)
t1 = time.time()
click.echo(f"Time taken to ingest files: {t1 - t0:.2f} seconds")
click.echo(response)
@cli.command()
@click.argument("file-paths", nargs=-1)
@click.option(
"--document-ids", multiple=True, help="Document IDs for ingestion"
)
@click.option("--metadatas", multiple=True, help="Metadatas for ingestion")
@click.pass_obj
def update_files(obj, file_paths, document_ids, metadatas):
"""Ingest files into R2R."""
t0 = time.time()
# Default to None if empty tuples are provided
metadatas = None if not metadatas else list(metadatas)
response = obj.update_files(
list(file_paths), list(document_ids), metadatas
)
t1 = time.time()
click.echo(f"Time taken to ingest files: {t1 - t0:.2f} seconds")
click.echo(response)
@cli.command()
@click.option(
"--query", prompt="Enter your search query", help="The search query"
)
@click.option(
"--use-vector-search", is_flag=True, default=True, help="Use vector search"
)
@click.option(
"--search-filters", type=JsonParamType(), help="Search filters as JSON"
)
@click.option(
"--search-limit", default=10, help="Number of search results to return"
)
@click.option("--do-hybrid-search", is_flag=True, help="Perform hybrid search")
@click.option(
"--use-kg-search", is_flag=True, help="Use knowledge graph search"
)
@click.option("--kg-agent-model", default=None, help="Model for KG agent")
@click.pass_obj
def search(
obj,
query,
use_vector_search,
search_filters,
search_limit,
do_hybrid_search,
use_kg_search,
kg_agent_model,
):
"""Perform a search query."""
kg_agent_generation_config = {}
if kg_agent_model:
kg_agent_generation_config["model"] = kg_agent_model
t0 = time.time()
results = obj.search(
query,
use_vector_search,
search_filters,
search_limit,
do_hybrid_search,
use_kg_search,
kg_agent_generation_config,
)
if isinstance(results, dict) and "results" in results:
results = results["results"]
if "vector_search_results" in results:
click.echo("Vector search results:")
for result in results["vector_search_results"]:
click.echo(result)
if "kg_search_results" in results and results["kg_search_results"]:
click.echo("KG search results:", results["kg_search_results"])
t1 = time.time()
click.echo(f"Time taken to search: {t1 - t0:.2f} seconds")
@cli.command()
@click.option("--query", prompt="Enter your query", help="The query for RAG")
@click.option(
"--use-vector-search", is_flag=True, default=True, help="Use vector search"
)
@click.option(
"--search-filters", type=JsonParamType(), help="Search filters as JSON"
)
@click.option(
"--search-limit", default=10, help="Number of search results to return"
)
@click.option("--do-hybrid-search", is_flag=True, help="Perform hybrid search")
@click.option(
"--use-kg-search", is_flag=True, help="Use knowledge graph search"
)
@click.option("--kg-agent-model", default=None, help="Model for KG agent")
@click.option("--stream", is_flag=True, help="Stream the RAG response")
@click.option("--rag-model", default=None, help="Model for RAG")
@click.pass_obj
def rag(
obj,
query,
use_vector_search,
search_filters,
search_limit,
do_hybrid_search,
use_kg_search,
kg_agent_model,
stream,
rag_model,
):
"""Perform a RAG query."""
kg_agent_generation_config = {}
if kg_agent_model:
kg_agent_generation_config = {"model": kg_agent_model}
rag_generation_config = {"stream": stream}
if rag_model:
rag_generation_config["model"] = rag_model
t0 = time.time()
response = obj.rag(
query,
use_vector_search,
search_filters,
search_limit,
do_hybrid_search,
use_kg_search,
kg_agent_generation_config,
stream,
rag_generation_config,
)
if stream:
for chunk in response:
click.echo(chunk, nl=False)
click.echo()
else:
if obj.client_mode:
click.echo(f"Search Results:\n{response['search_results']}")
click.echo(f"Completion:\n{response['completion']}")
else:
click.echo(f"Search Results:\n{response.search_results}")
click.echo(f"Completion:\n{response.completion}")
t1 = time.time()
click.echo(f"Time taken for RAG: {t1 - t0:.2f} seconds")
@cli.command()
@click.option("--keys", multiple=True, help="Keys for deletion")
@click.option("--values", multiple=True, help="Values for deletion")
@click.pass_obj
def delete(obj, keys, values):
"""Delete documents based on keys and values."""
if len(keys) != len(values):
raise click.UsageError("Number of keys must match number of values")
t0 = time.time()
response = obj.delete(list(keys), list(values))
t1 = time.time()
click.echo(response)
click.echo(f"Time taken for deletion: {t1 - t0:.2f} seconds")
@cli.command()
@click.option("--log-type-filter", help="Filter for log types")
@click.pass_obj
def logs(obj, log_type_filter):
"""Retrieve logs with optional type filter."""
t0 = time.time()
response = obj.logs(log_type_filter)
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to retrieve logs: {t1 - t0:.2f} seconds")
@cli.command()
@click.option("--document-ids", multiple=True, help="Document IDs to overview")
@click.option("--user-ids", multiple=True, help="User IDs to overview")
@click.pass_obj
def documents_overview(obj, document_ids, user_ids):
"""Get an overview of documents."""
document_ids = list(document_ids) if document_ids else None
user_ids = list(user_ids) if user_ids else None
t0 = time.time()
response = obj.documents_overview(document_ids, user_ids)
t1 = time.time()
for document in response:
click.echo(document)
click.echo(f"Time taken to get document overview: {t1 - t0:.2f} seconds")
@cli.command()
@click.argument("document_id")
@click.pass_obj
def document_chunks(obj, document_id):
"""Get chunks of a specific document."""
t0 = time.time()
response = obj.document_chunks(document_id)
t1 = time.time()
for chunk in response:
click.echo(chunk)
click.echo(f"Time taken to get document chunks: {t1 - t0:.2f} seconds")
@cli.command()
@click.pass_obj
def app_settings(obj):
"""Retrieve application settings."""
t0 = time.time()
response = obj.app_settings()
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to get app settings: {t1 - t0:.2f} seconds")
@cli.command()
@click.option("--user-ids", multiple=True, help="User IDs to overview")
@click.pass_obj
def users_overview(obj, user_ids):
"""Get an overview of users."""
user_ids = (
[uuid.UUID(user_id) for user_id in user_ids] if user_ids else None
)
t0 = time.time()
response = obj.users_overview(user_ids)
t1 = time.time()
for user in response:
click.echo(user)
click.echo(f"Time taken to get users overview: {t1 - t0:.2f} seconds")
@cli.command()
@click.option(
"--filters", type=JsonParamType(), help="Filters for analytics as JSON"
)
@click.option(
"--analysis-types", type=JsonParamType(), help="Analysis types as JSON"
)
@click.pass_obj
def analytics(obj, filters, analysis_types):
"""Retrieve analytics data."""
t0 = time.time()
response = obj.analytics(filters, analysis_types)
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to get analytics: {t1 - t0:.2f} seconds")
@cli.command()
@click.option(
"--limit", default=100, help="Limit the number of relationships returned"
)
@click.pass_obj
def inspect_knowledge_graph(obj, limit):
"""Print relationships from the knowledge graph."""
t0 = time.time()
response = obj.inspect_knowledge_graph(limit)
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to print relationships: {t1 - t0:.2f} seconds")
@cli.command()
@click.option(
"--no-media",
default=True,
help="Exclude media files from ingestion",
)
@click.option("--option", default=0, help="Which file to ingest?")
@click.pass_obj
def ingest_sample_file(obj, no_media, option):
t0 = time.time()
response = obj.ingest_sample_file(no_media=no_media, option=option)
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to ingest sample: {t1 - t0:.2f} seconds")
@cli.command()
@click.option(
"--no-media",
default=True,
help="Exclude media files from ingestion",
)
@click.pass_obj
def ingest_sample_files(obj, no_media):
"""Ingest all sample files into R2R."""
t0 = time.time()
response = obj.ingest_sample_files(no_media=no_media)
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to ingest sample files: {t1 - t0:.2f} seconds")
@cli.command()
@click.pass_obj
def health(obj):
"""Check the health of the server."""
t0 = time.time()
response = obj.health()
t1 = time.time()
click.echo(response)
click.echo(f"Time taken to ingest sample: {t1 - t0:.2f} seconds")
@cli.command()
def version():
"""Print the version of R2R."""
from importlib.metadata import version
click.echo(version("r2r"))
def main():
cli()
if __name__ == "__main__":
main()