import logging
import os
from typing import Any, Optional
from r2r.base import (
AsyncPipe,
EmbeddingConfig,
EmbeddingProvider,
EvalProvider,
KGProvider,
KVLoggingSingleton,
LLMConfig,
LLMProvider,
PromptProvider,
VectorDBConfig,
VectorDBProvider,
)
from r2r.pipelines import (
EvalPipeline,
IngestionPipeline,
RAGPipeline,
SearchPipeline,
)
from ..abstractions import R2RPipelines, R2RPipes, R2RProviders
from .config import R2RConfig
# Module-level logger; used in this file to report a non-fatal logging-configuration failure.
logger = logging.getLogger(__name__)
class R2RProviderFactory:
    """Create the provider objects selected by an ``R2RConfig``.

    Each ``create_*`` method inspects the ``provider`` field of the config
    section it is given and instantiates the matching implementation. The
    concrete classes are imported inside the matching branch, so the import
    is deferred until that branch actually runs.
    """

    def __init__(self, config: R2RConfig):
        """Store the configuration the ``create_*`` methods read from."""
        self.config = config

    def create_vector_db_provider(
        self, vector_db_config: VectorDBConfig, *args, **kwargs
    ) -> VectorDBProvider:
        """Instantiate the vector database provider and initialize its collection.

        Args:
            vector_db_config: Config section whose ``provider`` names the backend.

        Returns:
            The provider, after ``initialize_collection`` has been called with
            ``self.config.embedding.base_dimension``.

        Raises:
            ValueError: If the provider is not supported, the constructed
                provider is falsy, or the embedding config has no
                ``base_dimension``.
        """
        vector_db_provider: Optional[VectorDBProvider] = None
        if vector_db_config.provider == "pgvector":
            from r2r.providers.vector_dbs import PGVectorDB

            vector_db_provider = PGVectorDB(vector_db_config)
        else:
            raise ValueError(
                f"Vector database provider {vector_db_config.provider} not supported"
            )

        if not vector_db_provider:
            raise ValueError("Vector database provider not found")

        # The collection is sized from the embedding config, not from the
        # vector DB config, so it has to be present here.
        base_dimension = self.config.embedding.base_dimension
        if not base_dimension:
            raise ValueError("Search dimension not found in embedding config")
        vector_db_provider.initialize_collection(base_dimension)
        return vector_db_provider

    def create_embedding_provider(
        self, embedding: EmbeddingConfig, *args, **kwargs
    ) -> Optional[EmbeddingProvider]:
        """Instantiate the embedding provider named by ``embedding.provider``.

        Args:
            embedding: Embedding config section, or ``None``.

        Returns:
            The provider, or ``None`` when ``embedding`` is ``None`` or its
            ``provider`` is ``None`` (embeddings disabled).

        Raises:
            ValueError: If the provider is not supported, or if ``"openai"``
                is selected while ``OPENAI_API_KEY`` is not set.
        """
        # This guard must come before any other ``embedding.provider`` access:
        # checking ``embedding is None`` after dereferencing it can never
        # succeed. A ``None`` provider is treated as "disabled", matching how
        # the pipe and pipeline factories in this file interpret it.
        if embedding is None or embedding.provider is None:
            return None

        if embedding.provider == "openai":
            if not os.getenv("OPENAI_API_KEY"):
                raise ValueError(
                    "Must set OPENAI_API_KEY in order to initialize OpenAIEmbeddingProvider."
                )
            from r2r.providers.embeddings import OpenAIEmbeddingProvider

            return OpenAIEmbeddingProvider(embedding)

        if embedding.provider == "ollama":
            from r2r.providers.embeddings import OllamaEmbeddingProvider

            return OllamaEmbeddingProvider(embedding)

        if embedding.provider == "sentence-transformers":
            from r2r.providers.embeddings import (
                SentenceTransformerEmbeddingProvider,
            )

            return SentenceTransformerEmbeddingProvider(embedding)

        raise ValueError(
            f"Embedding provider {embedding.provider} not supported"
        )

    def create_eval_provider(
        self, eval_config, prompt_provider, *args, **kwargs
    ) -> Optional[EvalProvider]:
        """Instantiate the evaluation provider named by ``eval_config.provider``.

        Args:
            eval_config: Eval config section; ``eval_config.llm`` is used to
                build the LLM provider for the ``"local"`` backend.
            prompt_provider: Prompt provider handed to the eval provider.

        Returns:
            The provider, or ``None`` when ``eval_config.provider`` is ``None``.

        Raises:
            ValueError: If the provider is not supported.
        """
        if eval_config.provider is None:
            return None

        if eval_config.provider == "local":
            from r2r.providers.eval import LLMEvalProvider

            llm_provider = self.create_llm_provider(eval_config.llm)
            return LLMEvalProvider(
                eval_config,
                llm_provider=llm_provider,
                prompt_provider=prompt_provider,
            )

        raise ValueError(
            f"Eval provider {eval_config.provider} not supported."
        )

    def create_llm_provider(
        self, llm_config: LLMConfig, *args, **kwargs
    ) -> LLMProvider:
        """Instantiate the language model provider named by ``llm_config.provider``.

        Raises:
            ValueError: If the provider is not supported or the constructed
                provider is falsy.
        """
        llm_provider: Optional[LLMProvider] = None
        if llm_config.provider == "openai":
            from r2r.providers.llms import OpenAILLM

            llm_provider = OpenAILLM(llm_config)
        elif llm_config.provider == "litellm":
            from r2r.providers.llms import LiteLLM

            llm_provider = LiteLLM(llm_config)
        else:
            raise ValueError(
                f"Language model provider {llm_config.provider} not supported"
            )

        if not llm_provider:
            raise ValueError("Language model provider not found")
        return llm_provider

    def create_prompt_provider(
        self, prompt_config, *args, **kwargs
    ) -> PromptProvider:
        """Instantiate the prompt provider named by ``prompt_config.provider``.

        Raises:
            ValueError: If the provider is not ``"local"``.
        """
        if prompt_config.provider != "local":
            raise ValueError(
                f"Prompt provider {prompt_config.provider} not supported"
            )

        from r2r.prompts import R2RPromptProvider

        return R2RPromptProvider()

    def create_kg_provider(self, kg_config, *args, **kwargs):
        """Instantiate the knowledge graph provider named by ``kg_config.provider``.

        Returns:
            The provider, or ``None`` when ``kg_config.provider`` is ``None``.

        Raises:
            ValueError: If the provider is not supported.
        """
        if kg_config.provider is None:
            return None

        if kg_config.provider == "neo4j":
            from r2r.providers.kg import Neo4jKGProvider

            return Neo4jKGProvider(kg_config)

        raise ValueError(
            f"KG provider {kg_config.provider} not supported."
        )

    def create_providers(
        self,
        vector_db_provider_override: Optional[VectorDBProvider] = None,
        embedding_provider_override: Optional[EmbeddingProvider] = None,
        eval_provider_override: Optional[EvalProvider] = None,
        llm_provider_override: Optional[LLMProvider] = None,
        prompt_provider_override: Optional[PromptProvider] = None,
        kg_provider_override: Optional[KGProvider] = None,
        *args,
        **kwargs,
    ) -> R2RProviders:
        """Assemble an ``R2RProviders`` bundle from overrides and config.

        For every slot, a truthy override is used as-is; otherwise the
        provider is created from the matching section of ``self.config``.
        Extra positional and keyword arguments are forwarded to each
        ``create_*`` method.
        """
        # Built once so the eval provider and the returned bundle share the
        # same prompt provider instance.
        prompt_provider = (
            prompt_provider_override
            or self.create_prompt_provider(self.config.prompt, *args, **kwargs)
        )
        return R2RProviders(
            vector_db=vector_db_provider_override
            or self.create_vector_db_provider(
                self.config.vector_database, *args, **kwargs
            ),
            embedding=embedding_provider_override
            or self.create_embedding_provider(
                self.config.embedding, *args, **kwargs
            ),
            # ``prompt_provider`` is passed positionally: as a keyword it
            # would collide with the first forwarded positional argument.
            eval=eval_provider_override
            or self.create_eval_provider(
                self.config.eval, prompt_provider, *args, **kwargs
            ),
            llm=llm_provider_override
            or self.create_llm_provider(
                self.config.completions, *args, **kwargs
            ),
            prompt=prompt_provider,
            kg=kg_provider_override
            or self.create_kg_provider(self.config.kg, *args, **kwargs),
        )
