1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
"""
The `vecs.experimental.adapter.base` module provides abstract classes and utilities
for creating and handling adapters in vecs. Adapters allow users to interact with
a collection using media types other than vectors.
All public classes, enums, and functions are re-exported by `vecs.adapters` module.
"""
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Dict, Generator, Iterable, Optional, Tuple
from vecs.exc import ArgError
class AdapterContext(str, Enum):
"""
An enum representing the different contexts in which a Pipeline
will be invoked.
Attributes:
upsert (str): The Collection.upsert method
query (str): The Collection.query method
"""
upsert = "upsert"
query = "query"
class AdapterStep(ABC):
"""
Abstract class representing a step in the adapter pipeline.
Each adapter step should adapt a user media into a tuple of:
- id (str)
- media (unknown type)
- metadata (dict)
If the user provides id or metadata, default production is overridden.
"""
@property
def exported_dimension(self) -> Optional[int]:
"""
Property that should be overridden by subclasses to provide the output dimension
of the adapter step.
"""
return None
@abstractmethod
def __call__(
self,
records: Iterable[Tuple[str, Any, Optional[Dict]]],
adapter_context: AdapterContext,
) -> Generator[Tuple[str, Any, Dict], None, None]:
"""
Abstract method that should be overridden by subclasses to handle each record.
"""
class Adapter:
"""
Class representing a sequence of AdapterStep instances forming a pipeline.
"""
def __init__(self, steps: list[AdapterStep]):
"""
Initialize an Adapter instance with a list of AdapterStep instances.
Args:
steps: list of AdapterStep instances.
Raises:
ArgError: Raised if the steps list is empty.
"""
self.steps = steps
if len(steps) < 1:
raise ArgError("Adapter must contain at least 1 step")
@property
def exported_dimension(self) -> Optional[int]:
"""
The output dimension of the adapter. Returns the exported dimension of the last
AdapterStep that provides one (from end to start of the steps list).
"""
for step in reversed(self.steps):
step_dim = step.exported_dimension
if step_dim is not None:
return step_dim
return None
def __call__(
self,
records: Iterable[Tuple[str, Any, Optional[Dict]]],
adapter_context: AdapterContext,
) -> Generator[Tuple[str, Any, Dict], None, None]:
"""
Invokes the adapter pipeline on an iterable of records.
Args:
records: Iterable of tuples each containing an id, a media and an optional dict.
adapter_context: Context of the adapter.
Yields:
Tuples each containing an id, a media and a dict.
"""
pipeline = records
for step in self.steps:
pipeline = step(pipeline, adapter_context)
yield from pipeline # type: ignore
|