aboutsummaryrefslogtreecommitdiff
path: root/R2R/r2r/vecs/adapter/base.py
blob: 7734e80209a0348ff892a6da76928f7145df4ab7 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
The `vecs.experimental.adapter.base` module provides abstract classes and utilities
for creating and handling adapters in vecs. Adapters allow users to interact with
a collection using media types other than vectors.

All public classes, enums, and functions are re-exported by the `vecs.adapters` module.
"""

from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Generator, Iterable, Optional, Tuple

from vecs.exc import ArgError


class AdapterContext(str, Enum):
    """
    Enumeration of the contexts in which an adapter pipeline can be invoked.

    Members are also `str` instances, so each one compares equal to its
    plain string value.

    Attributes:
        upsert (str): Invocation from the Collection.upsert method
        query (str): Invocation from the Collection.query method
    """

    # Each member's value mirrors its own name.
    upsert = "upsert"
    query = "query"


class AdapterStep(ABC):
    """
    Base class for a single stage of an adapter pipeline.

    A step receives records and is expected to produce tuples of:
     - id (str)
     - media (unknown type)
     - metadata (dict)

    When the user supplies an id or metadata, those take precedence over
    whatever the step would produce by default.
    """

    @property
    def exported_dimension(self) -> Optional[int]:
        """
        Output dimension of this adapter step.

        The base implementation reports None; subclasses that know their
        output dimension should override this property.
        """
        return None

    @abstractmethod
    def __call__(
        self,
        records: Iterable[Tuple[str, Any, Optional[Dict]]],
        adapter_context: AdapterContext,
    ) -> Generator[Tuple[str, Any, Dict], None, None]:
        """
        Process the incoming records; every concrete subclass must implement this.

        Args:
            records: Iterable of tuples each containing an id, a media and an optional dict.
            adapter_context: The context in which the step is being invoked.

        Yields:
            Tuples each containing an id, a media and a dict.
        """


class Adapter:
    """
    An ordered pipeline of AdapterStep instances applied one after another.
    """

    def __init__(self, steps: list[AdapterStep]):
        """
        Build an Adapter from the steps that make up its pipeline.

        Args:
            steps: list of AdapterStep instances, in the order they are applied.

        Raises:
            ArgError: Raised if the steps list is empty.
        """
        self.steps = steps
        if len(steps) == 0:
            raise ArgError("Adapter must contain at least 1 step")

    @property
    def exported_dimension(self) -> Optional[int]:
        """
        The output dimension of the whole adapter.

        Steps are inspected from the end of the list towards the start, and
        the first non-None exported dimension found is returned. If no step
        reports a dimension, the result is None.
        """
        # The generator is lazy, so the scan stops at the first step that
        # reports a dimension and each step's property is read at most once.
        reported = (stage.exported_dimension for stage in reversed(self.steps))
        return next((dim for dim in reported if dim is not None), None)

    def __call__(
        self,
        records: Iterable[Tuple[str, Any, Optional[Dict]]],
        adapter_context: AdapterContext,
    ) -> Generator[Tuple[str, Any, Dict], None, None]:
        """
        Run an iterable of records through every step of the pipeline.

        This is a generator function: the steps are only chained together
        once the returned generator is first iterated.

        Args:
            records: Iterable of tuples each containing an id, a media and an optional dict.
            adapter_context: Context in which the adapter is invoked; passed to every step.

        Yields:
            Tuples each containing an id, a media and a dict.
        """
        stream = records
        for stage in self.steps:
            # Each step consumes the output of the step before it.
            stream = stage(stream, adapter_context)

        yield from stream  # type: ignore