about | summary | refs | log | tree | commit | diff
path: root/R2R/r2r/pipelines/ingestion_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'R2R/r2r/pipelines/ingestion_pipeline.py')
-rwxr-xr-x R2R/r2r/pipelines/ingestion_pipeline.py 144
1 files changed, 144 insertions, 0 deletions
diff --git a/R2R/r2r/pipelines/ingestion_pipeline.py b/R2R/r2r/pipelines/ingestion_pipeline.py
new file mode 100755
index 00000000..df1263f9
--- /dev/null
+++ b/R2R/r2r/pipelines/ingestion_pipeline.py
@@ -0,0 +1,144 @@
+import asyncio
+import logging
+from asyncio import Queue
+from typing import Any, Optional
+
+from r2r.base.logging.kv_logger import KVLoggingSingleton
+from r2r.base.logging.run_manager import RunManager, manage_run
+from r2r.base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests
+from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class IngestionPipeline(AsyncPipeline):
+ """A pipeline for ingestion."""
+
+ pipeline_type: str = "ingestion"
+
+ def __init__(
+ self,
+ pipe_logger: Optional[KVLoggingSingleton] = None,
+ run_manager: Optional[RunManager] = None,
+ ):
+ super().__init__(pipe_logger, run_manager)
+ self.parsing_pipe = None
+ self.embedding_pipeline = None
+ self.kg_pipeline = None
+
+ async def run(
+ self,
+ input: Any,
+ state: Optional[AsyncState] = None,
+ stream: bool = False,
+ run_manager: Optional[RunManager] = None,
+ log_run_info: bool = True,
+ *args: Any,
+ **kwargs: Any,
+ ) -> None:
+ self.state = state or AsyncState()
+ async with manage_run(run_manager, self.pipeline_type):
+ if log_run_info:
+ await run_manager.log_run_info(
+ key="pipeline_type",
+ value=self.pipeline_type,
+ is_info_log=True,
+ )
+ if self.parsing_pipe is None:
+ raise ValueError(
+ "parsing_pipeline must be set before running the ingestion pipeline"
+ )
+ if self.embedding_pipeline is None and self.kg_pipeline is None:
+ raise ValueError(
+ "At least one of embedding_pipeline or kg_pipeline must be set before running the ingestion pipeline"
+ )
+ # Use queues to duplicate the documents for each pipeline
+ embedding_queue = Queue()
+ kg_queue = Queue()
+
+ async def enqueue_documents():
+ async for document in await self.parsing_pipe.run(
+ self.parsing_pipe.Input(message=input),
+ state,
+ run_manager,
+ *args,
+ **kwargs,
+ ):
+ if self.embedding_pipeline:
+ await embedding_queue.put(document)
+ if self.kg_pipeline:
+ await kg_queue.put(document)
+ await embedding_queue.put(None)
+ await kg_queue.put(None)
+
+ # Start the document enqueuing process
+ enqueue_task = asyncio.create_task(enqueue_documents())
+
+ # Start the embedding and KG pipelines in parallel
+ if self.embedding_pipeline:
+ embedding_task = asyncio.create_task(
+ self.embedding_pipeline.run(
+ dequeue_requests(embedding_queue),
+ state,
+ stream,
+ run_manager,
+ log_run_info=False, # Do not log run info since we have already done so
+ *args,
+ **kwargs,
+ )
+ )
+
+ if self.kg_pipeline:
+ kg_task = asyncio.create_task(
+ self.kg_pipeline.run(
+ dequeue_requests(kg_queue),
+ state,
+ stream,
+ run_manager,
+ log_run_info=False, # Do not log run info since we have already done so
+ *args,
+ **kwargs,
+ )
+ )
+
+ # Wait for the enqueueing task to complete
+ await enqueue_task
+
+ results = {}
+ # Wait for the embedding and KG tasks to complete
+ if self.embedding_pipeline:
+ results["embedding_pipeline_output"] = await embedding_task
+ if self.kg_pipeline:
+ results["kg_pipeline_output"] = await kg_task
+ return results
+
+ def add_pipe(
+ self,
+ pipe: AsyncPipe,
+ add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+ parsing_pipe: bool = False,
+ kg_pipe: bool = False,
+ embedding_pipe: bool = False,
+ *args,
+ **kwargs,
+ ) -> None:
+ logger.debug(
+ f"Adding pipe {pipe.config.name} to the IngestionPipeline"
+ )
+
+ if parsing_pipe:
+ self.parsing_pipe = pipe
+ elif kg_pipe:
+ if not self.kg_pipeline:
+ self.kg_pipeline = AsyncPipeline()
+ self.kg_pipeline.add_pipe(
+ pipe, add_upstream_outputs, *args, **kwargs
+ )
+ elif embedding_pipe:
+ if not self.embedding_pipeline:
+ self.embedding_pipeline = AsyncPipeline()
+ self.embedding_pipeline.add_pipe(
+ pipe, add_upstream_outputs, *args, **kwargs
+ )
+ else:
+ raise ValueError("Pipe must be a parsing, embedding, or KG pipe")