diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /R2R/r2r/pipelines/ingestion_pipeline.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to 'R2R/r2r/pipelines/ingestion_pipeline.py')
-rwxr-xr-x | R2R/r2r/pipelines/ingestion_pipeline.py | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/R2R/r2r/pipelines/ingestion_pipeline.py b/R2R/r2r/pipelines/ingestion_pipeline.py new file mode 100755 index 00000000..df1263f9 --- /dev/null +++ b/R2R/r2r/pipelines/ingestion_pipeline.py @@ -0,0 +1,144 @@ +import asyncio +import logging +from asyncio import Queue +from typing import Any, Optional + +from r2r.base.logging.kv_logger import KVLoggingSingleton +from r2r.base.logging.run_manager import RunManager, manage_run +from r2r.base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests +from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState + +logger = logging.getLogger(__name__) + + +class IngestionPipeline(AsyncPipeline): + """A pipeline for ingestion.""" + + pipeline_type: str = "ingestion" + + def __init__( + self, + pipe_logger: Optional[KVLoggingSingleton] = None, + run_manager: Optional[RunManager] = None, + ): + super().__init__(pipe_logger, run_manager) + self.parsing_pipe = None + self.embedding_pipeline = None + self.kg_pipeline = None + + async def run( + self, + input: Any, + state: Optional[AsyncState] = None, + stream: bool = False, + run_manager: Optional[RunManager] = None, + log_run_info: bool = True, + *args: Any, + **kwargs: Any, + ) -> None: + self.state = state or AsyncState() + async with manage_run(run_manager, self.pipeline_type): + if log_run_info: + await run_manager.log_run_info( + key="pipeline_type", + value=self.pipeline_type, + is_info_log=True, + ) + if self.parsing_pipe is None: + raise ValueError( + "parsing_pipeline must be set before running the ingestion pipeline" + ) + if self.embedding_pipeline is None and self.kg_pipeline is None: + raise ValueError( + "At least one of embedding_pipeline or kg_pipeline must be set before running the ingestion pipeline" + ) + # Use queues to duplicate the documents for each pipeline + embedding_queue = Queue() + kg_queue = Queue() + + async def enqueue_documents(): + async for document in await self.parsing_pipe.run( + self.parsing_pipe.Input(message=input), + state, + run_manager, + *args, + **kwargs, + ): + if self.embedding_pipeline: + await embedding_queue.put(document) + if self.kg_pipeline: + await kg_queue.put(document) + await embedding_queue.put(None) + await kg_queue.put(None) + + # Start the document enqueuing process + enqueue_task = asyncio.create_task(enqueue_documents()) + + # Start the embedding and KG pipelines in parallel + if self.embedding_pipeline: + embedding_task = asyncio.create_task( + self.embedding_pipeline.run( + dequeue_requests(embedding_queue), + state, + stream, + run_manager, + log_run_info=False, # Do not log run info since we have already done so + *args, + **kwargs, + ) + ) + + if self.kg_pipeline: + kg_task = asyncio.create_task( + self.kg_pipeline.run( + dequeue_requests(kg_queue), + state, + stream, + run_manager, + log_run_info=False, # Do not log run info since we have already done so + *args, + **kwargs, + ) + ) + + # Wait for the enqueueing task to complete + await enqueue_task + + results = {} + # Wait for the embedding and KG tasks to complete + if self.embedding_pipeline: + results["embedding_pipeline_output"] = await embedding_task + if self.kg_pipeline: + results["kg_pipeline_output"] = await kg_task + return results + + def add_pipe( + self, + pipe: AsyncPipe, + add_upstream_outputs: Optional[list[dict[str, str]]] = None, + parsing_pipe: bool = False, + kg_pipe: bool = False, + embedding_pipe: bool = False, + *args, + **kwargs, + ) -> None: + logger.debug( + f"Adding pipe {pipe.config.name} to the IngestionPipeline" + ) + + if parsing_pipe: + self.parsing_pipe = pipe + elif kg_pipe: + if not self.kg_pipeline: + self.kg_pipeline = AsyncPipeline() + self.kg_pipeline.add_pipe( + pipe, add_upstream_outputs, *args, **kwargs + ) + elif embedding_pipe: + if not self.embedding_pipeline: + self.embedding_pipeline = AsyncPipeline() + self.embedding_pipeline.add_pipe( + pipe, add_upstream_outputs, *args, **kwargs + ) + else: + raise ValueError("Pipe must be a parsing, embedding, or KG pipe") |