diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/async_client.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sdk/async_client.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sdk/async_client.py b/.venv/lib/python3.12/site-packages/sdk/async_client.py new file mode 100644 index 00000000..ada14f5a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sdk/async_client.py @@ -0,0 +1,135 @@ +import json +from io import BytesIO +from typing import Any, AsyncGenerator + +import httpx + +from shared.abstractions import R2RException + +from .asnyc_methods import ( + ChunksSDK, + CollectionsSDK, + ConversationsSDK, + DocumentsSDK, + GraphsSDK, + IndicesSDK, + PromptsSDK, + RetrievalSDK, + SystemSDK, + UsersSDK, +) +from .base.base_client import BaseClient + + +class R2RAsyncClient(BaseClient): + """Asynchronous client for interacting with the R2R API.""" + + def __init__( + self, + base_url: str | None = None, + timeout: float = 300.0, + custom_client=None, + ): + super().__init__(base_url, timeout) + self.client = custom_client or httpx.AsyncClient(timeout=timeout) + self.chunks = ChunksSDK(self) + self.collections = CollectionsSDK(self) + self.conversations = ConversationsSDK(self) + self.documents = DocumentsSDK(self) + self.graphs = GraphsSDK(self) + self.indices = IndicesSDK(self) + self.prompts = PromptsSDK(self) + self.retrieval = RetrievalSDK(self) + self.system = SystemSDK(self) + self.users = UsersSDK(self) + + async def _make_request( + self, method: str, endpoint: str, version: str = "v3", **kwargs + ): + url = self._get_full_url(endpoint, version) + if ( + "https://api.sciphi.ai" in url + and ("login" not in endpoint) + and ("create" not in endpoint) + and ("users" not in endpoint) + and ("health" not in endpoint) + and (not self.access_token and not self.api_key) + ): + raise R2RException( + status_code=401, + message="Access token or api key is required to access `https://api.sciphi.ai`. To change the base url, use `set_base_url` method or set the local environment variable `R2R_API_BASE` to `http://localhost:7272`.", + ) + request_args = self._prepare_request_args(endpoint, **kwargs) + + try: + response = await self.client.request(method, url, **request_args) + await self._handle_response(response) + if "application/json" in response.headers.get("Content-Type", ""): + return response.json() if response.content else None + else: + return BytesIO(response.content) + + except httpx.RequestError as e: + raise R2RException( + status_code=500, + message=f"Request failed: {str(e)}", + ) from e + + async def _make_streaming_request( + self, method: str, endpoint: str, version: str = "v3", **kwargs + ) -> AsyncGenerator[Any, None]: + url = self._get_full_url(endpoint, version) + request_args = self._prepare_request_args(endpoint, **kwargs) + + async with httpx.AsyncClient(timeout=self.timeout) as client: + async with client.stream(method, url, **request_args) as response: + await self._handle_response(response) + async for line in response.aiter_lines(): + if line.strip(): # Ignore empty lines + try: + yield json.loads(line) + except Exception: + yield line + + async def _handle_response(self, response): + if response.status_code >= 400: + try: + error_content = response.json() + if isinstance(error_content, dict): + message = ( + error_content.get("detail", {}).get( + "message", str(error_content) + ) + if isinstance(error_content.get("detail"), dict) + else error_content.get("detail", str(error_content)) + ) + else: + message = str(error_content) + except json.JSONDecodeError: + message = response.text + except Exception as e: + message = str(e) + + raise R2RException( + status_code=response.status_code, message=message + ) + + async def close(self): + await self.client.aclose() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + def set_api_key(self, api_key: str) -> None: + if self.access_token: + raise ValueError("Cannot have both access token and api key.") + self.api_key = api_key + + def unset_api_key(self) -> None: + self.api_key = None + + def set_base_url(self, base_url: str) -> None: + self.base_url = base_url |