about summary refs log tree commit diff
path: root/R2R/r2r/vecs/adapter
diff options
context:
space:
mode:
Diffstat (limited to 'R2R/r2r/vecs/adapter')
-rwxr-xr-x R2R/r2r/vecs/adapter/__init__.py 15
-rwxr-xr-x R2R/r2r/vecs/adapter/base.py 111
-rwxr-xr-x R2R/r2r/vecs/adapter/markdown.py 88
-rwxr-xr-x R2R/r2r/vecs/adapter/noop.py 55
-rwxr-xr-x R2R/r2r/vecs/adapter/text.py 151
5 files changed, 420 insertions, 0 deletions
diff --git a/R2R/r2r/vecs/adapter/__init__.py b/R2R/r2r/vecs/adapter/__init__.py
new file mode 100755
index 00000000..9cd9860d
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/__init__.py
@@ -0,0 +1,15 @@
+from .base import Adapter, AdapterContext, AdapterStep
+from .markdown import MarkdownChunker
+from .noop import NoOp
+from .text import ParagraphChunker, TextEmbedding, TextEmbeddingModel
+
+__all__ = [  # public names re-exported from the sibling base/markdown/noop/text modules
+    "Adapter",
+    "AdapterContext",
+    "AdapterStep",
+    "NoOp",
+    "ParagraphChunker",
+    "TextEmbedding",
+    "TextEmbeddingModel",
+    "MarkdownChunker",
+]
diff --git a/R2R/r2r/vecs/adapter/base.py b/R2R/r2r/vecs/adapter/base.py
new file mode 100755
index 00000000..7734e802
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/base.py
@@ -0,0 +1,111 @@
+"""
+The `vecs.experimental.adapter.base` module provides abstract classes and utilities
+for creating and handling adapters in vecs. Adapters allow users to interact with
+a collection using media types other than vectors.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from vecs.exc import ArgError
+
+
+class AdapterContext(str, Enum):
+    """
+    String-valued enum naming the operation an adapter pipeline is invoked for.
+    Because it mixes in `str`, members compare equal to their plain string values.
+
+    Attributes:
+        upsert (str): The pipeline runs as part of Collection.upsert.
+        query (str): The pipeline runs as part of Collection.query.
+    """
+
+    upsert = "upsert"
+    query = "query"
+
+
+class AdapterStep(ABC):
+    """
+    Abstract base class for a single step of an adapter pipeline.
+
+    A step is a callable that consumes records and yields tuples of:
+     - id (str)
+     - media (unknown type)
+     - metadata (dict)
+
+    If the user provides id or metadata, default production is overridden.
+    """
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Output dimension of this step. The base implementation returns None;
+        subclasses that produce vectors override it with their dimension.
+        """
+        return None
+
+    @abstractmethod
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Process (id, media, optional metadata) records for the given context and yield (id, media, metadata) tuples; subclasses must implement this.
+        """
+
+
+class Adapter:
+    """
+    An ordered pipeline of AdapterStep instances applied one after another.
+    """
+
+    def __init__(self, steps: list[AdapterStep]):
+        """
+        Build an adapter from its steps.
+
+        Args:
+            steps: the AdapterStep instances, in the order they should run.
+
+        Raises:
+            ArgError: if `steps` contains no steps.
+        """
+        self.steps = steps
+        if len(steps) == 0:
+            raise ArgError("Adapter must contain at least 1 step")
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Dimension exported by the adapter: scanning the steps from last to first,
+        the first non-None `exported_dimension` found, or None if no step has one.
+        """
+        dimensions = (
+            step.exported_dimension for step in reversed(self.steps)
+        )
+        # first non-None dimension wins; None when no step exports one
+        return next((dim for dim in dimensions if dim is not None), None)
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Run the records through every step, in order, and yield the final output.
+
+        Args:
+            records: Iterable of (id, media, optional metadata dict) tuples.
+            adapter_context: Context passed unchanged to each step.
+
+        Yields:
+            (id, media, metadata dict) tuples produced by the last step.
+        """
+        stream = records
+        for stage in self.steps:
+            # each step wraps the output of the previous one
+            stream = stage(stream, adapter_context)
+        yield from stream  # type: ignore
diff --git a/R2R/r2r/vecs/adapter/markdown.py b/R2R/r2r/vecs/adapter/markdown.py
new file mode 100755
index 00000000..149573f4
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/markdown.py
@@ -0,0 +1,88 @@
+import re
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from flupy import flu
+
+from .base import AdapterContext, AdapterStep
+
+
+class MarkdownChunker(AdapterStep):
+    """
+    MarkdownChunker is an AdapterStep that splits a markdown string into chunks where a heading signifies the start of a chunk, and yields each chunk as a separate record.
+    """
+
+    def __init__(self, *, skip_during_query: bool):
+        """
+        Initializes the MarkdownChunker adapter.
+
+        Args:
+            skip_during_query (bool): Whether to skip chunking during querying.
+        """
+        self.skip_during_query = skip_during_query
+
+    @staticmethod
+    def split_by_heading(
+        md: str, max_tokens: int
+    ) -> Generator[str, None, None]:
+        """
+        Split `md` into sections that each start at a heading line (1-6 `#`
+        characters followed by whitespace and text) and yield every section in
+        pieces of at most `max_tokens` space-separated tokens.
+
+        Text before the first heading forms its own section; empty pieces and
+        pieces consisting of a single newline are dropped.
+        """
+        heading_starts = [
+            match.start()
+            for match in re.finditer(r"^(#{1,6}\s+.+)$", md, flags=re.MULTILINE)
+        ]
+        # make sure the first section starts at the beginning of the document
+        if not heading_starts or heading_starts[0] != 0:
+            heading_starts.insert(0, 0)
+        section_ends = heading_starts[1:] + [None]
+
+        for start, end in zip(heading_starts, section_ends):
+            for tokens in flu(md[start:end].split(" ")).chunk(max_tokens):
+                piece = " ".join(tokens)
+                if piece not in ("", "\n"):
+                    yield piece
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+        max_tokens: int = 99999999,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Splits each markdown string in the records into chunks where each heading starts a new chunk, and yields each chunk
+        as a separate record. If the `skip_during_query` attribute is set to True,
+        this step is skipped during querying.
+
+        Args:
+            records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a markdown string and an optional dict.
+            adapter_context (AdapterContext): Context of the adapter.
+            max_tokens (int): The maximum number of tokens per chunk
+
+        Yields:
+            Tuple[str, Any, Dict]: The id appended with chunk index, the chunk, and the metadata.
+
+        Raises:
+            ValueError: If max_tokens is less than 1 (raised when iteration starts).
+        """
+        # explicit None check so that 0 is rejected too; None stays unvalidated
+        if max_tokens is not None and max_tokens < 1:
+            raise ValueError("max_tokens must be a nonzero positive integer")
+
+        if adapter_context == AdapterContext.query and self.skip_during_query:
+            for record_id, markdown, metadata in records:
+                yield (record_id, markdown, metadata or {})
+            return
+
+        for record_id, markdown, metadata in records:
+            chunks = MarkdownChunker.split_by_heading(markdown, max_tokens)
+            for chunk_ix, chunk in enumerate(chunks):
+                yield (
+                    f"{record_id}_head_{str(chunk_ix).zfill(3)}",
+                    chunk,
+                    metadata or {},
+                )
diff --git a/R2R/r2r/vecs/adapter/noop.py b/R2R/r2r/vecs/adapter/noop.py
new file mode 100755
index 00000000..b587a552
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/noop.py
@@ -0,0 +1,55 @@
+"""
+The `vecs.experimental.adapter.noop` module provides a default no-op (no operation) adapter
+that passes the inputs through without any modification. This can be useful when no specific
+adapter processing is required.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from .base import AdapterContext, AdapterStep
+
+
+class NoOp(AdapterStep):
+    """
+    Pass-through AdapterStep: records leave exactly as they came in, except that
+    missing (falsy) metadata is replaced by an empty dict.
+    """
+
+    def __init__(self, dimension: int):
+        """
+        Store the dimension this step reports.
+
+        Args:
+            dimension (int): The dimension of the vectors passing through.
+        """
+        self._dimension = dimension
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        The dimension given at construction time.
+
+        Returns:
+            int: The value passed as `dimension` to `__init__`.
+        """
+        return self._dimension
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Forward every record unchanged, regardless of the adapter context.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter (not used by this step).
+
+        Yields:
+            Tuple[str, Any, Dict]: The id, the media and the metadata (or `{}`).
+        """
+        for record_id, payload, meta in records:
+            yield (record_id, payload, meta if meta else {})
diff --git a/R2R/r2r/vecs/adapter/text.py b/R2R/r2r/vecs/adapter/text.py
new file mode 100755
index 00000000..78ae7732
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/text.py
@@ -0,0 +1,151 @@
+"""
+The `vecs.experimental.adapter.text` module provides adapter steps specifically designed for
+handling text data. It provides two main classes, `TextEmbedding` and `ParagraphChunker`.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Literal, Optional, Tuple
+
+from flupy import flu
+from vecs.exc import MissingDependency
+
+from .base import AdapterContext, AdapterStep
+
+TextEmbeddingModel = Literal[  # model names accepted by TextEmbedding(model=...)
+    "all-mpnet-base-v2",
+    "multi-qa-mpnet-base-dot-v1",
+    "all-distilroberta-v1",
+    "all-MiniLM-L12-v2",
+    "multi-qa-distilbert-cos-v1",
+    "mixedbread-ai/mxbai-embed-large-v1",
+    "multi-qa-MiniLM-L6-cos-v1",
+    "paraphrase-multilingual-mpnet-base-v2",
+    "paraphrase-albert-small-v2",
+    "paraphrase-multilingual-MiniLM-L12-v2",
+    "paraphrase-MiniLM-L3-v2",
+    "distiluse-base-multilingual-cased-v1",
+    "distiluse-base-multilingual-cased-v2",
+]
+
+
+class TextEmbedding(AdapterStep):
+    """
+    TextEmbedding is an AdapterStep that converts text media into
+    embeddings using a specified sentence transformers model.
+    """
+
+    def __init__(
+        self,
+        *,
+        model: TextEmbeddingModel,
+        batch_size: int = 8,
+        use_auth_token: Optional[str] = None,
+    ):
+        """
+        Initializes the TextEmbedding adapter with a sentence transformers model.
+
+        Args:
+            model (TextEmbeddingModel): The sentence transformers model to use for embeddings.
+            batch_size (int): The number of records to encode simultaneously.
+            use_auth_token (Optional[str]): The HuggingFace Hub auth token to use for private models.
+
+        Raises:
+            MissingDependency: If the sentence_transformers library is not installed.
+        """
+        try:
+            from sentence_transformers import SentenceTransformer as ST
+        except ImportError as exc:
+            raise MissingDependency(
+                "Missing feature vecs[text_embedding]. Hint: `pip install 'vecs[text_embedding]'`"
+            ) from exc
+
+        self.model = ST(model, use_auth_token=use_auth_token)
+        self._exported_dimension = (
+            self.model.get_sentence_embedding_dimension()
+        )
+        self.batch_size = batch_size
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Returns the dimension of the embeddings produced by the sentence transformers model.
+
+        Returns:
+            int: The dimension reported by the model when this step was constructed.
+        """
+        return self._exported_dimension
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,  # pyright: ignore
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Converts each media in the records to an embedding and yields the result.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter (not used by this step).
+
+        Yields:
+            Tuple[str, Any, Dict]: The id, the embedding, and the metadata.
+        """
+        for batch in flu(records).chunk(self.batch_size):
+            batch_records = list(batch)
+            texts = [text for _, text, _ in batch_records]
+
+            # one encode call per batch, with normalized embeddings requested
+            embeddings = self.model.encode(texts, normalize_embeddings=True)
+            for (record_id, _, metadata), embedding in zip(batch_records, embeddings):  # type: ignore
+                yield (record_id, embedding, metadata or {})
+
+
+class ParagraphChunker(AdapterStep):
+    """
+    AdapterStep that breaks text media apart at blank lines and emits every
+    resulting paragraph as its own record.
+    """
+
+    def __init__(self, *, skip_during_query: bool):
+        """
+        Set up the ParagraphChunker adapter.
+
+        Args:
+            skip_during_query (bool): When True, records pass through unchunked in the query context.
+        """
+        self.skip_during_query = skip_during_query
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Yield one record per paragraph of each incoming media, where paragraphs
+        are the pieces obtained by splitting on two consecutive newlines. In the
+        query context with `skip_during_query` set, records are passed through whole.
+
+        Args:
+            records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context (AdapterContext): Context of the adapter.
+
+        Yields:
+            Tuple[str, Any, Dict]: The id with a `_para_NNN` suffix, the paragraph, and the metadata.
+        """
+        bypass = (
+            adapter_context == AdapterContext("query")
+            and self.skip_during_query
+        )
+        if bypass:
+            # querying with chunking disabled: hand records through as they are
+            for record_id, text, meta in records:
+                yield (record_id, text, meta or {})
+            return
+
+        for record_id, text, meta in records:
+            # a blank line (two consecutive newlines) separates paragraphs
+            for index, paragraph in enumerate(text.split("\n\n")):
+                # ids get a zero-padded paragraph index suffix
+                chunk_id = f"{record_id}_para_{str(index).zfill(3)}"
+                yield (chunk_id, paragraph, meta or {})