1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import logging
import uuid
from abc import abstractmethod
from typing import Any, AsyncGenerator, Optional, Union
from r2r.base import (
AsyncPipe,
AsyncState,
KVLoggingSingleton,
PipeType,
VectorSearchResult,
)
logger = logging.getLogger(__name__)
class SearchPipe(AsyncPipe):
class SearchConfig(AsyncPipe.PipeConfig):
name: str = "default_vector_search"
search_filters: dict = {}
search_limit: int = 10
class Input(AsyncPipe.Input):
message: Union[AsyncGenerator[str, None], str]
def __init__(
self,
pipe_logger: Optional[KVLoggingSingleton] = None,
type: PipeType = PipeType.SEARCH,
config: Optional[AsyncPipe.PipeConfig] = None,
*args,
**kwargs,
):
super().__init__(
pipe_logger=pipe_logger,
type=type,
config=config,
*args,
**kwargs,
)
@abstractmethod
async def search(
self,
query: str,
filters: dict[str, Any] = {},
limit: int = 10,
*args: Any,
**kwargs: Any,
) -> AsyncGenerator[VectorSearchResult, None]:
pass
@abstractmethod
async def _run_logic(
self,
input: Input,
state: AsyncState,
run_id: uuid.UUID,
*args: Any,
**kwargs,
) -> AsyncGenerator[VectorSearchResult, None]:
pass
|