diff options
Diffstat (limited to 'R2R/r2r/vecs/adapter')
-rwxr-xr-x | R2R/r2r/vecs/adapter/__init__.py | 15 | ||||
-rwxr-xr-x | R2R/r2r/vecs/adapter/base.py | 111 | ||||
-rwxr-xr-x | R2R/r2r/vecs/adapter/markdown.py | 88 | ||||
-rwxr-xr-x | R2R/r2r/vecs/adapter/noop.py | 55 | ||||
-rwxr-xr-x | R2R/r2r/vecs/adapter/text.py | 151 |
5 files changed, 420 insertions, 0 deletions
diff --git a/R2R/r2r/vecs/adapter/__init__.py b/R2R/r2r/vecs/adapter/__init__.py
new file mode 100755
index 00000000..9cd9860d
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/__init__.py
@@ -0,0 +1,15 @@
+from .base import Adapter, AdapterContext, AdapterStep
+from .markdown import MarkdownChunker
+from .noop import NoOp
+from .text import ParagraphChunker, TextEmbedding, TextEmbeddingModel
+
+__all__ = [
+    "Adapter",
+    "AdapterContext",
+    "AdapterStep",
+    "NoOp",
+    "ParagraphChunker",
+    "TextEmbedding",
+    "TextEmbeddingModel",
+    "MarkdownChunker",
+]
diff --git a/R2R/r2r/vecs/adapter/base.py b/R2R/r2r/vecs/adapter/base.py
new file mode 100755
index 00000000..7734e802
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/base.py
@@ -0,0 +1,111 @@
+"""
+The `vecs.experimental.adapter.base` module provides abstract classes and utilities
+for creating and handling adapters in vecs. Adapters allow users to interact with
+a collection using media types other than vectors.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from vecs.exc import ArgError
+
+
+class AdapterContext(str, Enum):
+    """
+    An enum representing the different contexts in which a Pipeline
+    will be invoked.
+
+    Attributes:
+        upsert (str): The Collection.upsert method
+        query (str): The Collection.query method
+    """
+
+    upsert = "upsert"
+    query = "query"
+
+
+class AdapterStep(ABC):
+    """
+    Abstract class representing a step in the adapter pipeline.
+
+    Each adapter step should adapt a user media into a tuple of:
+    - id (str)
+    - media (unknown type)
+    - metadata (dict)
+
+    If the user provides id or metadata, default production is overridden.
+    """
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Property that should be overridden by subclasses to provide the output dimension
+        of the adapter step. The base implementation returns None (no dimension).
+        """
+        return None
+
+    @abstractmethod
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Abstract method that should be overridden by subclasses to handle each record.
+        """
+
+
+class Adapter:
+    """
+    Class representing a sequence of AdapterStep instances forming a pipeline.
+    """
+
+    def __init__(self, steps: list[AdapterStep]):
+        """
+        Initialize an Adapter instance with a list of AdapterStep instances.
+
+        Args:
+            steps: list of AdapterStep instances.
+
+        Raises:
+            ArgError: Raised if the steps list is empty.
+        """
+        self.steps = steps
+        if not steps:
+            raise ArgError("Adapter must contain at least 1 step")
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        The output dimension of the adapter. Returns the exported dimension of the last
+        AdapterStep that provides one (from end to start of the steps list).
+        """
+        for step in reversed(self.steps):
+            step_dim = step.exported_dimension
+            if step_dim is not None:
+                return step_dim
+        return None
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Invokes the adapter pipeline on an iterable of records.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter.
+
+        Yields:
+            Tuples each containing an id, a media and a dict.
+        """
+        pipeline = records
+        for step in self.steps:  # each step consumes the previous step's generator
+            pipeline = step(pipeline, adapter_context)
+
+        yield from pipeline  # type: ignore
diff --git a/R2R/r2r/vecs/adapter/markdown.py b/R2R/r2r/vecs/adapter/markdown.py
new file mode 100755
index 00000000..149573f4
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/markdown.py
@@ -0,0 +1,88 @@
+import re
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from flupy import flu
+
+from .base import AdapterContext, AdapterStep
+
+
+class MarkdownChunker(AdapterStep):
+    """
+    MarkdownChunker is an AdapterStep that splits a markdown string into chunks where a heading signifies the start of a chunk, and yields each chunk as a separate record.
+    """
+
+    def __init__(self, *, skip_during_query: bool):
+        """
+        Initializes the MarkdownChunker adapter.
+
+        Args:
+            skip_during_query (bool): Whether to skip chunking during querying.
+        """
+        self.skip_during_query = skip_during_query
+
+    @staticmethod
+    def split_by_heading(
+        md: str, max_tokens: int
+    ) -> Generator[str, None, None]:
+        regex_split = r"^(#{1,6}\s+.+)$"  # a line of 1-6 '#' followed by heading text
+        headings = [
+            match.span()[0]
+            for match in re.finditer(regex_split, md, flags=re.MULTILINE)
+        ]
+
+        if not headings or headings[0] != 0:
+            headings.insert(0, 0)  # keep any text that precedes the first heading
+
+        sections = [md[i:j] for i, j in zip(headings, headings[1:] + [None])]
+
+        for section in sections:
+            chunks = flu(section.split(" ")).chunk(max_tokens)  # tokens = space-separated words
+
+            is_not_useless_chunk = lambda i: i not in ["", "\n", []]
+
+            joined_chunks = filter(
+                is_not_useless_chunk, [" ".join(chunk) for chunk in chunks]
+            )
+
+            for joined_chunk in joined_chunks:
+                yield joined_chunk
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+        max_tokens: int = 99999999,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Splits each markdown string in the records into chunks where each heading starts a new chunk, and yields each chunk
+        as a separate record. If the `skip_during_query` attribute is set to True,
+        this step is skipped during querying.
+
+        Args:
+            records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a markdown string and an optional dict.
+            adapter_context (AdapterContext): Context of the adapter.
+            max_tokens (int): The maximum number of tokens per chunk
+
+        Yields:
+            Tuple[str, Any, Dict]: The id appended with chunk index, the chunk, and the metadata.
+        """
+        if max_tokens is not None and max_tokens < 1:  # a truthiness test would let 0 through
+            raise ValueError("max_tokens must be a nonzero positive integer")
+
+        if (
+            adapter_context == AdapterContext.query
+            and self.skip_during_query
+        ):
+            for id, markdown, metadata in records:
+                yield (id, markdown, metadata or {})
+        else:
+            for id, markdown, metadata in records:
+                headings = MarkdownChunker.split_by_heading(
+                    markdown, max_tokens
+                )
+                for heading_ix, heading in enumerate(headings):
+                    yield (
+                        f"{id}_head_{str(heading_ix).zfill(3)}",
+                        heading,
+                        metadata or {},
+                    )
diff --git a/R2R/r2r/vecs/adapter/noop.py b/R2R/r2r/vecs/adapter/noop.py
new file mode 100755
index 00000000..b587a552
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/noop.py
@@ -0,0 +1,55 @@
+"""
+The `vecs.experimental.adapter.noop` module provides a default no-op (no operation) adapter
+that passes the inputs through without any modification. This can be useful when no specific
+adapter processing is required.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from .base import AdapterContext, AdapterStep
+
+
+class NoOp(AdapterStep):
+    """
+    NoOp is a no-operation AdapterStep. It is a default adapter that passes through
+    the input records without any modifications.
+    """
+
+    def __init__(self, dimension: int):
+        """
+        Initializes the NoOp adapter with a dimension.
+
+        Args:
+            dimension (int): The dimension of the input vectors.
+        """
+        self._dimension = dimension
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Returns the dimension of the adapter.
+
+        Returns:
+            int: The dimension of the input vectors.
+        """
+        return self._dimension
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Yields the input records, leaving the id and media untouched.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter.
+
+        Yields:
+            Tuple[str, Any, Dict]: The input record, with falsy metadata replaced by `{}`.
+        """
+        for id, media, metadata in records:
+            yield (id, media, metadata or {})
diff --git a/R2R/r2r/vecs/adapter/text.py b/R2R/r2r/vecs/adapter/text.py
new file mode 100755
index 00000000..78ae7732
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/text.py
@@ -0,0 +1,151 @@
+"""
+The `vecs.experimental.adapter.text` module provides adapter steps specifically designed for
+handling text data. It provides two main classes, `TextEmbedding` and `ParagraphChunker`.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from typing import Any, Dict, Generator, Iterable, Literal, Optional, Tuple
+
+from flupy import flu
+from vecs.exc import MissingDependency
+
+from .base import AdapterContext, AdapterStep
+
+TextEmbeddingModel = Literal[
+    "all-mpnet-base-v2",
+    "multi-qa-mpnet-base-dot-v1",
+    "all-distilroberta-v1",
+    "all-MiniLM-L12-v2",
+    "multi-qa-distilbert-cos-v1",
+    "mixedbread-ai/mxbai-embed-large-v1",
+    "multi-qa-MiniLM-L6-cos-v1",
+    "paraphrase-multilingual-mpnet-base-v2",
+    "paraphrase-albert-small-v2",
+    "paraphrase-multilingual-MiniLM-L12-v2",
+    "paraphrase-MiniLM-L3-v2",
+    "distiluse-base-multilingual-cased-v1",
+    "distiluse-base-multilingual-cased-v2",
+]
+
+
+class TextEmbedding(AdapterStep):
+    """
+    TextEmbedding is an AdapterStep that converts text media into
+    embeddings using a specified sentence transformers model.
+    """
+
+    def __init__(
+        self,
+        *,
+        model: TextEmbeddingModel,
+        batch_size: int = 8,
+        use_auth_token: Optional[str] = None,
+    ):
+        """
+        Initializes the TextEmbedding adapter with a sentence transformers model.
+
+        Args:
+            model (TextEmbeddingModel): The sentence transformers model to use for embeddings.
+            batch_size (int): The number of records to encode simultaneously.
+            use_auth_token (str): The HuggingFace Hub auth token to use for private models.
+
+        Raises:
+            MissingDependency: If the sentence_transformers library is not installed.
+        """
+        try:
+            from sentence_transformers import SentenceTransformer as ST
+        except ImportError as err:
+            raise MissingDependency(
+                "Missing feature vecs[text_embedding]. Hint: `pip install 'vecs[text_embedding]'`"
+            ) from err
+
+        self.model = ST(model, use_auth_token=use_auth_token)
+        self._exported_dimension = (
+            self.model.get_sentence_embedding_dimension()
+        )
+        self.batch_size = batch_size
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Returns the dimension of the embeddings produced by the sentence transformers model.
+
+        Returns:
+            int: The dimension of the embeddings.
+        """
+        return self._exported_dimension
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,  # pyright: ignore
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Converts each media in the records to an embedding and yields the result.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter.
+
+        Yields:
+            Tuple[str, Any, Dict]: The id, the embedding, and the metadata.
+        """
+        for batch in flu(records).chunk(self.batch_size):
+            batch_records = list(batch)
+            media = [text for _, text, _ in batch_records]
+
+            embeddings = self.model.encode(media, normalize_embeddings=True)
+
+            for (id, _, metadata), embedding in zip(batch_records, embeddings):  # type: ignore
+                yield (id, embedding, metadata or {})
+
+
+class ParagraphChunker(AdapterStep):
+    """
+    ParagraphChunker is an AdapterStep that splits text media into
+    paragraphs and yields each paragraph as a separate record.
+    """
+
+    def __init__(self, *, skip_during_query: bool):
+        """
+        Initializes the ParagraphChunker adapter.
+
+        Args:
+            skip_during_query (bool): Whether to skip chunking during querying.
+        """
+        self.skip_during_query = skip_during_query
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Splits each media in the records into paragraphs and yields each paragraph
+        as a separate record. If the `skip_during_query` attribute is set to True,
+        this step is skipped during querying.
+
+        Args:
+            records (Iterable[Tuple[str, Any, Optional[Dict]]]): Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context (AdapterContext): Context of the adapter.
+
+        Yields:
+            Tuple[str, Any, Dict]: The id appended with paragraph index, the paragraph, and the metadata.
+        """
+        if (
+            adapter_context == AdapterContext.query
+            and self.skip_during_query
+        ):
+            for id, media, metadata in records:
+                yield (id, media, metadata or {})
+        else:
+            for id, media, metadata in records:
+                paragraphs = media.split("\n\n")  # a blank line separates paragraphs
+
+                for paragraph_ix, paragraph in enumerate(paragraphs):
+                    yield (
+                        f"{id}_para_{str(paragraph_ix).zfill(3)}",
+                        paragraph,
+                        metadata or {},
+                    )