from typing import Optional
from r2r.base import KVLoggingSingleton, RunManager
from r2r.base.abstractions.base import AsyncSyncMeta, syncable
from .abstractions import R2RPipelines, R2RProviders
from .assembly.config import R2RConfig
from .services.ingestion_service import IngestionService
from .services.management_service import ManagementService
from .services.retrieval_service import RetrievalService
class R2REngine(metaclass=AsyncSyncMeta):
def __init__(
self,
config: R2RConfig,
providers: R2RProviders,
pipelines: R2RPipelines,
run_manager: Optional[RunManager] = None,
):
logging_connection = KVLoggingSingleton()
run_manager = run_manager or RunManager(logging_connection)
self.config = config
self.providers = providers
self.pipelines = pipelines
self.logging_connection = KVLoggingSingleton()
self.run_manager = run_manager
self.ingestion_service = IngestionService(
config, providers, pipelines, run_manager, logging_connection
)
self.retrieval_service = RetrievalService(
config, providers, pipelines, run_manager, logging_connection
)
self.management_service = ManagementService(
config, providers, pipelines, run_manager, logging_connection
)
# Ingestion routes
@syncable
async def aingest_documents(self, *args, **kwargs):
return await self.ingestion_service.ingest_documents(*args, **kwargs)
@syncable
async def aupdate_documents(self, *args, **kwargs):
return await self.ingestion_service.update_documents(*args, **kwargs)
@syncable
async def aingest_files(self, *args, **kwargs):
return await self.ingestion_service.ingest_files(*args, **kwargs)
@syncable
async def aupdate_files(self, *args, **kwargs):
return await self.ingestion_service.update_files(*args, **kwargs)
# Retrieval routes
@syncable
async def asearch(self, *args, **kwargs):
return await self.retrieval_service.search(*args, **kwargs)
@syncable
async def arag(self, *args, **kwargs):
return await self.retrieval_service.rag(*args, **kwargs)
@syncable
async def aevaluate(self, *args, **kwargs):
return await self.retrieval_service.evaluate(*args, **kwargs)
# Management routes
@syncable
async def aupdate_prompt(self, *args, **kwargs):
return await self.management_service.update_prompt(*args, **kwargs)
@syncable
async def alogs(self, *args, **kwargs):
return await self.management_service.alogs(*args, **kwargs)
@syncable
async def aanalytics(self, *args, **kwargs):
return await self.management_service.aanalytics(*args, **kwargs)
@syncable
async def aapp_settings(self, *args, **kwargs):
return await self.management_service.aapp_settings(*args, **kwargs)
@syncable
async def ausers_overview(self, *args, **kwargs):
return await self.management_service.ausers_overview(*args, **kwargs)
@syncable
async def adelete(self, *args, **kwargs):
return await self.management_service.delete(*args, **kwargs)
@syncable
async def adocuments_overview(self, *args, **kwargs):
return await self.management_service.adocuments_overview(
*args, **kwargs
)
@syncable
async def inspect_knowledge_graph(self, *args, **kwargs):
return await self.management_service.inspect_knowledge_graph(
*args, **kwargs
)
@syncable
async def adocument_chunks(self, *args, **kwargs):
return await self.management_service.document_chunks(*args, **kwargs)