import os
from typing import Optional, Type
from r2r.base import (
AsyncPipe,
EmbeddingProvider,
EvalProvider,
LLMProvider,
PromptProvider,
VectorDBProvider,
)
from r2r.pipelines import (
EvalPipeline,
IngestionPipeline,
RAGPipeline,
SearchPipeline,
)
from ..app import R2RApp
from ..engine import R2REngine
from ..r2r import R2R
from .config import R2RConfig
from .factory import R2RPipeFactory, R2RPipelineFactory, R2RProviderFactory
class R2RBuilder:
current_file_path = os.path.dirname(__file__)
config_root = os.path.join(
current_file_path, "..", "..", "examples", "configs"
)
CONFIG_OPTIONS = {
"default": None,
"local_ollama": os.path.join(config_root, "local_ollama.json"),
"local_ollama_rerank": os.path.join(
config_root, "local_ollama_rerank.json"
),
"neo4j_kg": os.path.join(config_root, "neo4j_kg.json"),
"local_neo4j_kg": os.path.join(config_root, "local_neo4j_kg.json"),
"postgres_logging": os.path.join(config_root, "postgres_logging.json"),
}
@staticmethod
def _get_config(config_name):
if config_name is None:
return R2RConfig.from_json()
if config_name in R2RBuilder.CONFIG_OPTIONS:
return R2RConfig.from_json(R2RBuilder.CONFIG_OPTIONS[config_name])
raise ValueError(f"Invalid config name: {config_name}")
def __init__(
self,
config: Optional[R2RConfig] = None,
from_config: Optional[str] = None,
):
if config and from_config:
raise ValueError("Cannot specify both config and config_name")
self.config = config or R2RBuilder._get_config(from_config)
self.r2r_app_override: Optional[Type[R2REngine]] = None
self.provider_factory_override: Optional[Type[R2RProviderFactory]] = (
None
)
self.pipe_factory_override: Optional[R2RPipeFactory] = None
self.pipeline_factory_override: Optional[R2RPipelineFactory] = None
self.vector_db_provider_override: Optional[VectorDBProvider] = None
self.embedding_provider_override: Optional[EmbeddingProvider] = None
self.eval_provider_override: Optional[EvalProvider] = None
self.llm_provider_override: Optional[LLMProvider] = None
self.prompt_provider_override: Optional[PromptProvider] = None
self.parsing_pipe_override: Optional[AsyncPipe] = None
self.embedding_pipe_override: Optional[AsyncPipe] = None
self.vector_storage_pipe_override: Optional[AsyncPipe] = None
self.vector_search_pipe_override: Optional[AsyncPipe] = None
self.rag_pipe_override: Optional[AsyncPipe] = None
self.streaming_rag_pipe_override: Optional[AsyncPipe] = None
self.eval_pipe_override: Optional[AsyncPipe] = None
self.ingestion_pipeline: Optional[IngestionPipeline] = None
self.search_pipeline: Optional[SearchPipeline] = None
self.rag_pipeline: Optional[RAGPipeline] = None
self.streaming_rag_pipeline: Optional[RAGPipeline] = None
self.eval_pipeline: Optional[EvalPipeline] = None
def with_app(self, app: Type[R2REngine]):
self.r2r_app_override = app
return self
def with_provider_factory(self, factory: Type[R2RProviderFactory]):
self.provider_factory_override = factory
return self
def with_pipe_factory(self, factory: R2RPipeFactory):
self.pipe_factory_override = factory
return self
def with_pipeline_factory(self, factory: R2RPipelineFactory):
self.pipeline_factory_override = factory
return self
def with_vector_db_provider(self, provider: VectorDBProvider):
self.vector_db_provider_override = provider
return self
def with_embedding_provider(self, provider: EmbeddingProvider):
self.embedding_provider_override = provider
return self
def with_eval_provider(self, provider: EvalProvider):
self.eval_provider_override = provider
return self
def with_llm_provider(self, provider: LLMProvider):
self.llm_provider_override = provider
return self
def with_prompt_provider(self, provider: PromptProvider):
self.prompt_provider_override = provider
return self
def with_parsing_pipe(self, pipe: AsyncPipe):
self.parsing_pipe_override = pipe
return self
def with_embedding_pipe(self, pipe: AsyncPipe):
self.embedding_pipe_override = pipe
return self
def with_vector_storage_pipe(self, pipe: AsyncPipe):
self.vector_storage_pipe_override = pipe
return self
def with_vector_search_pipe(self, pipe: AsyncPipe):
self.vector_search_pipe_override = pipe
return self
def with_rag_pipe(self, pipe: AsyncPipe):
self.rag_pipe_override = pipe
return self
def with_streaming_rag_pipe(self, pipe: AsyncPipe):
self.streaming_rag_pipe_override = pipe
return self
def with_eval_pipe(self, pipe: AsyncPipe):
self.eval_pipe_override = pipe
return self
def with_ingestion_pipeline(self, pipeline: IngestionPipeline):
self.ingestion_pipeline = pipeline
return self
def with_vector_search_pipeline(self, pipeline: SearchPipeline):
self.search_pipeline = pipeline
return self
def with_rag_pipeline(self, pipeline: RAGPipeline):
self.rag_pipeline = pipeline
return self
def with_streaming_rag_pipeline(self, pipeline: RAGPipeline):
self.streaming_rag_pipeline = pipeline
return self
def with_eval_pipeline(self, pipeline: EvalPipeline):
self.eval_pipeline = pipeline
return self
def build(self, *args, **kwargs) -> R2R:
provider_factory = self.provider_factory_override or R2RProviderFactory
pipe_factory = self.pipe_factory_override or R2RPipeFactory
pipeline_factory = self.pipeline_factory_override or R2RPipelineFactory
providers = provider_factory(self.config).create_providers(
vector_db_provider_override=self.vector_db_provider_override,
embedding_provider_override=self.embedding_provider_override,
eval_provider_override=self.eval_provider_override,
llm_provider_override=self.llm_provider_override,
prompt_provider_override=self.prompt_provider_override,
*args,
**kwargs,
)
pipes = pipe_factory(self.config, providers).create_pipes(
parsing_pipe_override=self.parsing_pipe_override,
embedding_pipe_override=self.embedding_pipe_override,
vector_storage_pipe_override=self.vector_storage_pipe_override,
vector_search_pipe_override=self.vector_search_pipe_override,
rag_pipe_override=self.rag_pipe_override,
streaming_rag_pipe_override=self.streaming_rag_pipe_override,
eval_pipe_override=self.eval_pipe_override,
*args,
**kwargs,
)
pipelines = pipeline_factory(self.config, pipes).create_pipelines(
ingestion_pipeline=self.ingestion_pipeline,
search_pipeline=self.search_pipeline,
rag_pipeline=self.rag_pipeline,
streaming_rag_pipeline=self.streaming_rag_pipeline,
eval_pipeline=self.eval_pipeline,
*args,
**kwargs,
)
engine = (self.r2r_app_override or R2REngine)(
self.config, providers, pipelines
)
r2r_app = R2RApp(engine)
return R2R(engine=engine, app=r2r_app)