about summary refs log tree commit diff
path: root/R2R/r2r/vecs/adapter/base.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /R2R/r2r/vecs/adapter/base.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to 'R2R/r2r/vecs/adapter/base.py')
-rwxr-xr-xR2R/r2r/vecs/adapter/base.py111
1 files changed, 111 insertions, 0 deletions
diff --git a/R2R/r2r/vecs/adapter/base.py b/R2R/r2r/vecs/adapter/base.py
new file mode 100755
index 00000000..7734e802
--- /dev/null
+++ b/R2R/r2r/vecs/adapter/base.py
@@ -0,0 +1,111 @@
+"""
+The `vecs.experimental.adapter.base` module provides abstract classes and utilities
+for creating and handling adapters in vecs. Adapters allow users to interact with
+a collection using media types other than vectors.
+
+All public classes, enums, and functions are re-exported by `vecs.adapters` module.
+"""
+
+from abc import ABC, abstractmethod
+from enum import Enum
+from typing import Any, Dict, Generator, Iterable, Optional, Tuple
+
+from vecs.exc import ArgError
+
+
+class AdapterContext(str, Enum):
+    """
+    An enum representing the different contexts in which a Pipeline
+    will be invoked.
+
+    Attributes:
+        upsert (str): The Collection.upsert method
+        query (str): The Collection.query method
+    """
+
+    upsert = "upsert"
+    query = "query"
+
+
+class AdapterStep(ABC):
+    """
+    Abstract class representing a step in the adapter pipeline.
+
+    Each adapter step should adapt a user media into a tuple of:
+     - id (str)
+     - media (unknown type)
+     - metadata (dict)
+
+    If the user provides id or metadata, default production is overridden.
+    """
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        Property that should be overridden by subclasses to provide the output dimension
+        of the adapter step.
+        """
+        return None
+
+    @abstractmethod
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Abstract method that should be overridden by subclasses to handle each record.
+        """
+
+
+class Adapter:
+    """
+    Class representing a sequence of AdapterStep instances forming a pipeline.
+    """
+
+    def __init__(self, steps: list[AdapterStep]):
+        """
+        Initialize an Adapter instance with a list of AdapterStep instances.
+
+        Args:
+            steps: list of AdapterStep instances.
+
+        Raises:
+            ArgError: Raised if the steps list is empty.
+        """
+        self.steps = steps
+        if len(steps) < 1:
+            raise ArgError("Adapter must contain at least 1 step")
+
+    @property
+    def exported_dimension(self) -> Optional[int]:
+        """
+        The output dimension of the adapter. Returns the exported dimension of the last
+        AdapterStep that provides one (from end to start of the steps list).
+        """
+        for step in reversed(self.steps):
+            step_dim = step.exported_dimension
+            if step_dim is not None:
+                return step_dim
+        return None
+
+    def __call__(
+        self,
+        records: Iterable[Tuple[str, Any, Optional[Dict]]],
+        adapter_context: AdapterContext,
+    ) -> Generator[Tuple[str, Any, Dict], None, None]:
+        """
+        Invokes the adapter pipeline on an iterable of records.
+
+        Args:
+            records: Iterable of tuples each containing an id, a media and an optional dict.
+            adapter_context: Context of the adapter.
+
+        Yields:
+            Tuples each containing an id, a media and a dict.
+        """
+        pipeline = records
+        for step in self.steps:
+            pipeline = step(pipeline, adapter_context)
+
+        yield from pipeline  # type: ignore