Diffstat (limited to 'R2R/r2r/pipelines')

 -rwxr-xr-x  R2R/r2r/pipelines/__init__.py            |  11
 -rwxr-xr-x  R2R/r2r/pipelines/eval_pipeline.py       |  37
 -rwxr-xr-x  R2R/r2r/pipelines/ingestion_pipeline.py  | 144
 -rwxr-xr-x  R2R/r2r/pipelines/rag_pipeline.py        | 115
 -rwxr-xr-x  R2R/r2r/pipelines/search_pipeline.py     | 140
 5 files changed, 447 insertions(+), 0 deletions(-)
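
All four pipelines in this change subclass AsyncPipeline and share the same run/add_pipe surface. The base class is not part of this diff; the sketch below is the interface assumed here, inferred only from the super() calls in the files that follow:

    # Assumed interface of r2r.base.pipeline.base_pipeline.AsyncPipeline,
    # reconstructed from the subclass calls in this diff -- not the real body.
    class AsyncPipeline:
        def __init__(self, pipe_logger=None, run_manager=None): ...

        async def run(self, input, state=None, stream=False,
                      run_manager=None, *args, **kwargs): ...

        def add_pipe(self, pipe, add_upstream_outputs=None,
                     *args, **kwargs): ...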
diff --git a/R2R/r2r/pipelines/__init__.py b/R2R/r2r/pipelines/__init__.py
new file mode 100755
index 00000000..ebe3a0c3
--- /dev/null
+++ b/R2R/r2r/pipelines/__init__.py
@@ -0,0 +1,11 @@
+from .eval_pipeline import EvalPipeline
+from .ingestion_pipeline import IngestionPipeline
+from .rag_pipeline import RAGPipeline
+from .search_pipeline import SearchPipeline
+
+__all__ = [
+    "IngestionPipeline",
+    "SearchPipeline",
+    "RAGPipeline",
+    "EvalPipeline",
+]
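
With this __init__.py in place, all four pipelines are importable from a single package. A quick, hypothetical sanity check:

    from r2r.pipelines import (
        EvalPipeline,
        IngestionPipeline,
        RAGPipeline,
        SearchPipeline,
    )

    # Each pipeline declares its own pipeline_type class attribute.
    for cls in (IngestionPipeline, SearchPipeline, RAGPipeline, EvalPipeline):
        print(cls.__name__, "->", cls.pipeline_type)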
diff --git a/R2R/r2r/pipelines/eval_pipeline.py b/R2R/r2r/pipelines/eval_pipeline.py
new file mode 100755
index 00000000..60aa50d4
--- /dev/null
+++ b/R2R/r2r/pipelines/eval_pipeline.py
@@ -0,0 +1,37 @@
+import logging
+from typing import Any, Optional
+
+from r2r.base.logging.run_manager import RunManager
+from r2r.base.pipeline.base_pipeline import AsyncPipeline
+from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class EvalPipeline(AsyncPipeline):
+    """A pipeline for evaluation."""
+
+    pipeline_type: str = "eval"
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        *args: Any,
+        **kwargs: Any,
+    ):
+        return await super().run(
+            input, state, stream, run_manager, *args, **kwargs
+        )
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the EvalPipeline")
+        return super().add_pipe(pipe, add_upstream_outputs, *args, **kwargs)
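
EvalPipeline is a thin pass-through: run delegates straight to AsyncPipeline.run, and add_pipe only adds a debug log before deferring to the base class. A minimal usage sketch, assuming a hypothetical MyEvalPipe (an AsyncPipe subclass defined elsewhere):

    import asyncio

    from r2r.base.utils import to_async_generator
    from r2r.pipelines import EvalPipeline

    pipeline = EvalPipeline()
    pipeline.add_pipe(MyEvalPipe())  # hypothetical AsyncPipe subclass

    async def main():
        # The input shape is whatever the first pipe expects.
        payload = [{"query": "q", "completion": "a"}]
        print(await pipeline.run(input=to_async_generator(payload)))

    asyncio.run(main())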
diff --git a/R2R/r2r/pipelines/ingestion_pipeline.py b/R2R/r2r/pipelines/ingestion_pipeline.py
new file mode 100755
index 00000000..df1263f9
--- /dev/null
+++ b/R2R/r2r/pipelines/ingestion_pipeline.py
@@ -0,0 +1,144 @@
+import asyncio
+import logging
+from asyncio import Queue
+from typing import Any, Optional
+
+from r2r.base.logging.kv_logger import KVLoggingSingleton
+from r2r.base.logging.run_manager import RunManager, manage_run
+from r2r.base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests
+from r2r.base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class IngestionPipeline(AsyncPipeline):
+    """A pipeline for ingestion."""
+
+    pipeline_type: str = "ingestion"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self.parsing_pipe = None
+        self.embedding_pipeline = None
+        self.kg_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        log_run_info: bool = True,
+        *args: Any,
+        **kwargs: Any,
+    ) -> None:
+        self.state = state or AsyncState()
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+            if self.parsing_pipe is None:
+                raise ValueError(
+                    "parsing_pipeline must be set before running the ingestion pipeline"
+                )
+            if self.embedding_pipeline is None and self.kg_pipeline is None:
+                raise ValueError(
+                    "At least one of embedding_pipeline or kg_pipeline must be set before running the ingestion pipeline"
+                )
+            # Use queues to duplicate the documents for each pipeline
+            embedding_queue = Queue()
+            kg_queue = Queue()
+
+            async def enqueue_documents():
+                async for document in await self.parsing_pipe.run(
+                    self.parsing_pipe.Input(message=input),
+                    state,
+                    run_manager,
+                    *args,
+                    **kwargs,
+                ):
+                    if self.embedding_pipeline:
+                        await embedding_queue.put(document)
+                    if self.kg_pipeline:
+                        await kg_queue.put(document)
+                await embedding_queue.put(None)
+                await kg_queue.put(None)
+
+            # Start the document enqueuing process
+            enqueue_task = asyncio.create_task(enqueue_documents())
+
+            # Start the embedding and KG pipelines in parallel
+            if self.embedding_pipeline:
+                embedding_task = asyncio.create_task(
+                    self.embedding_pipeline.run(
+                        dequeue_requests(embedding_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,  # Do not log run info since we have already done so
+                        *args,
+                        **kwargs,
+                    )
+                )
+
+            if self.kg_pipeline:
+                kg_task = asyncio.create_task(
+                    self.kg_pipeline.run(
+                        dequeue_requests(kg_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,  # Do not log run info since we have already done so
+                        *args,
+                        **kwargs,
+                    )
+                )
+
+            # Wait for the enqueueing task to complete
+            await enqueue_task
+
+            results = {}
+            # Wait for the embedding and KG tasks to complete
+            if self.embedding_pipeline:
+                results["embedding_pipeline_output"] = await embedding_task
+            if self.kg_pipeline:
+                results["kg_pipeline_output"] = await kg_task
+            return results
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        parsing_pipe: bool = False,
+        kg_pipe: bool = False,
+        embedding_pipe: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(
+            f"Adding pipe {pipe.config.name} to the IngestionPipeline"
+        )
+
+        if parsing_pipe:
+            self.parsing_pipe = pipe
+        elif kg_pipe:
+            if not self.kg_pipeline:
+                self.kg_pipeline = AsyncPipeline()
+            self.kg_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        elif embedding_pipe:
+            if not self.embedding_pipeline:
+                self.embedding_pipeline = AsyncPipeline()
+            self.embedding_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        else:
+            raise ValueError("Pipe must be a parsing, embedding, or KG pipe")
diff --git a/R2R/r2r/pipelines/rag_pipeline.py b/R2R/r2r/pipelines/rag_pipeline.py
new file mode 100755
index 00000000..b257ccaa
--- /dev/null
+++ b/R2R/r2r/pipelines/rag_pipeline.py
@@ -0,0 +1,115 @@
+import asyncio
+import logging
+from typing import Any, Optional
+
+from ..base.abstractions.llm import GenerationConfig
+from ..base.abstractions.search import KGSearchSettings, VectorSearchSettings
+from ..base.logging.kv_logger import KVLoggingSingleton
+from ..base.logging.run_manager import RunManager, manage_run
+from ..base.pipeline.base_pipeline import AsyncPipeline
+from ..base.pipes.base_pipe import AsyncPipe, AsyncState
+from ..base.utils import to_async_generator
+
+logger = logging.getLogger(__name__)
+
+
+class RAGPipeline(AsyncPipeline):
+    """A pipeline for RAG."""
+
+    pipeline_type: str = "rag"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self._search_pipeline = None
+        self._rag_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        run_manager: Optional[RunManager] = None,
+        log_run_info: bool = True,
+        vector_search_settings: VectorSearchSettings = VectorSearchSettings(),
+        kg_search_settings: KGSearchSettings = KGSearchSettings(),
+        rag_generation_config: GenerationConfig = GenerationConfig(),
+        *args: Any,
+        **kwargs: Any,
+    ):
+        self.state = state or AsyncState()
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+
+            if not self._search_pipeline or not self._rag_pipeline:
+                raise ValueError(
+                    "The search and RAG sub-pipelines must both be set before running the RAG pipeline"
+                )
+
+            async def multi_query_generator(input):
+                tasks = []
+                async for query in input:
+                    task = asyncio.create_task(
+                        self._search_pipeline.run(
+                            to_async_generator([query]),
+                            state=state,
+                            stream=False,  # do not stream the search results
+                            run_manager=run_manager,
+                            log_run_info=False,  # do not log the run info as it is already logged above
+                            vector_search_settings=vector_search_settings,
+                            kg_search_settings=kg_search_settings,
+                            *args,
+                            **kwargs,
+                        )
+                    )
+                    tasks.append((query, task))
+
+                for query, task in tasks:
+                    yield (query, await task)
+
+            rag_results = await self._rag_pipeline.run(
+                input=multi_query_generator(input),
+                state=state,
+                stream=rag_generation_config.stream,
+                run_manager=run_manager,
+                log_run_info=False,
+                rag_generation_config=rag_generation_config,
+                *args,
+                **kwargs,
+            )
+            return rag_results
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        rag_pipe: bool = True,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the RAGPipeline")
+        if not rag_pipe:
+            raise ValueError(
+                "Only pipes that are part of the RAG pipeline can be added to the RAG pipeline"
+            )
+        if not self._rag_pipeline:
+            self._rag_pipeline = AsyncPipeline()
+        self._rag_pipeline.add_pipe(
+            pipe, add_upstream_outputs, *args, **kwargs
+        )
+
+    def set_search_pipeline(
+        self,
+        _search_pipeline: AsyncPipeline,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Setting search pipeline for the RAGPipeline")
+        self._search_pipeline = _search_pipeline
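
RAGPipeline composes two stages: the injected search pipeline fans out over incoming queries in parallel (multi_query_generator spawns one search task per query, then yields (query, results) pairs in order), and the internal _rag_pipeline consumes those pairs to generate completions. A hedged wiring sketch, where SearchRAGPipe is a hypothetical generation pipe and search_pipeline is a SearchPipeline configured as in the next file:

    from r2r.base.abstractions.llm import GenerationConfig
    from r2r.base.utils import to_async_generator

    rag = RAGPipeline()
    rag.set_search_pipeline(search_pipeline)  # a configured SearchPipeline
    rag.add_pipe(SearchRAGPipe())             # hypothetical generation pipe

    async def answer(queries):
        # Streaming is controlled by the generation config, not a run()
        # argument; the "model" field name is assumed.
        return await rag.run(
            input=to_async_generator(queries),
            rag_generation_config=GenerationConfig(model="gpt-4"),
        )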
diff --git a/R2R/r2r/pipelines/search_pipeline.py b/R2R/r2r/pipelines/search_pipeline.py
new file mode 100755
index 00000000..25e0c7bb
--- /dev/null
+++ b/R2R/r2r/pipelines/search_pipeline.py
@@ -0,0 +1,140 @@
+import asyncio
+import logging
+from asyncio import Queue
+from typing import Any, Optional
+
+from ..base.abstractions.search import (
+    AggregateSearchResult,
+    KGSearchSettings,
+    VectorSearchSettings,
+)
+from ..base.logging.kv_logger import KVLoggingSingleton
+from ..base.logging.run_manager import RunManager, manage_run
+from ..base.pipeline.base_pipeline import AsyncPipeline, dequeue_requests
+from ..base.pipes.base_pipe import AsyncPipe, AsyncState
+
+logger = logging.getLogger(__name__)
+
+
+class SearchPipeline(AsyncPipeline):
+    """A pipeline for search."""
+
+    pipeline_type: str = "search"
+
+    def __init__(
+        self,
+        pipe_logger: Optional[KVLoggingSingleton] = None,
+        run_manager: Optional[RunManager] = None,
+    ):
+        super().__init__(pipe_logger, run_manager)
+        self._parsing_pipe = None
+        self._vector_search_pipeline = None
+        self._kg_search_pipeline = None
+
+    async def run(
+        self,
+        input: Any,
+        state: Optional[AsyncState] = None,
+        stream: bool = False,
+        run_manager: Optional[RunManager] = None,
+        log_run_info: bool = True,
+        vector_search_settings: VectorSearchSettings = VectorSearchSettings(),
+        kg_search_settings: KGSearchSettings = KGSearchSettings(),
+        *args: Any,
+        **kwargs: Any,
+    ):
+        self.state = state or AsyncState()
+        do_vector_search = (
+            self._vector_search_pipeline is not None
+            and vector_search_settings.use_vector_search
+        )
+        do_kg = (
+            self._kg_search_pipeline is not None
+            and kg_search_settings.use_kg_search
+        )
+        async with manage_run(run_manager, self.pipeline_type):
+            if log_run_info:
+                await run_manager.log_run_info(
+                    key="pipeline_type",
+                    value=self.pipeline_type,
+                    is_info_log=True,
+                )
+
+            vector_search_queue = Queue()
+            kg_queue = Queue()
+
+            async def enqueue_requests():
+                async for message in input:
+                    if do_vector_search:
+                        await vector_search_queue.put(message)
+                    if do_kg:
+                        await kg_queue.put(message)
+
+                await vector_search_queue.put(None)
+                await kg_queue.put(None)
+
+            # Start the request enqueuing process
+            enqueue_task = asyncio.create_task(enqueue_requests())
+
+            # Start the vector search and KG search pipelines in parallel
+            if do_vector_search:
+                vector_search_task = asyncio.create_task(
+                    self._vector_search_pipeline.run(
+                        dequeue_requests(vector_search_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,
+                        vector_search_settings=vector_search_settings,
+                    )
+                )
+
+            if do_kg:
+                kg_task = asyncio.create_task(
+                    self._kg_search_pipeline.run(
+                        dequeue_requests(kg_queue),
+                        state,
+                        stream,
+                        run_manager,
+                        log_run_info=False,
+                        kg_search_settings=kg_search_settings,
+                    )
+                )
+
+            await enqueue_task
+
+            vector_search_results = (
+                await vector_search_task if do_vector_search else None
+            )
+            kg_results = await kg_task if do_kg else None
+
+            return AggregateSearchResult(
+                vector_search_results=vector_search_results,
+                kg_search_results=kg_results,
+            )
+
+    def add_pipe(
+        self,
+        pipe: AsyncPipe,
+        add_upstream_outputs: Optional[list[dict[str, str]]] = None,
+        kg_pipe: bool = False,
+        vector_search_pipe: bool = False,
+        *args,
+        **kwargs,
+    ) -> None:
+        logger.debug(f"Adding pipe {pipe.config.name} to the SearchPipeline")
+
+        if kg_pipe:
+            if not self._kg_search_pipeline:
+                self._kg_search_pipeline = AsyncPipeline()
+            self._kg_search_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        elif vector_search_pipe:
+            if not self._vector_search_pipeline:
+                self._vector_search_pipeline = AsyncPipeline()
+            self._vector_search_pipeline.add_pipe(
+                pipe, add_upstream_outputs, *args, **kwargs
+            )
+        else:
+            raise ValueError("Pipe must be a vector search or KG pipe")