class R2RPipeFactory:
    """Create the individual pipes from an ``R2RConfig`` and a set of providers.

    Pipes that depend on an optional backend (embedding, knowledge graph)
    are returned as ``None`` when the corresponding ``provider`` config
    value is ``None``.
    """

    def __init__(self, config: R2RConfig, providers: R2RProviders):
        """Store the configuration and the providers the pipes are wired to."""
        self.config = config
        self.providers = providers

    def create_pipes(
        self,
        parsing_pipe_override: Optional[AsyncPipe] = None,
        embedding_pipe_override: Optional[AsyncPipe] = None,
        kg_pipe_override: Optional[AsyncPipe] = None,
        kg_storage_pipe_override: Optional[AsyncPipe] = None,
        kg_agent_pipe_override: Optional[AsyncPipe] = None,
        vector_storage_pipe_override: Optional[AsyncPipe] = None,
        vector_search_pipe_override: Optional[AsyncPipe] = None,
        rag_pipe_override: Optional[AsyncPipe] = None,
        streaming_rag_pipe_override: Optional[AsyncPipe] = None,
        eval_pipe_override: Optional[AsyncPipe] = None,
        *args,
        **kwargs,
    ) -> R2RPipes:
        """Assemble an ``R2RPipes`` bundle from overrides and config.

        For every slot, a truthy override is used as-is; otherwise the pipe
        is built by the matching ``create_*`` method. Extra positional and
        keyword arguments are forwarded to those methods.
        """
        return R2RPipes(
            parsing_pipe=parsing_pipe_override
            or self.create_parsing_pipe(
                self.config.ingestion.get("excluded_parsers"), *args, **kwargs
            ),
            embedding_pipe=embedding_pipe_override
            or self.create_embedding_pipe(*args, **kwargs),
            kg_pipe=kg_pipe_override or self.create_kg_pipe(*args, **kwargs),
            kg_storage_pipe=kg_storage_pipe_override
            or self.create_kg_storage_pipe(*args, **kwargs),
            kg_agent_search_pipe=kg_agent_pipe_override
            or self.create_kg_agent_pipe(*args, **kwargs),
            vector_storage_pipe=vector_storage_pipe_override
            or self.create_vector_storage_pipe(*args, **kwargs),
            vector_search_pipe=vector_search_pipe_override
            or self.create_vector_search_pipe(*args, **kwargs),
            # ``stream`` is passed positionally in both calls. As a keyword
            # it collides with the first forwarded positional argument, and
            # omitting it lets that argument be taken as ``stream``.
            rag_pipe=rag_pipe_override
            or self.create_rag_pipe(False, *args, **kwargs),
            streaming_rag_pipe=streaming_rag_pipe_override
            or self.create_rag_pipe(True, *args, **kwargs),
            eval_pipe=eval_pipe_override
            or self.create_eval_pipe(*args, **kwargs),
        )

    @staticmethod
    def _build_text_splitter(extra_fields, missing_message: str) -> Any:
        """Build a ``RecursiveCharacterTextSplitter`` from ``extra_fields["text_splitter"]``.

        Args:
            extra_fields: Mapping read with ``.get("text_splitter")``; the
                entry must provide ``chunk_size`` and ``chunk_overlap``.
            missing_message: Message of the ``ValueError`` raised when the
                entry is missing or empty.

        Raises:
            ValueError: If there is no usable ``text_splitter`` entry.
        """
        splitter_settings = extra_fields.get("text_splitter")
        if not splitter_settings:
            raise ValueError(missing_message)

        from r2r.base import RecursiveCharacterTextSplitter

        return RecursiveCharacterTextSplitter(
            chunk_size=splitter_settings["chunk_size"],
            chunk_overlap=splitter_settings["chunk_overlap"],
            length_function=len,
            is_separator_regex=False,
        )

    def create_parsing_pipe(
        self, excluded_parsers: Optional[list] = None, *args, **kwargs
    ) -> Any:
        """Create the parsing pipe; ``None`` for ``excluded_parsers`` means an empty list."""
        from r2r.pipes import ParsingPipe

        return ParsingPipe(excluded_parsers=excluded_parsers or [])

    def create_embedding_pipe(self, *args, **kwargs) -> Any:
        """Create the embedding pipe, or return ``None`` if no embedding provider is configured.

        Raises:
            ValueError: If the embedding config has no ``text_splitter`` entry
                in its ``extra_fields``.
        """
        embedding_config = self.config.embedding
        if embedding_config.provider is None:
            return None

        text_splitter = self._build_text_splitter(
            embedding_config.extra_fields,
            "Text splitter config not found in embedding config",
        )

        from r2r.pipes import EmbeddingPipe

        return EmbeddingPipe(
            embedding_provider=self.providers.embedding,
            vector_db_provider=self.providers.vector_db,
            text_splitter=text_splitter,
            embedding_batch_size=embedding_config.batch_size,
        )

    def create_vector_storage_pipe(self, *args, **kwargs) -> Any:
        """Create the vector storage pipe, or return ``None`` if no embedding provider is configured."""
        if self.config.embedding.provider is None:
            return None

        from r2r.pipes import VectorStoragePipe

        return VectorStoragePipe(vector_db_provider=self.providers.vector_db)

    def create_vector_search_pipe(self, *args, **kwargs) -> Any:
        """Create the vector search pipe, or return ``None`` if no embedding provider is configured."""
        if self.config.embedding.provider is None:
            return None

        from r2r.pipes import VectorSearchPipe

        return VectorSearchPipe(
            vector_db_provider=self.providers.vector_db,
            embedding_provider=self.providers.embedding,
        )

    def create_kg_pipe(self, *args, **kwargs) -> Any:
        """Create the KG extraction pipe, or return ``None`` if no KG provider is configured.

        Raises:
            ValueError: If the kg config has no ``text_splitter`` entry in its
                ``extra_fields``.
        """
        kg_config = self.config.kg
        if kg_config.provider is None:
            return None

        text_splitter = self._build_text_splitter(
            kg_config.extra_fields,
            "Text splitter config not found in kg config.",
        )

        from r2r.pipes import KGExtractionPipe

        return KGExtractionPipe(
            kg_provider=self.providers.kg,
            llm_provider=self.providers.llm,
            prompt_provider=self.providers.prompt,
            vector_db_provider=self.providers.vector_db,
            text_splitter=text_splitter,
            kg_batch_size=kg_config.batch_size,
        )

    def create_kg_storage_pipe(self, *args, **kwargs) -> Any:
        """Create the KG storage pipe, or return ``None`` if no KG provider is configured."""
        if self.config.kg.provider is None:
            return None

        from r2r.pipes import KGStoragePipe

        return KGStoragePipe(
            kg_provider=self.providers.kg,
            embedding_provider=self.providers.embedding,
        )

    def create_kg_agent_pipe(self, *args, **kwargs) -> Any:
        """Create the KG agent search pipe, or return ``None`` if no KG provider is configured."""
        if self.config.kg.provider is None:
            return None

        from r2r.pipes import KGAgentSearchPipe

        return KGAgentSearchPipe(
            kg_provider=self.providers.kg,
            llm_provider=self.providers.llm,
            prompt_provider=self.providers.prompt,
        )

    def create_rag_pipe(self, stream: bool = False, *args, **kwargs) -> Any:
        """Create the RAG pipe: the streaming variant when ``stream`` is truthy, else the regular one."""
        if stream:
            from r2r.pipes import StreamingSearchRAGPipe

            return StreamingSearchRAGPipe(
                llm_provider=self.providers.llm,
                prompt_provider=self.providers.prompt,
            )

        from r2r.pipes import SearchRAGPipe

        return SearchRAGPipe(
            llm_provider=self.providers.llm,
            prompt_provider=self.providers.prompt,
        )

    def create_eval_pipe(self, *args, **kwargs) -> Any:
        """Create the evaluation pipe around ``self.providers.eval``."""
        from r2r.pipes import EvalPipe

        return EvalPipe(eval_provider=self.providers.eval)
