author    | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit    | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      | ee3dc5af3b6313e921cd920906356f5d4febc4ed /R2R/r2r/pipelines
parent    | cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to 'R2R/r2r/pipelines')
-rwxr-xr-x | R2R/r2r/pipelines/__init__.py           |  11
-rwxr-xr-x | R2R/r2r/pipelines/eval_pipeline.py      |  37
-rwxr-xr-x | R2R/r2r/pipelines/ingestion_pipeline.py | 144
-rwxr-xr-x | R2R/r2r/pipelines/rag_pipeline.py       | 115
-rwxr-xr-x | R2R/r2r/pipelines/search_pipeline.py    | 140
5 files changed, 447 insertions, 0 deletions
diff --git a/R2R/r2r/pipelines/__init__.py b/R2R/r2r/pipelines/__init__.py
new file mode 100755
index 00000000..ebe3a0c3
--- /dev/null
+++ b/R2R/r2r/pipelines/__init__.py
@@ -0,0 +1,11 @@
+from .eval_pipeline import EvalPipeline
+from .ingestion_pipeline import IngestionPipeline
+from .rag_pipeline import RAGPipeline
+from .search_pipeline import SearchPipeline
+
+__all__ = [
+    "IngestionPipeline",
+    "SearchPipeline",
+    "RAGPipeline",
+    "EvalPipeline",
+]
diff --git a/R2R/r2r/pipelines/eval_pipeline.py b/R2R/r2r/pipelines/eval_pipeline.py
new file mode 100755
index 00000000..60aa50d4
--- /dev/null
+++ b/R2R/r2r/pipelines/eval_pipeline.py
@@ -0,0 +1,37 @@
+import logging
+from typing import Any, Optional
+
+from r2r.base.logging.run_manager import RunManager
+from r2r.base.pipeline.base_pipeline import AsyncPipeline
+from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class EvalPipeline(AsyncPipeline):
+    """A pipeline for evaluation."""
+
+    pipeline_type: str = "eval"
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        *args: Any,
+        **kwargs: Any,
+    ):
+        return await super().run(
+            input, state, stream, run_manager, *args, **kwargs
+        )
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the EvalPipeline")
+        return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)
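EvalPipeline is the thinnest of the four classes: run() defers entirely to
AsyncPipeline.run, and add_pipe() only adds a debug log before delegating.
A minimal usage sketch follows; `eval_pipe` and `run_manager` stand in for a
concrete AsyncPipe and RunManager, which this commit does not construct.

    from r2r.pipelines import EvalPipeline

    async def evaluate(samples, eval_pipe, run_manager):
        # eval_pipe: any AsyncPipe; run_manager: a RunManager instance.
        # Both are assumptions here, not defined by this commit.
        pipeline = EvalPipeline()
        pipeline.add_pipe(eval_pipe)  # logs "Adding pipe ... to the EvalPipeline"
        # run() forwards samples unchanged to AsyncPipeline.run.
        return await pipeline.run(samples, run_manager=run_manager)

From synchronous code this would be driven with asyncio.run(evaluate(...)).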
diff --git a/R2R/r2r/pipelines/ingestion_pipeline.py b/R2R/r2r/pipelines/ingestion_pipeline.py
new file mode 100755
index 00000000..df1263f9
--- /dev/null
+++ b/R2R/r2r/pipelines/ingestion_pipeline.py
@@ -0,0 +1,144 @@
+import asyncio
+import logging
+from asyncio import Queue
+from typing import Any, Optional
+
+from r2r.base.logging.kv_logger import KVLoggingSingleton
+from r2r.base.logging.run_manager import RunManager, manage_run
+from r2r.base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests
+from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class IngestionPipeline(AsyncPipeline):
+    """A pipeline for ingestion."""
+
+    pipeline_type: str = "ingestion"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self.parsing_pipe = None
+        self.embedding_pipeline = None
+        self.kg_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        log_run_info: bool = True,
+        *args: Any,
+        **kwargs: Any,
+    ) -> None:
+        self.state = state or AsyncState()
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+            if self.parsing_pipe is None:
+                raise ValueError(
+                    "parsing_pipeline must be set before running the ingestion pipeline"
+                )
+            if self.embedding_pipeline is None and self.kg_pipeline is None:
+                raise ValueError(
+                    "At least one of embedding_pipeline or kg_pipeline must be set before running the ingestion pipeline"
+                )
+            # Use queues to duplicate the documents for each pipeline
+            embedding_queue = Queue()
+            kg_queue = Queue()
+
+            async def enqueue_documents():
+                async for document in await self.parsing_pipe.run(
+                    self.parsing_pipe.Input(message=input),
+                    state,
+                    run_manager,
+                    *args,
+                    **kwargs,
+                ):
+                    if self.embedding_pipeline:
+                        await embedding_queue.put(document)
+                    if self.kg_pipeline:
+                        await kg_queue.put(document)
+                await embedding_queue.put(None)
+                await kg_queue.put(None)
+
+            # Start the document enqueuing process
+            enqueue_task = asyncio.create_task(enqueue_documents())
+
+            # Start the embedding and KG pipelines in parallel
+            if self.embedding_pipeline:
+                embedding_task = asyncio.create_task(
+                    self.embedding_pipeline.run(
+                        dequeue_requests(embedding_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,  # Do not log run info since we have already done so
+                        *args,
+                        **kwargs,
+                    )
+                )
+
+            if self.kg_pipeline:
+                kg_task = asyncio.create_task(
+                    self.kg_pipeline.run(
+                        dequeue_requests(kg_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,  # Do not log run info since we have already done so
+                        *args,
+                        **kwargs,
+                    )
+                )
+
+            # Wait for the enqueueing task to complete
+            await enqueue_task
+
+            results = {}
+            # Wait for the embedding and KG tasks to complete
+            if self.embedding_pipeline:
+                results["embedding_pipeline_output"] = await embedding_task
+            if self.kg_pipeline:
+                results["kg_pipeline_output"] = await kg_task
+            return results
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        parsing_pipe: bool = False,
+        kg_pipe: bool = False,
+        embedding_pipe: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(
+            f"Adding pipe {pipe.config.name} to the IngestionPipeline"
+        )
+
+        if parsing_pipe:
+            self.parsing_pipe = pipe
+        elif kg_pipe:
+            if not self.kg_pipeline:
+                self.kg_pipeline = AsyncPipeline()
+            self.kg_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        elif embedding_pipe:
+            if not self.embedding_pipeline:
+                self.embedding_pipeline = AsyncPipeline()
+            self.embedding_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        else:
+            raise ValueError("Pipe must be a parsing, embedding, or KG pipe")
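The core of IngestionPipeline.run is a queue-based fan-out: one parsed-document
stream is duplicated into an embedding queue and a KG queue, a None sentinel
marks the end of each, and both downstream pipelines consume concurrently via
dequeue_requests. The standalone sketch below reproduces that pattern with
plain asyncio; every name in it is illustrative, not part of R2R.

    import asyncio

    async def fan_out(source, queues):
        # Duplicate every item into each queue, then close with a sentinel.
        async for item in source:
            for q in queues:
                await q.put(item)
        for q in queues:
            await q.put(None)  # None = no more items, mirroring the pipeline

    async def consume(name, q):
        results = []
        while (item := await q.get()) is not None:
            results.append(f"{name}:{item}")
        return results

    async def main():
        async def docs():  # stands in for the parsing pipe's output stream
            for d in ("doc1", "doc2"):
                yield d

        embed_q, kg_q = asyncio.Queue(), asyncio.Queue()
        producer = asyncio.create_task(fan_out(docs(), [embed_q, kg_q]))
        embed = asyncio.create_task(consume("embed", embed_q))
        kg = asyncio.create_task(consume("kg", kg_q))
        await producer
        print(await embed, await kg)  # both consumers saw both documents

    asyncio.run(main())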
diff --git a/R2R/r2r/pipelines/rag_pipeline.py b/R2R/r2r/pipelines/rag_pipeline.py
new file mode 100755
index 00000000..b257ccaa
--- /dev/null
+++ b/R2R/r2r/pipelines/rag_pipeline.py
@@ -0,0 +1,115 @@
+import asyncio
+import logging
+from typing import Any, Optional
+
+from ..base.abstractions.llm import GenerationConfig
+from ..base.abstractions.search import KGSearchSettings, VectorSearchSettings
+from ..base.logging.kv_logger import KVLoggingSingleton
+from ..base.logging.run_manager import RunManager, manage_run
+from ..base.pipeline.base_pipeline import AsyncPipeline
+from ..base.pipes.base_pipe import AsyncPipe, AsyncState
+from ..base.utils import to_async_generator
+
+logger = logging.getLogger(__name__)
+
+
+class RAGPipeline(AsyncPipeline):
+    """A pipeline for RAG."""
+
+    pipeline_type: str = "rag"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self._search_pipeline = None
+        self._rag_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        run_manager: Optional[RunManager] = None,
+        log_run_info=True,
+        vector_search_settings: VectorSearchSettings = VectorSearchSettings(),
+        kg_search_settings: KGSearchSettings = KGSearchSettings(),
+        rag_generation_config: GenerationConfig = GenerationConfig(),
+        *args: Any,
+        **kwargs: Any,
+    ):
+        self.state = state or AsyncState()
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+
+            if not self._search_pipeline:
+                raise ValueError(
+                    "_search_pipeline must be set before running the RAG pipeline"
+                )
+
+            async def multi_query_generator(input):
+                tasks = []
+                async for query in input:
+                    task = asyncio.create_task(
+                        self._search_pipeline.run(
+                            to_async_generator([query]),
+                            state=state,
+                            stream=False,  # do not stream the search results
+                            run_manager=run_manager,
+                            log_run_info=False,  # do not log the run info as it is already logged above
+                            vector_search_settings=vector_search_settings,
+                            kg_search_settings=kg_search_settings,
+                            *args,
+                            **kwargs,
+                        )
+                    )
+                    tasks.append((query, task))
+
+                for query, task in tasks:
+                    yield (query, await task)
+
+            rag_results = await self._rag_pipeline.run(
+                input=multi_query_generator(input),
+                state=state,
+                stream=rag_generation_config.stream,
+                run_manager=run_manager,
+                log_run_info=False,
+                rag_generation_config=rag_generation_config,
+                *args,
+                **kwargs,
+            )
+            return rag_results
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        rag_pipe: bool = True,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the RAGPipeline")
+        if not rag_pipe:
+            raise ValueError(
+                "Only pipes that are part of the RAG pipeline can be added to the RAG pipeline"
+            )
+        if not self._rag_pipeline:
+            self._rag_pipeline = AsyncPipeline()
+        self._rag_pipeline.add_pipe(
+            pipe, add_upstream_outputs, *args, **kwargs
+        )
+
+    def set_search_pipeline(
+        self,
+        _search_pipeline: AsyncPipeline,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Setting search pipeline for the RAGPipeline")
+        self._search_pipeline = _search_pipeline
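RAGPipeline requires a search pipeline to be attached with set_search_pipeline()
before run(); multi_query_generator then fans each incoming query out to that
pipeline as a concurrent task and yields (query, search_result) pairs into the
RAG pipe. A hedged wiring sketch, where search_pipeline, rag_pipe, and
run_manager are assumed, already-constructed instances:

    from r2r.base.utils import to_async_generator
    from r2r.pipelines import RAGPipeline

    async def answer(query, search_pipeline, rag_pipe, run_manager):
        # search_pipeline: an AsyncPipeline (e.g. a configured SearchPipeline);
        # rag_pipe: an AsyncPipe; run_manager: a RunManager. All assumed here.
        pipeline = RAGPipeline()
        pipeline.set_search_pipeline(search_pipeline)  # mandatory before run()
        pipeline.add_pipe(rag_pipe)  # rag_pipe=True is the default
        # Each query is searched first; (query, results) pairs feed the RAG pipe.
        return await pipeline.run(
            input=to_async_generator([query]),
            run_manager=run_manager,
        )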
diff --git a/R2R/r2r/pipelines/search_pipeline.py b/R2R/r2r/pipelines/search_pipeline.py
new file mode 100755
index 00000000..25e0c7bb
--- /dev/null
+++ b/R2R/r2r/pipelines/search_pipeline.py
@@ -0,0 +1,140 @@
+import asyncio
+import logging
+from asyncio import Queue
+from typing import Any, Optional
+
+from ..base.abstractions.search import (
+    AggregateSearchResult,
+    KGSearchSettings,
+    VectorSearchSettings,
+)
+from ..base.logging.kv_logger import KVLoggingSingleton
+from ..base.logging.run_manager import RunManager, manage_run
+from ..base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests
+from ..base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class SearchPipeline(AsyncPipeline):
+    """A pipeline for search."""
+
+    pipeline_type: str = "search"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self._parsing_pipe = None
+        self._vector_search_pipeline = None
+        self._kg_search_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        log_run_info: bool = True,
+        vector_search_settings: VectorSearchSettings = VectorSearchSettings(),
+        kg_search_settings: KGSearchSettings = KGSearchSettings(),
+        *args: Any,
+        **kwargs: Any,
+    ):
+        self.state = state or AsyncState()
+        do_vector_search = (
+            self._vector_search_pipeline is not None
+            and vector_search_settings.use_vector_search
+        )
+        do_kg = (
+            self._kg_search_pipeline is not None
+            and kg_search_settings.use_kg_search
+        )
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+
+            vector_search_queue = Queue()
+            kg_queue = Queue()
+
+            async def enqueue_requests():
+                async for message in input:
+                    if do_vector_search:
+                        await vector_search_queue.put(message)
+                    if do_kg:
+                        await kg_queue.put(message)
+
+                await vector_search_queue.put(None)
+                await kg_queue.put(None)
+
+            # Start the document enqueuing process
+            enqueue_task = asyncio.create_task(enqueue_requests())
+
+            # Start the embedding and KG pipelines in parallel
+            if do_vector_search:
+                vector_search_task = asyncio.create_task(
+                    self._vector_search_pipeline.run(
+                        dequeue_requests(vector_search_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,
+                        vector_search_settings=vector_search_settings,
+                    )
+                )
+
+            if do_kg:
+                kg_task = asyncio.create_task(
+                    self._kg_search_pipeline.run(
+                        dequeue_requests(kg_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,
+                        kg_search_settings=kg_search_settings,
+                    )
+                )
+
+            await enqueue_task
+
+            vector_search_results = (
+                await vector_search_task if do_vector_search else None
+            )
+            kg_results = await kg_task if do_kg else None
+
+            return AggregateSearchResult(
+                vector_search_results=vector_search_results,
+                kg_search_results=kg_results,
+            )
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        kg_pipe: bool = False,
+        vector_search_pipe: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the SearchPipeline")
+
+        if kg_pipe:
+            if not self._kg_search_pipeline:
+                self._kg_search_pipeline = AsyncPipeline()
+            self._kg_search_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        elif vector_search_pipe:
+            if not self._vector_search_pipeline:
+                self._vector_search_pipeline = AsyncPipeline()
+            self._vector_search_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        else:
+            raise ValueError("Pipe must be a vector search or KG pipe")
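SearchPipeline mirrors the ingestion fan-out: queries are duplicated into a
vector-search queue and a KG queue, the two sub-pipelines run in parallel, and
the results are merged into an AggregateSearchResult, with None for any
disabled branch. A sketch under the same assumptions as above; constructing
the settings objects with the use_vector_search / use_kg_search keywords is
itself an assumption, though those are the fields this commit's code reads.

    from r2r.base.abstractions.search import KGSearchSettings, VectorSearchSettings
    from r2r.base.utils import to_async_generator
    from r2r.pipelines import SearchPipeline

    async def search(query, vector_pipe, kg_pipe, run_manager):
        # vector_pipe / kg_pipe: AsyncPipe implementations; run_manager: a
        # RunManager. None of these are defined by this commit.
        pipeline = SearchPipeline()
        pipeline.add_pipe(vector_pipe, vector_search_pipe=True)
        pipeline.add_pipe(kg_pipe, kg_pipe=True)
        # Both branches run concurrently; a branch whose flag is off returns None.
        return await pipeline.run(
            input=to_async_generator([query]),
            run_manager=run_manager,
            vector_search_settings=VectorSearchSettings(use_vector_search=True),
            kg_search_settings=KGSearchSettings(use_kg_search=True),
        )

The returned AggregateSearchResult carries vector_search_results and
kg_search_results exactly as assembled at the end of run().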