aboutsummaryrefslogtreecommitdiff
path: root/R2R/r2r/vecs/adapter
diff options
context:
space:
mode:
Diffstat (limited to 'R2R/r2r/vecs/adapter')
-rwxr-xr-xR2R/r2r/vecs/adapter/__init__.py15
-rwxr-xr-xR2R/r2r/vecs/adapter/base.py111
-rwxr-xr-xR2R/r2r/vecs/adapter/markdown.py88
-rwxr-xr-xR2R/r2r/vecs/adapter/noop.py55
-rwxr-xr-xR2R/r2r/vecs/adapter/text.py151
5 files changed, 420 insertions, 0 deletions
diff --git a/R2R/r2r/vecs/adapter/__init__.py b/R2R/r2r/vecs/adapter/__init__.py
new file mode 100755
index 00000000..9cd9860d
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/__init__.py
@@ -0,0 +1,15 @@
+from .base import Adapter, AdapterContext, AdapterStep
+from .markdown import MarkdownChunker
+from .noop import NoOp
+from .text import ParagraphChunker, TextEmbedding, TextEmbeddingModel
+
+__all__ = [
+ "Adapter",
+ "AdapterContext",
+ "AdapterStep",
+ "NoOp",
+ "ParagraphChunker",
+ "TextEmbedding",
+ "TextEmbeddingModel",
+ "MarkdownChunker",
+]
diff --git a/R2R/r2r/vecs/adapter/base.py b/R2R/r2r/vecs/adapter/base.py
new file mode 100755
index 00000000..7734e802
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/base.py
@@ -0,0 +1,111 @@
+"""
+The `vecs.experimental.adapter.base` module provides abstract classes and utilities
+for creating and handling adapters in vecs. Adapters allow users to interact with
+a collection using media types other than vectors.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from vecs.exc import ArgError
+
+
+class AdapterContext(str, Enum):
+ """
+ An enum representing the different contexts in which a Pipeline
+ will be invoked.
+
+ Attributes:
+ upsert (str): The Collection.upsert method
+ query (str): The Collection.query method
+ """
+
+ upsert = "upsert"
+ query = "query"
+
+
+class AdapterStep(ABC):
+ """
+ Abstract class representing a step in the adapter pipeline.
+
+ Each adapter step should adapt a user media into a tuple of:
+ - id (str)
+ - media (unknown type)
+ - metadata (dict)
+
+ If the user provides id or metadata, default production is overridden.
+ """
+
+ @property
+ def exported_dimension(self) -> Optional[int]:
+ """
+ Property that should be overridden by subclasses to provide the output dimension
+ of the adapter step.
+ """
+ return None
+
+ @abstractmethod
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext,
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Abstract method that should be overridden by subclasses to handle each record.
+ """
+
+
+class Adapter:
+ """
+ Class representing a sequence of AdapterStep instances forming a pipeline.
+ """
+
+ def __init__(self, steps: list[AdapterStep]):
+ """
+ Initialize an Adapter instance with a list of AdapterStep instances.
+
+ Args:
+ steps: list of AdapterStep instances.
+
+ Raises:
+ ArgError: Raised if the steps list is empty.
+ """
+ self.steps = steps
+ if len(steps) < 1:
+ raise ArgError("Adapter must contain at least 1 step")
+
+ @property
+ def exported_dimension(self) -> Optional[int]:
+ """
+ The output dimension of the adapter. Returns the exported dimension of the last
+ AdapterStep that provides one (from end to start of the steps list).
+ """
+ for step in reversed(self.steps):
+ step_dim = step.exported_dimension
+ if step_dim is not None:
+ return step_dim
+ return None
+
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext,
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Invokes the adapter pipeline on an iterable of records.
+
+ Args:
+ records: Iterable of tuples each containing an id, a media and an optional dict.
+ adapter_context: Context of the adapter.
+
+ Yields:
+ Tuples each containing an id, a media and a dict.
+ """
+ pipeline = records
+ for step in self.steps:
+ pipeline = step(pipeline, adapter_context)
+
+ yield from pipeline # type: ignore
diff --git a/R2R/r2r/vecs/adapter/markdown.py b/R2R/r2r/vecs/adapter/markdown.py
new file mode 100755
index 00000000..149573f4
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/markdown.py
@@ -0,0 +1,88 @@
+import re
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from flupy import flu
+
+from .base import AdapterContext, AdapterStep
+
+
+class MarkdownChunker(AdapterStep):
+ """
+ MarkdownChunker is an AdapterStep that splits a markdown string into chunks where a heading signifies the start of a chunk, and yields each chunk as a separate record.
+ """
+
+ def __init__(self, *, skip_during_query: bool):
+ """
+ Initializes the MarkdownChunker adapter.
+
+ Args:
+ skip_during_query (bool): Whether to skip chunking during querying.
+ """
+ self.skip_during_query = skip_during_query
+
+ @staticmethod
+ def split_by_heading(
+ md: str, max_tokens: int
+ ) -> Generator[str, None, None]:
+ regex_split = r"^(#{1,6}\s+.+)$"
+ headings = [
+ match.span()[0]
+ for match in re.finditer(regex_split, md, flags=re.MULTILINE)
+ ]
+
+ if headings == [] or headings[0] != 0:
+ headings.insert(0, 0)
+
+ sections = [md[i:j] for i, j in zip(headings, headings[1:] + [None])]
+
+ for section in sections:
+ chunks = flu(section.split(" ")).chunk(max_tokens)
+
+ is_not_useless_chunk = lambda i: not i in ["", "\n", []]
+
+ joined_chunks = filter(
+ is_not_useless_chunk, [" ".join(chunk) for chunk in chunks]
+ )
+
+ for joined_chunk in joined_chunks:
+ yield joined_chunk
+
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext,
+ max_tokens: int = 99999999,
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Splits each markdown string in the records into chunks where each heading starts a new chunk, and yields each chunk
+ as a separate record. If the `skip_during_query` attribute is set to True,
+ this step is skipped during querying.
+
+ Args:
+ records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a markdown string and an optional dict.
+ adapter_context (AdapterContext): Context of the adapter.
+ max_tokens (int): The maximum number of tokens per chunk
+
+ Yields:
+ Tuple[str, Any, Dict]: The id appended with chunk index, the chunk, and the metadata.
+ """
+ if max_tokens and max_tokens < 1:
+ raise ValueError("max_tokens must be a nonzero positive integer")
+
+ if (
+ adapter_context == AdapterContext("query")
+ and self.skip_during_query
+ ):
+ for id, markdown, metadata in records:
+ yield (id, markdown, metadata or {})
+ else:
+ for id, markdown, metadata in records:
+ headings = MarkdownChunker.split_by_heading(
+ markdown, max_tokens
+ )
+ for heading_ix, heading in enumerate(headings):
+ yield (
+ f"{id}_head_{str(heading_ix).zfill(3)}",
+ heading,
+ metadata or {},
+ )
diff --git a/R2R/r2r/vecs/adapter/noop.py b/R2R/r2r/vecs/adapter/noop.py
new file mode 100755
index 00000000..b587a552
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/noop.py
@@ -0,0 +1,55 @@
+"""
+The `vecs.experimental.adapter.noop` module provides a default no-op (no operation) adapter
+that passes the inputs through without any modification. This can be useful when no specific
+adapter processing is required.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from .base import AdapterContext, AdapterStep
+
+
+class NoOp(AdapterStep):
+ """
+ NoOp is a no-operation AdapterStep. It is a default adapter that passes through
+ the input records without any modifications.
+ """
+
+ def __init__(self, dimension: int):
+ """
+ Initializes the NoOp adapter with a dimension.
+
+ Args:
+ dimension (int): The dimension of the input vectors.
+ """
+ self._dimension = dimension
+
+ @property
+ def exported_dimension(self) -> Optional[int]:
+ """
+ Returns the dimension of the adapter.
+
+ Returns:
+ int: The dimension of the input vectors.
+ """
+ return self._dimension
+
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext,
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Yields the input records without any modification.
+
+ Args:
+ records: Iterable of tuples each containing an id, a media and an optional dict.
+ adapter_context: Context of the adapter.
+
+ Yields:
+ Tuple[str, Any, Dict]: The input record.
+ """
+ for id, media, metadata in records:
+ yield (id, media, metadata or {})
diff --git a/R2R/r2r/vecs/adapter/text.py b/R2R/r2r/vecs/adapter/text.py
new file mode 100755
index 00000000..78ae7732
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/text.py
@@ -0,0 +1,151 @@
+"""
+The `vecs.experimental.adapter.text` module provides adapter steps specifically designed for
+handling text data. It provides two main classes, `TextEmbedding` and `ParagraphChunker`.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Literal, Optional, Tuple
+
+from flupy import flu
+from vecs.exc import MissingDependency
+
+from .base import AdapterContext, AdapterStep
+
+TextEmbeddingModel = Literal[
+ "all-mpnet-base-v2",
+ "multi-qa-mpnet-base-dot-v1",
+ "all-distilroberta-v1",
+ "all-MiniLM-L12-v2",
+ "multi-qa-distilbert-cos-v1",
+ "mixedbread-ai/mxbai-embed-large-v1",
+ "multi-qa-MiniLM-L6-cos-v1",
+ "paraphrase-multilingual-mpnet-base-v2",
+ "paraphrase-albert-small-v2",
+ "paraphrase-multilingual-MiniLM-L12-v2",
+ "paraphrase-MiniLM-L3-v2",
+ "distiluse-base-multilingual-cased-v1",
+ "distiluse-base-multilingual-cased-v2",
+]
+
+
+class TextEmbedding(AdapterStep):
+ """
+ TextEmbedding is an AdapterStep that converts text media into
+ embeddings using a specified sentence transformers model.
+ """
+
+ def __init__(
+ self,
+ *,
+ model: TextEmbeddingModel,
+ batch_size: int = 8,
+ use_auth_token: str = None,
+ ):
+ """
+ Initializes the TextEmbedding adapter with a sentence transformers model.
+
+ Args:
+ model (TextEmbeddingModel): The sentence transformers model to use for embeddings.
+ batch_size (int): The number of records to encode simultaneously.
+ use_auth_token (str): The HuggingFace Hub auth token to use for private models.
+
+ Raises:
+ MissingDependency: If the sentence_transformers library is not installed.
+ """
+ try:
+ from sentence_transformers import SentenceTransformer as ST
+ except ImportError:
+ raise MissingDependency(
+ "Missing feature vecs[text_embedding]. Hint: `pip install 'vecs[text_embedding]'`"
+ )
+
+ self.model = ST(model, use_auth_token=use_auth_token)
+ self._exported_dimension = (
+ self.model.get_sentence_embedding_dimension()
+ )
+ self.batch_size = batch_size
+
+ @property
+ def exported_dimension(self) -> Optional[int]:
+ """
+ Returns the dimension of the embeddings produced by the sentence transformers model.
+
+ Returns:
+ int: The dimension of the embeddings.
+ """
+ return self._exported_dimension
+
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext, # pyright: ignore
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Converts each media in the records to an embedding and yields the result.
+
+ Args:
+ records: Iterable of tuples each containing an id, a media and an optional dict.
+ adapter_context: Context of the adapter.
+
+ Yields:
+ Tuple[str, Any, Dict]: The id, the embedding, and the metadata.
+ """
+ for batch in flu(records).chunk(self.batch_size):
+ batch_records = [x for x in batch]
+ media = [text for _, text, _ in batch_records]
+
+ embeddings = self.model.encode(media, normalize_embeddings=True)
+
+ for (id, _, metadata), embedding in zip(batch_records, embeddings): # type: ignore
+ yield (id, embedding, metadata or {})
+
+
+class ParagraphChunker(AdapterStep):
+ """
+ ParagraphChunker is an AdapterStep that splits text media into
+ paragraphs and yields each paragraph as a separate record.
+ """
+
+ def __init__(self, *, skip_during_query: bool):
+ """
+ Initializes the ParagraphChunker adapter.
+
+ Args:
+ skip_during_query (bool): Whether to skip chunking during querying.
+ """
+ self.skip_during_query = skip_during_query
+
+ def __call__(
+ self,
+ records: Iterable[Tuple[str, Any, Optional[Dict]]],
+ adapter_context: AdapterContext,
+ ) -> Generator[Tuple[str, Any, Dict], None, None]:
+ """
+ Splits each media in the records into paragraphs and yields each paragraph
+ as a separate record. If the `skip_during_query` attribute is set to True,
+ this step is skipped during querying.
+
+ Args:
+ records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a media and an optional dict.
+ adapter_context (AdapterContext): Context of the adapter.
+
+ Yields:
+ Tuple[str, Any, Dict]: The id appended with paragraph index, the paragraph, and the metadata.
+ """
+ if (
+ adapter_context == AdapterContext("query")
+ and self.skip_during_query
+ ):
+ for id, media, metadata in records:
+ yield (id, media, metadata or {})
+ else:
+ for id, media, metadata in records:
+ paragraphs = media.split("\n\n")
+
+ for paragraph_ix, paragraph in enumerate(paragraphs):
+ yield (
+ f"{id}_para_{str(paragraph_ix).zfill(3)}",
+ paragraph,
+ metadata or {},
+ )