about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sdk/sync_client.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/sync_client.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sdk/sync_client.py165
1 files changed, 165 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sdk/sync_client.py b/.venv/lib/python3.12/site-packages/sdk/sync_client.py
new file mode 100644
index 00000000..93db993f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sdk/sync_client.py
@@ -0,0 +1,165 @@
+import json
+from io import BytesIO
+from typing import Any, Generator
+
+from httpx import Client, RequestError, Response
+
+from shared.abstractions import R2RException
+
+from .base.base_client import BaseClient
+from .sync_methods import (
+    ChunksSDK,
+    CollectionsSDK,
+    ConversationsSDK,
+    DocumentsSDK,
+    GraphsSDK,
+    IndicesSDK,
+    PromptsSDK,
+    RetrievalSDK,
+    SystemSDK,
+    UsersSDK,
+)
+
+
+class R2RClient(BaseClient):
+    def __init__(
+        self,
+        base_url: str | None = None,
+        timeout: float = 300.0,
+        custom_client=None,
+    ):
+        super().__init__(base_url, timeout)
+        self.client = custom_client or Client(timeout=timeout)
+        self.chunks = ChunksSDK(self)
+        self.collections = CollectionsSDK(self)
+        self.conversations = ConversationsSDK(self)
+        self.documents = DocumentsSDK(self)
+        self.graphs = GraphsSDK(self)
+        self.indices = IndicesSDK(self)
+        self.prompts = PromptsSDK(self)
+        self.retrieval = RetrievalSDK(self)
+        self.system = SystemSDK(self)
+        self.users = UsersSDK(self)
+
+    def _make_request(
+        self, method: str, endpoint: str, version: str = "v3", **kwargs
+    ) -> dict[str, Any] | BytesIO | None:
+        url = self._get_full_url(endpoint, version)
+        if (
+            "https://api.sciphi.ai" in url
+            and ("login" not in endpoint)
+            and ("create" not in endpoint)
+            and ("users" not in endpoint)
+            and ("health" not in endpoint)
+            and (not self.access_token and not self.api_key)
+        ):
+            raise R2RException(
+                status_code=401,
+                message="Access token or api key is required to access `https://api.sciphi.ai`. To change the base url, use `set_base_url` method or set the local environment variable `R2R_API_BASE` to `http://localhost:7272`.",
+            )
+        request_args = self._prepare_request_args(endpoint, **kwargs)
+
+        try:
+            response = self.client.request(method, url, **request_args)
+            self._handle_response(response)
+
+            if "application/json" in response.headers.get("Content-Type", ""):
+                return response.json() if response.content else None
+            else:
+                return BytesIO(response.content)
+
+        except RequestError as e:
+            raise R2RException(
+                status_code=500,
+                message=f"Request failed: {str(e)}",
+            ) from e
+
+    def _make_streaming_request(
+        self, method: str, endpoint: str, version: str = "v3", **kwargs
+    ) -> Generator[dict[str, str], None, None]:
+        """
+        Make a streaming request, parsing Server-Sent Events (SSE) in multiline form.
+
+        Yields a dictionary with keys:
+        - "event": the event type (or "unknown" if not provided)
+        - "data": the JSON string (possibly spanning multiple lines) accumulated from the event's data lines
+        """
+        url = self._get_full_url(endpoint, version)
+        request_args = self._prepare_request_args(endpoint, **kwargs)
+
+        with Client(timeout=self.timeout) as client:
+            with client.stream(method, url, **request_args) as response:
+                self._handle_response(response)
+
+                sse_event_block: dict[str, Any] = {"event": None, "data": []}
+
+                for line in response.iter_lines():
+                    if isinstance(line, bytes):
+                        line = line.decode("utf-8", "replace")
+
+                    # Blank line -> end of this SSE event
+                    if line == "":
+                        # If there's any accumulated data, yield this event
+                        if sse_event_block["data"]:
+                            data_str = "".join(sse_event_block["data"])
+                            yield {
+                                "event": sse_event_block["event"] or "unknown",
+                                "data": data_str,
+                            }
+                        # Reset the block
+                        sse_event_block = {"event": None, "data": []}
+                        continue
+
+                    # Otherwise, parse the line
+                    if line.startswith("event:"):
+                        sse_event_block["event"] = line[
+                            len("event:") :
+                        ].lstrip()
+                    elif line.startswith("data:"):
+                        # Accumulate the exact substring after "data:"
+                        # Notice we do *not* strip() the entire line
+                        chunk = line[len("data:") :]
+                        sse_event_block["data"].append(chunk)
+                    # Optionally handle id:, retry:, etc. if needed
+
+                # If something remains in the buffer at the end
+                if sse_event_block["data"]:
+                    data_str = "".join(sse_event_block["data"])
+                    yield {
+                        "event": sse_event_block["event"] or "unknown",
+                        "data": data_str,
+                    }
+
+    def _handle_response(self, response: Response) -> None:
+        if response.status_code >= 400:
+            try:
+                error_content = response.json()
+                if isinstance(error_content, dict):
+                    message = (
+                        error_content.get("detail", {}).get(
+                            "message", str(error_content)
+                        )
+                        if isinstance(error_content.get("detail"), dict)
+                        else error_content.get("detail", str(error_content))
+                    )
+                else:
+                    message = str(error_content)
+            except json.JSONDecodeError:
+                message = response.text
+            except Exception as e:
+                message = str(e)
+
+            raise R2RException(
+                status_code=response.status_code, message=message
+            )
+
+    def set_api_key(self, api_key: str) -> None:
+        if self.access_token:
+            raise ValueError("Cannot have both access token and api key.")
+        self.api_key = api_key
+
+    def unset_api_key(self) -> None:
+        self.api_key = None
+
+    def set_base_url(self, base_url: str) -> None:
+        self.base_url = base_url