import json
from io import BytesIO
from typing import Any, Generator
from httpx import Client, RequestError, Response
from shared.abstractions import R2RException
from .base.base_client import BaseClient
from .sync_methods import (
ChunksSDK,
CollectionsSDK,
ConversationsSDK,
DocumentsSDK,
GraphsSDK,
IndicesSDK,
PromptsSDK,
RetrievalSDK,
SystemSDK,
UsersSDK,
)
class R2RClient(BaseClient):
def __init__(
self,
base_url: str | None = None,
timeout: float = 300.0,
custom_client=None,
):
super().__init__(base_url, timeout)
self.client = custom_client or Client(timeout=timeout)
self.chunks = ChunksSDK(self)
self.collections = CollectionsSDK(self)
self.conversations = ConversationsSDK(self)
self.documents = DocumentsSDK(self)
self.graphs = GraphsSDK(self)
self.indices = IndicesSDK(self)
self.prompts = PromptsSDK(self)
self.retrieval = RetrievalSDK(self)
self.system = SystemSDK(self)
self.users = UsersSDK(self)
def _make_request(
self, method: str, endpoint: str, version: str = "v3", **kwargs
) -> dict[str, Any] | BytesIO | None:
url = self._get_full_url(endpoint, version)
if (
"https://api.sciphi.ai" in url
and ("login" not in endpoint)
and ("create" not in endpoint)
and ("users" not in endpoint)
and ("health" not in endpoint)
and (not self.access_token and not self.api_key)
):
raise R2RException(
status_code=401,
message="Access token or api key is required to access `https://api.sciphi.ai`. To change the base url, use `set_base_url` method or set the local environment variable `R2R_API_BASE` to `http://localhost:7272`.",
)
request_args = self._prepare_request_args(endpoint, **kwargs)
try:
response = self.client.request(method, url, **request_args)
self._handle_response(response)
if "application/json" in response.headers.get("Content-Type", ""):
return response.json() if response.content else None
else:
return BytesIO(response.content)
except RequestError as e:
raise R2RException(
status_code=500,
message=f"Request failed: {str(e)}",
) from e
def _make_streaming_request(
self, method: str, endpoint: str, version: str = "v3", **kwargs
) -> Generator[dict[str, str], None, None]:
"""
Make a streaming request, parsing Server-Sent Events (SSE) in multiline form.
Yields a dictionary with keys:
- "event": the event type (or "unknown" if not provided)
- "data": the JSON string (possibly spanning multiple lines) accumulated from the event's data lines
"""
url = self._get_full_url(endpoint, version)
request_args = self._prepare_request_args(endpoint, **kwargs)
with Client(timeout=self.timeout) as client:
with client.stream(method, url, **request_args) as response:
self._handle_response(response)
sse_event_block: dict[str, Any] = {"event": None, "data": []}
for line in response.iter_lines():
if isinstance(line, bytes):
line = line.decode("utf-8", "replace")
# Blank line -> end of this SSE event
if line == "":
# If there's any accumulated data, yield this event
if sse_event_block["data"]:
data_str = "".join(sse_event_block["data"])
yield {
"event": sse_event_block["event"] or "unknown",
"data": data_str,
}
# Reset the block
sse_event_block = {"event": None, "data": []}
continue
# Otherwise, parse the line
if line.startswith("event:"):
sse_event_block["event"] = line[
len("event:") :
].lstrip()
elif line.startswith("data:"):
# Accumulate the exact substring after "data:"
# Notice we do *not* strip() the entire line
chunk = line[len("data:") :]
sse_event_block["data"].append(chunk)
# Optionally handle id:, retry:, etc. if needed
# If something remains in the buffer at the end
if sse_event_block["data"]:
data_str = "".join(sse_event_block["data"])
yield {
"event": sse_event_block["event"] or "unknown",
"data": data_str,
}
def _handle_response(self, response: Response) -> None:
if response.status_code >= 400:
try:
error_content = response.json()
if isinstance(error_content, dict):
message = (
error_content.get("detail", {}).get(
"message", str(error_content)
)
if isinstance(error_content.get("detail"), dict)
else error_content.get("detail", str(error_content))
)
else:
message = str(error_content)
except json.JSONDecodeError:
message = response.text
except Exception as e:
message = str(e)
raise R2RException(
status_code=response.status_code, message=message
)
def set_api_key(self, api_key: str) -> None:
if self.access_token:
raise ValueError("Cannot have both access token and api key.")
self.api_key = api_key
def unset_api_key(self) -> None:
self.api_key = None
def set_base_url(self, base_url: str) -> None:
self.base_url = base_url