class R2RPipelineFactory:
    """Assemble ingestion, search, RAG and eval pipelines from a set of pipes.

    Which optional pipes are added is decided by the ``provider`` values in
    ``self.config`` (embedding, vector database, knowledge graph).
    """

    def __init__(self, config: R2RConfig, pipes: R2RPipes):
        """Store the configuration and the pipes the pipelines are built from."""
        self.config = config
        self.pipes = pipes

    def create_ingestion_pipeline(self, *args, **kwargs) -> IngestionPipeline:
        """Factory method to create an ingestion pipeline.

        The parsing pipe is always added. The embedding and vector storage
        pipes are added when an embedding provider is configured, and the KG
        extraction and KG storage pipes when a KG provider is configured.
        """
        ingestion_pipeline = IngestionPipeline()

        ingestion_pipeline.add_pipe(
            pipe=self.pipes.parsing_pipe, parsing_pipe=True
        )

        # Add embedding pipes if provider is set
        if self.config.embedding.provider is not None:
            ingestion_pipeline.add_pipe(
                self.pipes.embedding_pipe, embedding_pipe=True
            )
            ingestion_pipeline.add_pipe(
                self.pipes.vector_storage_pipe, embedding_pipe=True
            )

        # Add KG pipes if provider is set
        if self.config.kg.provider is not None:
            ingestion_pipeline.add_pipe(self.pipes.kg_pipe, kg_pipe=True)
            ingestion_pipeline.add_pipe(
                self.pipes.kg_storage_pipe, kg_pipe=True
            )

        return ingestion_pipeline

    def create_search_pipeline(self, *args, **kwargs) -> SearchPipeline:
        """Factory method to create a search pipeline.

        The vector search pipe is added when both an embedding provider and
        a vector database provider are configured; the KG agent search pipe
        is added when a KG provider is configured.
        """
        search_pipeline = SearchPipeline()

        # Add vector search pipes if embedding provider and vector provider is set
        vector_search_enabled = (
            self.config.embedding.provider is not None
            and self.config.vector_database.provider is not None
        )
        if vector_search_enabled:
            search_pipeline.add_pipe(
                self.pipes.vector_search_pipe, vector_search_pipe=True
            )

        # Add KG pipes if provider is set
        if self.config.kg.provider is not None:
            search_pipeline.add_pipe(
                self.pipes.kg_agent_search_pipe, kg_pipe=True
            )

        return search_pipeline

    def create_rag_pipeline(
        self,
        search_pipeline: SearchPipeline,
        stream: bool = False,
        *args,
        **kwargs,
    ) -> RAGPipeline:
        """Create a RAG pipeline on top of ``search_pipeline``.

        Args:
            search_pipeline: Pipeline set as the RAG pipeline's search stage.
            stream: When truthy, use the streaming RAG pipe instead of the
                regular one.
        """
        if stream:
            rag_pipe = self.pipes.streaming_rag_pipe
        else:
            rag_pipe = self.pipes.rag_pipe

        rag_pipeline = RAGPipeline()
        rag_pipeline.set_search_pipeline(search_pipeline)
        rag_pipeline.add_pipe(rag_pipe)
        return rag_pipeline

    def create_eval_pipeline(self, *args, **kwargs) -> EvalPipeline:
        """Create an eval pipeline containing ``self.pipes.eval_pipe``."""
        eval_pipeline = EvalPipeline()
        eval_pipeline.add_pipe(self.pipes.eval_pipe)
        return eval_pipeline

    def create_pipelines(
        self,
        ingestion_pipeline: Optional[IngestionPipeline] = None,
        search_pipeline: Optional[SearchPipeline] = None,
        rag_pipeline: Optional[RAGPipeline] = None,
        streaming_rag_pipeline: Optional[RAGPipeline] = None,
        eval_pipeline: Optional[EvalPipeline] = None,
        *args,
        **kwargs,
    ) -> R2RPipelines:
        """Assemble an ``R2RPipelines`` bundle from overrides and ``self.pipes``.

        Logging is configured first on a best-effort basis: a failure there
        is logged as a warning and does not abort pipeline creation. For
        every slot, a truthy argument is used as-is; otherwise the pipeline
        is created. Both RAG pipelines share the same search pipeline.
        """
        try:
            self.configure_logging()
        except Exception as e:
            # Deliberately non-fatal; ``warning`` replaces the deprecated
            # ``warn`` alias and renders the same message.
            logger.warning("Error configuring logging: %s", e)

        search_pipeline = search_pipeline or self.create_search_pipeline(
            *args, **kwargs
        )
        # ``search_pipeline`` and ``stream`` are passed positionally: as
        # keywords they would collide with forwarded positional arguments.
        return R2RPipelines(
            ingestion_pipeline=ingestion_pipeline
            or self.create_ingestion_pipeline(*args, **kwargs),
            search_pipeline=search_pipeline,
            rag_pipeline=rag_pipeline
            or self.create_rag_pipeline(
                search_pipeline, False, *args, **kwargs
            ),
            streaming_rag_pipeline=streaming_rag_pipeline
            or self.create_rag_pipeline(
                search_pipeline, True, *args, **kwargs
            ),
            eval_pipeline=eval_pipeline
            or self.create_eval_pipeline(*args, **kwargs),
        )

    def configure_logging(self):
        """Configure ``KVLoggingSingleton`` with ``self.config.logging``."""
        KVLoggingSingleton.configure(self.config.logging)