aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/sdk/async_client.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/async_client.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sdk/async_client.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sdk/async_client.py b/.venv/lib/python3.12/site-packages/sdk/async_client.py
new file mode 100644
index 00000000..ada14f5a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sdk/async_client.py
@@ -0,0 +1,135 @@
+import json
+from io import BytesIO
+from typing import Any, AsyncGenerator
+
+import httpx
+
+from shared.abstractions import R2RException
+
+from .asnyc_methods import (
+ ChunksSDK,
+ CollectionsSDK,
+ ConversationsSDK,
+ DocumentsSDK,
+ GraphsSDK,
+ IndicesSDK,
+ PromptsSDK,
+ RetrievalSDK,
+ SystemSDK,
+ UsersSDK,
+)
+from .base.base_client import BaseClient
+
+
+class R2RAsyncClient(BaseClient):
+ """Asynchronous client for interacting with the R2R API."""
+
+ def __init__(
+ self,
+ base_url: str | None = None,
+ timeout: float = 300.0,
+ custom_client=None,
+ ):
+ super().__init__(base_url, timeout)
+ self.client = custom_client or httpx.AsyncClient(timeout=timeout)
+ self.chunks = ChunksSDK(self)
+ self.collections = CollectionsSDK(self)
+ self.conversations = ConversationsSDK(self)
+ self.documents = DocumentsSDK(self)
+ self.graphs = GraphsSDK(self)
+ self.indices = IndicesSDK(self)
+ self.prompts = PromptsSDK(self)
+ self.retrieval = RetrievalSDK(self)
+ self.system = SystemSDK(self)
+ self.users = UsersSDK(self)
+
+ async def _make_request(
+ self, method: str, endpoint: str, version: str = "v3", **kwargs
+ ):
+ url = self._get_full_url(endpoint, version)
+ if (
+ "https://api.sciphi.ai" in url
+ and ("login" not in endpoint)
+ and ("create" not in endpoint)
+ and ("users" not in endpoint)
+ and ("health" not in endpoint)
+ and (not self.access_token and not self.api_key)
+ ):
+ raise R2RException(
+ status_code=401,
+ message="Access token or api key is required to access `https://api.sciphi.ai`. To change the base url, use `set_base_url` method or set the local environment variable `R2R_API_BASE` to `http://localhost:7272`.",
+ )
+ request_args = self._prepare_request_args(endpoint, **kwargs)
+
+ try:
+ response = await self.client.request(method, url, **request_args)
+ await self._handle_response(response)
+ if "application/json" in response.headers.get("Content-Type", ""):
+ return response.json() if response.content else None
+ else:
+ return BytesIO(response.content)
+
+ except httpx.RequestError as e:
+ raise R2RException(
+ status_code=500,
+ message=f"Request failed: {str(e)}",
+ ) from e
+
+ async def _make_streaming_request(
+ self, method: str, endpoint: str, version: str = "v3", **kwargs
+ ) -> AsyncGenerator[Any, None]:
+ url = self._get_full_url(endpoint, version)
+ request_args = self._prepare_request_args(endpoint, **kwargs)
+
+ async with httpx.AsyncClient(timeout=self.timeout) as client:
+ async with client.stream(method, url, **request_args) as response:
+ await self._handle_response(response)
+ async for line in response.aiter_lines():
+ if line.strip(): # Ignore empty lines
+ try:
+ yield json.loads(line)
+ except Exception:
+ yield line
+
+ async def _handle_response(self, response):
+ if response.status_code >= 400:
+ try:
+ error_content = response.json()
+ if isinstance(error_content, dict):
+ message = (
+ error_content.get("detail", {}).get(
+ "message", str(error_content)
+ )
+ if isinstance(error_content.get("detail"), dict)
+ else error_content.get("detail", str(error_content))
+ )
+ else:
+ message = str(error_content)
+ except json.JSONDecodeError:
+ message = response.text
+ except Exception as e:
+ message = str(e)
+
+ raise R2RException(
+ status_code=response.status_code, message=message
+ )
+
+ async def close(self):
+ await self.client.aclose()
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.close()
+
+ def set_api_key(self, api_key: str) -> None:
+ if self.access_token:
+ raise ValueError("Cannot have both access token and api key.")
+ self.api_key = api_key
+
+ def unset_api_key(self) -> None:
+ self.api_key = None
+
+ def set_base_url(self, base_url: str) -> None:
+ self.base_url = base_url