diff options
| author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
|---|---|---|
| committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
| commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
| tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/main/api | |
| parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
| download | gn-ai-master.tar.gz | |
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/main/api')
12 files changed, 11484 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/base_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/base_router.py new file mode 100644 index 00000000..ef432420 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/base_router.py @@ -0,0 +1,151 @@ +import functools +import logging +from abc import abstractmethod +from typing import Callable + +from fastapi import APIRouter, Depends, HTTPException, Request +from fastapi.responses import FileResponse, StreamingResponse + +from core.base import R2RException + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig + +logger = logging.getLogger() + + +class BaseRouterV3: + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + """ + :param providers: Typically includes auth, database, etc. + :param services: Additional service references (ingestion, etc). + """ + self.providers = providers + self.services = services + self.config = config + self.router = APIRouter() + self.openapi_extras = self._load_openapi_extras() + + # Add the rate-limiting dependency + self.set_rate_limiting() + + # Initialize any routes + self._setup_routes() + self._register_workflows() + + def get_router(self): + return self.router + + def base_endpoint(self, func: Callable): + """ + A decorator to wrap endpoints in a standard pattern: + - error handling + - response shaping + """ + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + try: + func_result = await func(*args, **kwargs) + if isinstance(func_result, tuple) and len(func_result) == 2: + results, outer_kwargs = func_result + else: + results, outer_kwargs = func_result, {} + + if isinstance(results, (StreamingResponse, FileResponse)): + return results + return {"results": results, **outer_kwargs} + + except R2RException: + raise + except Exception as e: + logger.error( + f"Error in base endpoint {func.__name__}() - {str(e)}", + exc_info=True, + ) + raise HTTPException( + status_code=500, + detail={ + "message": f"An error '{e}' occurred during {func.__name__}", + "error": str(e), + "error_type": type(e).__name__, + }, + ) from e + + wrapper._is_base_endpoint = True # type: ignore + return wrapper + + @classmethod + def build_router(cls, engine): + """Class method for building a router instance (if you have a standard + pattern).""" + return cls(engine).router + + def _register_workflows(self): + pass + + def _load_openapi_extras(self): + return {} + + @abstractmethod + def _setup_routes(self): + """Subclasses override this to define actual endpoints.""" + pass + + def set_rate_limiting(self): + """Adds a yield-based dependency for rate limiting each request. + + Checks the limits, then logs the request if the check passes. + """ + + async def rate_limit_dependency( + request: Request, + auth_user=Depends(self.providers.auth.auth_wrapper()), + ): + """1) Fetch the user from the DB (including .limits_overrides). + + 2) Pass it to limits_handler.check_limits. 3) After the endpoint + completes, call limits_handler.log_request. + """ + # If the user is superuser, skip checks + if auth_user.is_superuser: + yield + return + + user_id = auth_user.id + route = request.scope["path"] + + # 1) Fetch the user from DB + user = await self.providers.database.users_handler.get_user_by_id( + user_id + ) + if not user: + raise HTTPException(status_code=404, detail="User not found.") + + # 2) Rate-limit check + try: + await self.providers.database.limits_handler.check_limits( + user=user, + route=route, # Pass the User object + ) + except ValueError as e: + # If check_limits raises ValueError -> 429 Too Many Requests + raise HTTPException(status_code=429, detail=str(e)) from e + + request.state.user_id = user_id + request.state.route = route + + # 3) Execute the route + try: + yield + finally: + # 4) Log only POST and DELETE requests + if request.method in ["POST", "DELETE"]: + await self.providers.database.limits_handler.log_request( + user_id, route + ) + + # Attach the dependencies so you can use them in your endpoints + self.rate_limit_dependency = rate_limit_dependency diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/chunks_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/chunks_router.py new file mode 100644 index 00000000..ab0a62cb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/chunks_router.py @@ -0,0 +1,422 @@ +import json +import logging +import textwrap +from typing import Optional +from uuid import UUID + +from fastapi import Body, Depends, Path, Query + +from core.base import ( + ChunkResponse, + GraphSearchSettings, + R2RException, + SearchSettings, + UpdateChunk, + select_search_filters, +) +from core.base.api.models import ( + GenericBooleanResponse, + WrappedBooleanResponse, + WrappedChunkResponse, + WrappedChunksResponse, + WrappedVectorSearchResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() + +MAX_CHUNKS_PER_REQUEST = 1024 * 100 + + +class ChunksRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing ChunksRouter") + super().__init__(providers, services, config) + + def _setup_routes(self): + @self.router.post( + "/chunks/search", + summary="Search Chunks", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + response = client.chunks.search( + query="search query", + search_settings={ + "limit": 10 + } + ) + """), + } + ] + }, + ) + @self.base_endpoint + async def search_chunks( + query: str = Body(...), + search_settings: SearchSettings = Body( + default_factory=SearchSettings, + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedVectorSearchResponse: # type: ignore + # TODO - Deduplicate this code by sharing the code on the retrieval router + """Perform a semantic search query over all stored chunks. + + This endpoint allows for complex filtering of search results using PostgreSQL-based queries. + Filters can be applied to various fields such as document_id, and internal metadata values. + + Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`. + """ + + search_settings.filters = select_search_filters( + auth_user, search_settings + ) + + search_settings.graph_settings = GraphSearchSettings(enabled=False) + + results = await self.services.retrieval.search( + query=query, + search_settings=search_settings, + ) + return results.chunk_search_results # type: ignore + + @self.router.get( + "/chunks/{id}", + summary="Retrieve Chunk", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + response = client.chunks.retrieve( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.chunks.retrieve({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def retrieve_chunk( + id: UUID = Path(...), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedChunkResponse: + """Get a specific chunk by its ID. + + Returns the chunk's content, metadata, and associated + document/collection information. Users can only retrieve chunks + they own or have access to through collections. + """ + chunk = await self.services.ingestion.get_chunk(id) + if not chunk: + raise R2RException("Chunk not found", 404) + + # TODO - Add collection ID check + if not auth_user.is_superuser and str(auth_user.id) != str( + chunk["owner_id"] + ): + raise R2RException("Not authorized to access this chunk", 403) + + return ChunkResponse( # type: ignore + id=chunk["id"], + document_id=chunk["document_id"], + owner_id=chunk["owner_id"], + collection_ids=chunk["collection_ids"], + text=chunk["text"], + metadata=chunk["metadata"], + # vector = chunk["vector"] # TODO - Add include vector flag + ) + + @self.router.post( + "/chunks/{id}", + summary="Update Chunk", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + response = client.chunks.update( + { + "id": "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + "text": "Updated content", + "metadata": {"key": "new value"} + } + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.chunks.update({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + text: "Updated content", + metadata: {key: "new value"} + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_chunk( + id: UUID = Path(...), + chunk_update: UpdateChunk = Body(...), + # TODO: Run with orchestration? + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedChunkResponse: + """Update an existing chunk's content and/or metadata. + + The chunk's vectors will be automatically recomputed based on the + new content. Users can only update chunks they own unless they are + superusers. + """ + # Get the existing chunk to get its chunk_id + existing_chunk = await self.services.ingestion.get_chunk( + chunk_update.id + ) + if existing_chunk is None: + raise R2RException(f"Chunk {chunk_update.id} not found", 404) + + workflow_input = { + "document_id": str(existing_chunk["document_id"]), + "id": str(chunk_update.id), + "text": chunk_update.text, + "metadata": chunk_update.metadata + or existing_chunk["metadata"], + "user": auth_user.model_dump_json(), + } + + logger.info("Running chunk ingestion without orchestration.") + from core.main.orchestration import simple_ingestion_factory + + # TODO - CLEAN THIS UP + + simple_ingestor = simple_ingestion_factory(self.services.ingestion) + await simple_ingestor["update-chunk"](workflow_input) + + return ChunkResponse( # type: ignore + id=chunk_update.id, + document_id=existing_chunk["document_id"], + owner_id=existing_chunk["owner_id"], + collection_ids=existing_chunk["collection_ids"], + text=chunk_update.text, + metadata=chunk_update.metadata or existing_chunk["metadata"], + # vector = existing_chunk.get('vector') + ) + + @self.router.delete( + "/chunks/{id}", + summary="Delete Chunk", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + response = client.chunks.delete( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.chunks.delete({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_chunk( + id: UUID = Path(...), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete a specific chunk by ID. + + This permanently removes the chunk and its associated vector + embeddings. The parent document remains unchanged. Users can only + delete chunks they own unless they are superusers. + """ + # Get the existing chunk to get its chunk_id + existing_chunk = await self.services.ingestion.get_chunk(id) + + if existing_chunk is None: + raise R2RException( + message=f"Chunk {id} not found", status_code=404 + ) + + filters = { + "$and": [ + {"owner_id": {"$eq": str(auth_user.id)}}, + {"chunk_id": {"$eq": str(id)}}, + ] + } + await ( + self.services.management.delete_documents_and_chunks_by_filter( + filters=filters + ) + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.get( + "/chunks", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List Chunks", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + response = client.chunks.list( + metadata_filter={"key": "value"}, + include_vectors=False, + offset=0, + limit=10, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.chunks.list({ + metadataFilter: {key: "value"}, + includeVectors: false, + offset: 0, + limit: 10, + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_chunks( + metadata_filter: Optional[str] = Query( + None, description="Filter by metadata" + ), + include_vectors: bool = Query( + False, description="Include vector data in response" + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedChunksResponse: + """List chunks with pagination support. + + Returns a paginated list of chunks that the user has access to. + Results can be filtered and sorted based on various parameters. + Vector embeddings are only included if specifically requested. + + Regular users can only list chunks they own or have access to + through collections. Superusers can list all chunks in the system. + """ # Build filters + filters = {} + + # Add user access control filter + if not auth_user.is_superuser: + filters["owner_id"] = {"$eq": str(auth_user.id)} + + # Add metadata filters if provided + if metadata_filter: + metadata_filter = json.loads(metadata_filter) + + # Get chunks using the vector handler's list_chunks method + results = await self.services.ingestion.list_chunks( + filters=filters, + include_vectors=include_vectors, + offset=offset, + limit=limit, + ) + + # Convert to response format + chunks = [ + ChunkResponse( + id=chunk["id"], + document_id=chunk["document_id"], + owner_id=chunk["owner_id"], + collection_ids=chunk["collection_ids"], + text=chunk["text"], + metadata=chunk["metadata"], + vector=chunk.get("vector") if include_vectors else None, + ) + for chunk in results["results"] + ] + + return (chunks, {"total_entries": results["total_entries"]}) # type: ignore diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/collections_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/collections_router.py new file mode 100644 index 00000000..462f5ca3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/collections_router.py @@ -0,0 +1,1207 @@ +import logging +import textwrap +from enum import Enum +from typing import Optional +from uuid import UUID + +from fastapi import Body, Depends, Path, Query +from fastapi.background import BackgroundTasks +from fastapi.responses import FileResponse + +from core.base import R2RException +from core.base.abstractions import GraphCreationSettings +from core.base.api.models import ( + GenericBooleanResponse, + WrappedBooleanResponse, + WrappedCollectionResponse, + WrappedCollectionsResponse, + WrappedDocumentsResponse, + WrappedGenericMessageResponse, + WrappedUsersResponse, +) +from core.utils import ( + generate_default_user_collection_id, + update_settings_from_dict, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() + + +class CollectionAction(str, Enum): + VIEW = "view" + EDIT = "edit" + DELETE = "delete" + MANAGE_USERS = "manage_users" + ADD_DOCUMENT = "add_document" + REMOVE_DOCUMENT = "remove_document" + + +async def authorize_collection_action( + auth_user, collection_id: UUID, action: CollectionAction, services +) -> bool: + """Authorize a user's action on a given collection based on: + + - If user is superuser (admin): Full access. + - If user is owner of the collection: Full access. + - If user is a member of the collection (in `collection_ids`): VIEW only. + - Otherwise: No access. + """ + + # Superusers have complete access + if auth_user.is_superuser: + return True + + # Fetch collection details: owner_id and members + results = ( + await services.management.collections_overview( + 0, 1, collection_ids=[collection_id] + ) + )["results"] + if len(results) == 0: + raise R2RException("The specified collection does not exist.", 404) + details = results[0] + owner_id = details.owner_id + + # Check if user is owner + if auth_user.id == owner_id: + # Owner can do all actions + return True + + # Check if user is a member (non-owner) + if collection_id in auth_user.collection_ids: + # Members can only view + if action == CollectionAction.VIEW: + return True + else: + raise R2RException( + "Insufficient permissions for this action.", 403 + ) + + # User is neither owner nor member + raise R2RException("You do not have access to this collection.", 403) + + +class CollectionsRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing CollectionsRouter") + super().__init__(providers, services, config) + + def _setup_routes(self): + @self.router.post( + "/collections", + summary="Create a new collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.create( + name="My New Collection", + description="This is a sample collection" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.create({ + name: "My New Collection", + description: "This is a sample collection" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/collections" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{"name": "My New Collection", "description": "This is a sample collection"}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_collection( + name: str = Body(..., description="The name of the collection"), + description: Optional[str] = Body( + None, description="An optional description of the collection" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionResponse: + """Create a new collection and automatically add the creating user + to it. + + This endpoint allows authenticated users to create a new collection + with a specified name and optional description. The user creating + the collection is automatically added as a member. + """ + user_collections_count = ( + await self.services.management.collections_overview( + user_ids=[auth_user.id], limit=1, offset=0 + ) + )["total_entries"] + user_max_collections = ( + await self.services.management.get_user_max_collections( + auth_user.id + ) + ) + if (user_collections_count + 1) >= user_max_collections: # type: ignore + raise R2RException( + f"User has reached the maximum number of collections allowed ({user_max_collections}).", + 400, + ) + collection = await self.services.management.create_collection( + owner_id=auth_user.id, + name=name, + description=description, + ) + # Add the creating user to the collection + await self.services.management.add_user_to_collection( + auth_user.id, collection.id + ) + return collection # type: ignore + + @self.router.post( + "/collections/export", + summary="Export collections to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.collections.export( + output_path="export.csv", + columns=["id", "name", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.collections.export({ + outputPath: "export.csv", + columns: ["id", "name", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/collections/export" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "name", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_collections( + background_tasks: BackgroundTasks, + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export collections as a CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_collections( + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="collections_export.csv", + ) + + @self.router.get( + "/collections", + summary="List collections", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.list( + offset=0, + limit=10, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.list(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/collections?offset=0&limit=10&name=Sample" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_collections( + ids: list[str] = Query( + [], + description="A list of collection IDs to retrieve. If not provided, all collections will be returned.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionsResponse: + """Returns a paginated list of collections the authenticated user + has access to. + + Results can be filtered by providing specific collection IDs. + Regular users will only see collections they own or have access to. + Superusers can see all collections. + + The collections are returned in order of last modification, with + most recent first. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + collection_uuids = [UUID(collection_id) for collection_id in ids] + + collections_overview_response = ( + await self.services.management.collections_overview( + user_ids=requesting_user_id, + collection_ids=collection_uuids, + offset=offset, + limit=limit, + ) + ) + + return ( # type: ignore + collections_overview_response["results"], + { + "total_entries": collections_overview_response[ + "total_entries" + ] + }, + ) + + @self.router.get( + "/collections/{id}", + summary="Get collection details", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.retrieve("123e4567-e89b-12d3-a456-426614174000") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.retrieve({id: "123e4567-e89b-12d3-a456-426614174000"}); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_collection( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionResponse: + """Get details of a specific collection. + + This endpoint retrieves detailed information about a single + collection identified by its UUID. The user must have access to the + collection to view its details. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.VIEW, self.services + ) + + collections_overview_response = ( + await self.services.management.collections_overview( + user_ids=None, + collection_ids=[id], + offset=0, + limit=1, + ) + ) + overview = collections_overview_response["results"] + + if len(overview) == 0: # type: ignore + raise R2RException( + "The specified collection does not exist.", + 404, + ) + return overview[0] # type: ignore + + @self.router.post( + "/collections/{id}", + summary="Update collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.update( + "123e4567-e89b-12d3-a456-426614174000", + name="Updated Collection Name", + description="Updated description" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.update({ + id: "123e4567-e89b-12d3-a456-426614174000", + name: "Updated Collection Name", + description: "Updated description" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{"name": "Updated Collection Name", "description": "Updated description"}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_collection( + id: UUID = Path( + ..., + description="The unique identifier of the collection to update", + ), + name: Optional[str] = Body( + None, description="The name of the collection" + ), + description: Optional[str] = Body( + None, description="An optional description of the collection" + ), + generate_description: Optional[bool] = Body( + False, + description="Whether to generate a new synthetic description for the collection", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionResponse: + """Update an existing collection's configuration. + + This endpoint allows updating the name and description of an + existing collection. The user must have appropriate permissions to + modify the collection. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.EDIT, self.services + ) + + if generate_description and description is not None: + raise R2RException( + "Cannot provide both a description and request to synthetically generate a new one.", + 400, + ) + + return await self.services.management.update_collection( # type: ignore + id, + name=name, + description=description, + generate_description=generate_description or False, + ) + + @self.router.delete( + "/collections/{id}", + summary="Delete collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.delete("123e4567-e89b-12d3-a456-426614174000") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.delete({id: "123e4567-e89b-12d3-a456-426614174000"}); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_collection( + id: UUID = Path( + ..., + description="The unique identifier of the collection to delete", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete an existing collection. + + This endpoint allows deletion of a collection identified by its + UUID. The user must have appropriate permissions to delete the + collection. Deleting a collection removes all associations but does + not delete the documents within it. + """ + if id == generate_default_user_collection_id(auth_user.id): + raise R2RException( + "Cannot delete the default user collection.", + 400, + ) + await authorize_collection_action( + auth_user, id, CollectionAction.DELETE, self.services + ) + + await self.services.management.delete_collection(collection_id=id) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/collections/{id}/documents/{document_id}", + summary="Add document to collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.add_document( + "123e4567-e89b-12d3-a456-426614174000", + "456e789a-b12c-34d5-e678-901234567890" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.addDocument({ + id: "123e4567-e89b-12d3-a456-426614174000" + documentId: "456e789a-b12c-34d5-e678-901234567890" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/documents/456e789a-b12c-34d5-e678-901234567890" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def add_document_to_collection( + id: UUID = Path(...), + document_id: UUID = Path(...), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Add a document to a collection.""" + await authorize_collection_action( + auth_user, id, CollectionAction.ADD_DOCUMENT, self.services + ) + + return ( + await self.services.management.assign_document_to_collection( + document_id, id + ) + ) + + @self.router.get( + "/collections/{id}/documents", + summary="List documents in collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.list_documents( + "123e4567-e89b-12d3-a456-426614174000", + offset=0, + limit=10, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.listDocuments({id: "123e4567-e89b-12d3-a456-426614174000"}); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/documents?offset=0&limit=10" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_collection_documents( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentsResponse: + """Get all documents in a collection with pagination and sorting + options. + + This endpoint retrieves a paginated list of documents associated + with a specific collection. It supports sorting options to + customize the order of returned documents. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.VIEW, self.services + ) + + documents_in_collection_response = ( + await self.services.management.documents_in_collection( + id, offset, limit + ) + ) + + return documents_in_collection_response["results"], { # type: ignore + "total_entries": documents_in_collection_response[ + "total_entries" + ] + } + + @self.router.delete( + "/collections/{id}/documents/{document_id}", + summary="Remove document from collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.remove_document( + "123e4567-e89b-12d3-a456-426614174000", + "456e789a-b12c-34d5-e678-901234567890" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.removeDocument({ + id: "123e4567-e89b-12d3-a456-426614174000" + documentId: "456e789a-b12c-34d5-e678-901234567890" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/documents/456e789a-b12c-34d5-e678-901234567890" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def remove_document_from_collection( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + document_id: UUID = Path( + ..., + description="The unique identifier of the document to remove", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Remove a document from a collection. + + This endpoint removes the association between a document and a + collection. It does not delete the document itself. The user must + have permissions to modify the collection. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.REMOVE_DOCUMENT, self.services + ) + await self.services.management.remove_document_from_collection( + document_id, id + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.get( + "/collections/{id}/users", + summary="List users in collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.list_users( + "123e4567-e89b-12d3-a456-426614174000", + offset=0, + limit=10, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.listUsers({ + id: "123e4567-e89b-12d3-a456-426614174000" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/users?offset=0&limit=10" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_collection_users( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUsersResponse: + """Get all users in a collection with pagination and sorting + options. + + This endpoint retrieves a paginated list of users who have access + to a specific collection. It supports sorting options to customize + the order of returned users. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.VIEW, self.services + ) + + users_in_collection_response = ( + await self.services.management.get_users_in_collection( + collection_id=id, + offset=offset, + limit=min(max(limit, 1), 1000), + ) + ) + + return users_in_collection_response["results"], { # type: ignore + "total_entries": users_in_collection_response["total_entries"] + } + + @self.router.post( + "/collections/{id}/users/{user_id}", + summary="Add user to collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.add_user( + "123e4567-e89b-12d3-a456-426614174000", + "789a012b-c34d-5e6f-g789-012345678901" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.addUser({ + id: "123e4567-e89b-12d3-a456-426614174000" + userId: "789a012b-c34d-5e6f-g789-012345678901" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/users/789a012b-c34d-5e6f-g789-012345678901" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def add_user_to_collection( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + user_id: UUID = Path( + ..., description="The unique identifier of the user to add" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Add a user to a collection. + + This endpoint grants a user access to a specific collection. The + authenticated user must have admin permissions for the collection + to add new users. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.MANAGE_USERS, self.services + ) + + result = await self.services.management.add_user_to_collection( + user_id, id + ) + return GenericBooleanResponse(success=result) # type: ignore + + @self.router.delete( + "/collections/{id}/users/{user_id}", + summary="Remove user from collection", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.collections.remove_user( + "123e4567-e89b-12d3-a456-426614174000", + "789a012b-c34d-5e6f-g789-012345678901" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.collections.removeUser({ + id: "123e4567-e89b-12d3-a456-426614174000" + userId: "789a012b-c34d-5e6f-g789-012345678901" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/collections/123e4567-e89b-12d3-a456-426614174000/users/789a012b-c34d-5e6f-g789-012345678901" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def remove_user_from_collection( + id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + user_id: UUID = Path( + ..., description="The unique identifier of the user to remove" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Remove a user from a collection. + + This endpoint revokes a user's access to a specific collection. The + authenticated user must have admin permissions for the collection + to remove users. + """ + await authorize_collection_action( + auth_user, id, CollectionAction.MANAGE_USERS, self.services + ) + + result = ( + await self.services.management.remove_user_from_collection( + user_id, id + ) + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/collections/{id}/extract", + summary="Extract entities and relationships", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.documents.extract( + id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1" + ) + """), + }, + ], + }, + ) + @self.base_endpoint + async def extract( + id: UUID = Path( + ..., + description="The ID of the document to extract entities and relationships from.", + ), + settings: Optional[GraphCreationSettings] = Body( + default=None, + description="Settings for the entities and relationships extraction process.", + ), + run_with_orchestration: Optional[bool] = Query( + default=True, + description="Whether to run the entities and relationships extraction process with orchestration.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Extracts entities and relationships from a document. + + The entities and relationships extraction process involves: + 1. Parsing documents into semantic chunks + 2. Extracting entities and relationships using LLMs + """ + await authorize_collection_action( + auth_user, id, CollectionAction.EDIT, self.services + ) + + settings = settings.dict() if settings else None # type: ignore + if not auth_user.is_superuser: + logger.warning("Implement permission checks here.") + + # Apply runtime settings overrides + server_graph_creation_settings = ( + self.providers.database.config.graph_creation_settings + ) + + if settings: + server_graph_creation_settings = update_settings_from_dict( + server_settings=server_graph_creation_settings, + settings_dict=settings, # type: ignore + ) + if run_with_orchestration: + try: + workflow_input = { + "collection_id": str(id), + "graph_creation_settings": server_graph_creation_settings.model_dump_json(), + "user": auth_user.json(), + } + + return await self.providers.orchestration.run_workflow( # type: ignore + "graph-extraction", {"request": workflow_input}, {} + ) + except Exception as e: # TODO: Need to find specific error (gRPC most likely?) + logger.error( + f"Error running orchestrated extraction: {e} \n\nAttempting to run without orchestration." + ) + + from core.main.orchestration import ( + simple_graph_search_results_factory, + ) + + logger.info("Running extract-triples without orchestration.") + simple_graph_search_results = simple_graph_search_results_factory( + self.services.graph + ) + await simple_graph_search_results["graph-extraction"]( + workflow_input + ) # type: ignore + return { # type: ignore + "message": "Graph created successfully.", + "task_id": None, + } + + @self.router.get( + "/collections/name/{collection_name}", + summary="Get a collection by name", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def get_collection_by_name( + collection_name: str = Path( + ..., description="The name of the collection" + ), + owner_id: Optional[UUID] = Query( + None, + description="(Superuser only) Specify the owner_id to retrieve a collection by name", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionResponse: + """Retrieve a collection by its (owner_id, name) combination. + + The authenticated user can only fetch collections they own, or, if + superuser, from anyone. + """ + if auth_user.is_superuser: + if not owner_id: + owner_id = auth_user.id + else: + owner_id = auth_user.id + + # If not superuser, fetch by (owner_id, name). Otherwise, maybe pass `owner_id=None`. + # Decide on the logic for superusers. + if not owner_id: # is_superuser + # If you want superusers to do /collections/name/<string>?owner_id=... + # just parse it from the query. For now, let's say it's not implemented. + raise R2RException( + "Superuser must specify an owner_id to fetch by name.", 400 + ) + + collection = await self.providers.database.collections_handler.get_collection_by_name( + owner_id, collection_name + ) + if not collection: + raise R2RException("Collection not found.", 404) + + # Now, authorize the 'view' action just in case: + # e.g. await authorize_collection_action(auth_user, collection.id, CollectionAction.VIEW, self.services) + + return collection # type: ignore diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/conversations_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/conversations_router.py new file mode 100644 index 00000000..d1b6d645 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/conversations_router.py @@ -0,0 +1,737 @@ +import logging +import textwrap +from typing import Optional +from uuid import UUID + +from fastapi import Body, Depends, Path, Query +from fastapi.background import BackgroundTasks +from fastapi.responses import FileResponse + +from core.base import Message, R2RException +from core.base.api.models import ( + GenericBooleanResponse, + WrappedBooleanResponse, + WrappedConversationMessagesResponse, + WrappedConversationResponse, + WrappedConversationsResponse, + WrappedMessageResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() + + +class ConversationsRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing ConversationsRouter") + super().__init__(providers, services, config) + + def _setup_routes(self): + @self.router.post( + "/conversations", + summary="Create a new conversation", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.create() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.create(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/conversations" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_conversation( + name: Optional[str] = Body( + None, description="The name of the conversation", embed=True + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedConversationResponse: + """Create a new conversation. + + This endpoint initializes a new conversation for the authenticated + user. + """ + user_id = auth_user.id + + return await self.services.management.create_conversation( # type: ignore + user_id=user_id, + name=name, + ) + + @self.router.get( + "/conversations", + summary="List conversations", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.list( + offset=0, + limit=10, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.list(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/conversations?offset=0&limit=10" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_conversations( + ids: list[str] = Query( + [], + description="A list of conversation IDs to retrieve. If not provided, all conversations will be returned.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedConversationsResponse: + """List conversations with pagination and sorting options. + + This endpoint returns a paginated list of conversations for the + authenticated user. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + conversation_uuids = [ + UUID(conversation_id) for conversation_id in ids + ] + + conversations_response = ( + await self.services.management.conversations_overview( + offset=offset, + limit=limit, + conversation_ids=conversation_uuids, + user_ids=requesting_user_id, + ) + ) + return conversations_response["results"], { # type: ignore + "total_entries": conversations_response["total_entries"] + } + + @self.router.post( + "/conversations/export", + summary="Export conversations to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.conversations.export( + output_path="export.csv", + columns=["id", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.conversations.export({ + outputPath: "export.csv", + columns: ["id", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/conversations/export" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_conversations( + background_tasks: BackgroundTasks, + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export conversations as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_conversations( + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.post( + "/conversations/export_messages", + summary="Export messages to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.conversations.export_messages( + output_path="export.csv", + columns=["id", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.conversations.exportMessages({ + outputPath: "export.csv", + columns: ["id", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/conversations/export_messages" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_messages( + background_tasks: BackgroundTasks, + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export conversations as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_messages( + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.get( + "/conversations/{id}", + summary="Get conversation details", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.get( + "123e4567-e89b-12d3-a456-426614174000" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.retrieve({ + id: "123e4567-e89b-12d3-a456-426614174000", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/conversations/123e4567-e89b-12d3-a456-426614174000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_conversation( + id: UUID = Path( + ..., description="The unique identifier of the conversation" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedConversationMessagesResponse: + """Get details of a specific conversation. + + This endpoint retrieves detailed information about a single + conversation identified by its UUID. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + conversation = await self.services.management.get_conversation( + conversation_id=id, + user_ids=requesting_user_id, + ) + return conversation # type: ignore + + @self.router.post( + "/conversations/{id}", + summary="Update conversation", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.update("123e4567-e89b-12d3-a456-426614174000", "new_name") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.update({ + id: "123e4567-e89b-12d3-a456-426614174000", + name: "new_name", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/conversations/123e4567-e89b-12d3-a456-426614174000" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -d '{"name": "new_name"}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_conversation( + id: UUID = Path( + ..., + description="The unique identifier of the conversation to delete", + ), + name: str = Body( + ..., + description="The updated name for the conversation", + embed=True, + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedConversationResponse: + """Update an existing conversation. + + This endpoint updates the name of an existing conversation + identified by its UUID. + """ + return await self.services.management.update_conversation( # type: ignore + conversation_id=id, + name=name, + ) + + @self.router.delete( + "/conversations/{id}", + summary="Delete conversation", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.delete("123e4567-e89b-12d3-a456-426614174000") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.delete({ + id: "123e4567-e89b-12d3-a456-426614174000", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/conversations/123e4567-e89b-12d3-a456-426614174000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_conversation( + id: UUID = Path( + ..., + description="The unique identifier of the conversation to delete", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete an existing conversation. + + This endpoint deletes a conversation identified by its UUID. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + await self.services.management.delete_conversation( + conversation_id=id, + user_ids=requesting_user_id, + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/conversations/{id}/messages", + summary="Add message to conversation", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.add_message( + "123e4567-e89b-12d3-a456-426614174000", + content="Hello, world!", + role="user", + parent_id="parent_message_id", + metadata={"key": "value"} + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.addMessage({ + id: "123e4567-e89b-12d3-a456-426614174000", + content: "Hello, world!", + role: "user", + parentId: "parent_message_id", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/conversations/123e4567-e89b-12d3-a456-426614174000/messages" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{"content": "Hello, world!", "parent_id": "parent_message_id", "metadata": {"key": "value"}}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def add_message( + id: UUID = Path( + ..., description="The unique identifier of the conversation" + ), + content: str = Body( + ..., description="The content of the message to add" + ), + role: str = Body( + ..., description="The role of the message to add" + ), + parent_id: Optional[UUID] = Body( + None, description="The ID of the parent message, if any" + ), + metadata: Optional[dict[str, str]] = Body( + None, description="Additional metadata for the message" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedMessageResponse: + """Add a new message to a conversation. + + This endpoint adds a new message to an existing conversation. + """ + if content == "": + raise R2RException("Content cannot be empty", status_code=400) + if role not in ["user", "assistant", "system"]: + raise R2RException("Invalid role", status_code=400) + message = Message(role=role, content=content) + return await self.services.management.add_message( # type: ignore + conversation_id=id, + content=message, + parent_id=parent_id, + metadata=metadata, + ) + + @self.router.post( + "/conversations/{id}/messages/{message_id}", + summary="Update message in conversation", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.conversations.update_message( + "123e4567-e89b-12d3-a456-426614174000", + "message_id_to_update", + content="Updated content" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.conversations.updateMessage({ + id: "123e4567-e89b-12d3-a456-426614174000", + messageId: "message_id_to_update", + content: "Updated content", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/conversations/123e4567-e89b-12d3-a456-426614174000/messages/message_id_to_update" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{"content": "Updated content"}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_message( + id: UUID = Path( + ..., description="The unique identifier of the conversation" + ), + message_id: UUID = Path( + ..., description="The ID of the message to update" + ), + content: Optional[str] = Body( + None, description="The new content for the message" + ), + metadata: Optional[dict[str, str]] = Body( + None, description="Additional metadata for the message" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedMessageResponse: + """Update an existing message in a conversation. + + This endpoint updates the content of an existing message in a + conversation. + """ + return await self.services.management.edit_message( # type: ignore + message_id=message_id, + new_content=content, + additional_metadata=metadata, + ) diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/documents_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/documents_router.py new file mode 100644 index 00000000..fe152b8b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/documents_router.py @@ -0,0 +1,2342 @@ +import base64 +import logging +import mimetypes +import textwrap +from datetime import datetime +from io import BytesIO +from typing import Any, Optional +from urllib.parse import quote +from uuid import UUID + +from fastapi import Body, Depends, File, Form, Path, Query, UploadFile +from fastapi.background import BackgroundTasks +from fastapi.responses import FileResponse, StreamingResponse +from pydantic import Json + +from core.base import ( + IngestionConfig, + IngestionMode, + R2RException, + SearchMode, + SearchSettings, + UnprocessedChunk, + Workflow, + generate_document_id, + generate_id, + select_search_filters, +) +from core.base.abstractions import GraphCreationSettings, StoreType +from core.base.api.models import ( + GenericBooleanResponse, + WrappedBooleanResponse, + WrappedChunksResponse, + WrappedCollectionsResponse, + WrappedDocumentResponse, + WrappedDocumentSearchResponse, + WrappedDocumentsResponse, + WrappedEntitiesResponse, + WrappedGenericMessageResponse, + WrappedIngestionResponse, + WrappedRelationshipsResponse, +) +from core.utils import update_settings_from_dict + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() +MAX_CHUNKS_PER_REQUEST = 1024 * 100 + + +def merge_search_settings( + base: SearchSettings, overrides: SearchSettings +) -> SearchSettings: + # Convert both to dict + base_dict = base.model_dump() + overrides_dict = overrides.model_dump(exclude_unset=True) + + # Update base_dict with values from overrides_dict + # This ensures that any field set in overrides takes precedence + for k, v in overrides_dict.items(): + base_dict[k] = v + + # Construct a new SearchSettings from the merged dict + return SearchSettings(**base_dict) + + +def merge_ingestion_config( + base: IngestionConfig, overrides: IngestionConfig +) -> IngestionConfig: + base_dict = base.model_dump() + overrides_dict = overrides.model_dump(exclude_unset=True) + + for k, v in overrides_dict.items(): + base_dict[k] = v + + return IngestionConfig(**base_dict) + + +class DocumentsRouter(BaseRouterV3): + def __init__( + self, + providers: R2RProviders, + services: R2RServices, + config: R2RConfig, + ): + logging.info("Initializing DocumentsRouter") + super().__init__(providers, services, config) + self._register_workflows() + + def _prepare_search_settings( + self, + auth_user: Any, + search_mode: SearchMode, + search_settings: Optional[SearchSettings], + ) -> SearchSettings: + """Prepare the effective search settings based on the provided + search_mode, optional user-overrides in search_settings, and applied + filters.""" + + if search_mode != SearchMode.custom: + # Start from mode defaults + effective_settings = SearchSettings.get_default(search_mode.value) + if search_settings: + # Merge user-provided overrides + effective_settings = merge_search_settings( + effective_settings, search_settings + ) + else: + # Custom mode: use provided settings or defaults + effective_settings = search_settings or SearchSettings() + + # Apply user-specific filters + effective_settings.filters = select_search_filters( + auth_user, effective_settings + ) + + return effective_settings + + # TODO - Remove this legacy method + def _register_workflows(self): + self.providers.orchestration.register_workflows( + Workflow.INGESTION, + self.services.ingestion, + { + "ingest-files": ( + "Ingest files task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Document created and ingested successfully." + ), + "ingest-chunks": ( + "Ingest chunks task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Document created and ingested successfully." + ), + "update-chunk": ( + "Update chunk task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Chunk update completed successfully." + ), + "update-document-metadata": ( + "Update document metadata task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Document metadata update completed successfully." + ), + "create-vector-index": ( + "Vector index creation task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Vector index creation task completed successfully." + ), + "delete-vector-index": ( + "Vector index deletion task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Vector index deletion task completed successfully." + ), + "select-vector-index": ( + "Vector index selection task queued successfully." + if self.providers.orchestration.config.provider != "simple" + else "Vector index selection task completed successfully." + ), + }, + ) + + def _prepare_ingestion_config( + self, + ingestion_mode: IngestionMode, + ingestion_config: Optional[IngestionConfig], + ) -> IngestionConfig: + # If not custom, start from defaults + if ingestion_mode != IngestionMode.custom: + effective_config = IngestionConfig.get_default( + ingestion_mode.value, app=self.providers.auth.config.app + ) + if ingestion_config: + effective_config = merge_ingestion_config( + effective_config, ingestion_config + ) + else: + # custom mode + effective_config = ingestion_config or IngestionConfig( + app=self.providers.auth.config.app + ) + + effective_config.validate_config() + return effective_config + + def _setup_routes(self): + @self.router.post( + "/documents", + dependencies=[Depends(self.rate_limit_dependency)], + status_code=202, + summary="Create a new document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.create( + file_path="pg_essay_1.html", + metadata={"metadata_1":"some random metadata"}, + id=None + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.create({ + file: { path: "examples/data/marmeladov.txt", name: "marmeladov.txt" }, + metadata: { title: "marmeladov.txt" }, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/documents" \\ + -H "Content-Type: multipart/form-data" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -F "file=@pg_essay_1.html;type=text/html" \\ + -F 'metadata={}' \\ + -F 'id=null' + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_document( + file: Optional[UploadFile] = File( + None, + description="The file to ingest. Exactly one of file, raw_text, or chunks must be provided.", + ), + raw_text: Optional[str] = Form( + None, + description="Raw text content to ingest. Exactly one of file, raw_text, or chunks must be provided.", + ), + chunks: Optional[Json[list[str]]] = Form( + None, + description="Pre-processed text chunks to ingest. Exactly one of file, raw_text, or chunks must be provided.", + ), + id: Optional[UUID] = Form( + None, + description="The ID of the document. If not provided, a new ID will be generated.", + ), + collection_ids: Optional[Json[list[UUID]]] = Form( + None, + description="Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.", + ), + metadata: Optional[Json[dict]] = Form( + None, + description="Metadata to associate with the document, such as title, description, or custom fields.", + ), + ingestion_mode: IngestionMode = Form( + default=IngestionMode.custom, + description=( + "Ingestion modes:\n" + "- `hi-res`: Thorough ingestion with full summaries and enrichment.\n" + "- `fast`: Quick ingestion with minimal enrichment and no summaries.\n" + "- `custom`: Full control via `ingestion_config`.\n\n" + "If `filters` or `limit` (in `ingestion_config`) are provided alongside `hi-res` or `fast`, " + "they will override the default settings for that mode." + ), + ), + ingestion_config: Optional[Json[IngestionConfig]] = Form( + None, + description="An optional dictionary to override the default chunking configuration for the ingestion process. If not provided, the system will use the default server-side chunking configuration.", + ), + run_with_orchestration: Optional[bool] = Form( + True, + description="Whether or not ingestion runs with orchestration, default is `True`. When set to `False`, the ingestion process will run synchronous and directly return the result.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedIngestionResponse: + """ + Creates a new Document object from an input file, text content, or chunks. The chosen `ingestion_mode` determines + how the ingestion process is configured: + + **Ingestion Modes:** + - `hi-res`: Comprehensive parsing and enrichment, including summaries and possibly more thorough parsing. + - `fast`: Speed-focused ingestion that skips certain enrichment steps like summaries. + - `custom`: Provide a full `ingestion_config` to customize the entire ingestion process. + + Either a file or text content must be provided, but not both. Documents are shared through `Collections` which allow for tightly specified cross-user interactions. + + The ingestion process runs asynchronously and its progress can be tracked using the returned + task_id. + """ + if not auth_user.is_superuser: + user_document_count = ( + await self.services.management.documents_overview( + user_ids=[auth_user.id], + offset=0, + limit=1, + ) + )["total_entries"] + user_max_documents = ( + await self.services.management.get_user_max_documents( + auth_user.id + ) + ) + + if user_document_count >= user_max_documents: + raise R2RException( + status_code=403, + message=f"User has reached the maximum number of documents allowed ({user_max_documents}).", + ) + + # Get chunks using the vector handler's list_chunks method + user_chunk_count = ( + await self.services.ingestion.list_chunks( + filters={"owner_id": {"$eq": str(auth_user.id)}}, + offset=0, + limit=1, + ) + )["total_entries"] + user_max_chunks = ( + await self.services.management.get_user_max_chunks( + auth_user.id + ) + ) + if user_chunk_count >= user_max_chunks: + raise R2RException( + status_code=403, + message=f"User has reached the maximum number of chunks allowed ({user_max_chunks}).", + ) + + user_collections_count = ( + await self.services.management.collections_overview( + user_ids=[auth_user.id], + offset=0, + limit=1, + ) + )["total_entries"] + user_max_collections = ( + await self.services.management.get_user_max_collections( + auth_user.id + ) + ) + if user_collections_count >= user_max_collections: # type: ignore + raise R2RException( + status_code=403, + message=f"User has reached the maximum number of collections allowed ({user_max_collections}).", + ) + + effective_ingestion_config = self._prepare_ingestion_config( + ingestion_mode=ingestion_mode, + ingestion_config=ingestion_config, + ) + if not file and not raw_text and not chunks: + raise R2RException( + status_code=422, + message="Either a `file`, `raw_text`, or `chunks` must be provided.", + ) + if ( + (file and raw_text) + or (file and chunks) + or (raw_text and chunks) + ): + raise R2RException( + status_code=422, + message="Only one of `file`, `raw_text`, or `chunks` may be provided.", + ) + # Check if the user is a superuser + metadata = metadata or {} + + if chunks: + if len(chunks) == 0: + raise R2RException("Empty list of chunks provided", 400) + + if len(chunks) > MAX_CHUNKS_PER_REQUEST: + raise R2RException( + f"Maximum of {MAX_CHUNKS_PER_REQUEST} chunks per request", + 400, + ) + + document_id = id or generate_document_id( + "".join(chunks), auth_user.id + ) + + # FIXME: Metadata doesn't seem to be getting passed through + raw_chunks_for_doc = [ + UnprocessedChunk( + text=chunk, + metadata=metadata, + id=generate_id(), + ) + for chunk in chunks + ] + + # Prepare workflow input + workflow_input = { + "document_id": str(document_id), + "chunks": [ + chunk.model_dump(mode="json") + for chunk in raw_chunks_for_doc + ], + "metadata": metadata, # Base metadata for the document + "user": auth_user.model_dump_json(), + "ingestion_config": effective_ingestion_config.model_dump( + mode="json" + ), + } + + if run_with_orchestration: + try: + # Run ingestion with orchestration + raw_message = ( + await self.providers.orchestration.run_workflow( + "ingest-chunks", + {"request": workflow_input}, + options={ + "additional_metadata": { + "document_id": str(document_id), + } + }, + ) + ) + raw_message["document_id"] = str(document_id) + return raw_message # type: ignore + except Exception as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?) + logger.error( + f"Error running orchestrated ingestion: {e} \n\nAttempting to run without orchestration." + ) + + logger.info("Running chunk ingestion without orchestration.") + from core.main.orchestration import simple_ingestion_factory + + simple_ingestor = simple_ingestion_factory( + self.services.ingestion + ) + await simple_ingestor["ingest-chunks"](workflow_input) + + return { # type: ignore + "message": "Document created and ingested successfully.", + "document_id": str(document_id), + "task_id": None, + } + + else: + if file: + file_data = await self._process_file(file) + + if not file.filename: + raise R2RException( + status_code=422, + message="Uploaded file must have a filename.", + ) + + file_ext = file.filename.split(".")[ + -1 + ] # e.g. "pdf", "txt" + max_allowed_size = await self.services.management.get_max_upload_size_by_type( + user_id=auth_user.id, file_type_or_ext=file_ext + ) + + content_length = file_data["content_length"] + + if content_length > max_allowed_size: + raise R2RException( + status_code=413, # HTTP 413: Payload Too Large + message=( + f"File size exceeds maximum of {max_allowed_size} bytes " + f"for extension '{file_ext}'." + ), + ) + + file_content = BytesIO( + base64.b64decode(file_data["content"]) + ) + + file_data.pop("content", None) + document_id = id or generate_document_id( + file_data["filename"], auth_user.id + ) + elif raw_text: + content_length = len(raw_text) + file_content = BytesIO(raw_text.encode("utf-8")) + document_id = id or generate_document_id( + raw_text, auth_user.id + ) + title = metadata.get("title", None) + title = title + ".txt" if title else None + file_data = { + "filename": title or "N/A", + "content_type": "text/plain", + } + else: + raise R2RException( + status_code=422, + message="Either a file or content must be provided.", + ) + + workflow_input = { + "file_data": file_data, + "document_id": str(document_id), + "collection_ids": ( + [str(cid) for cid in collection_ids] + if collection_ids + else None + ), + "metadata": metadata, + "ingestion_config": effective_ingestion_config.model_dump( + mode="json" + ), + "user": auth_user.model_dump_json(), + "size_in_bytes": content_length, + "version": "v0", + } + + file_name = file_data["filename"] + await self.providers.database.files_handler.store_file( + document_id, + file_name, + file_content, + file_data["content_type"], + ) + + await self.services.ingestion.ingest_file_ingress( + file_data=workflow_input["file_data"], + user=auth_user, + document_id=workflow_input["document_id"], + size_in_bytes=workflow_input["size_in_bytes"], + metadata=workflow_input["metadata"], + version=workflow_input["version"], + ) + + if run_with_orchestration: + try: + # TODO - Modify create_chunks so that we can add chunks to existing document + + workflow_result: dict[ + str, str | None + ] = await self.providers.orchestration.run_workflow( # type: ignore + "ingest-files", + {"request": workflow_input}, + options={ + "additional_metadata": { + "document_id": str(document_id), + } + }, + ) + workflow_result["document_id"] = str(document_id) + return workflow_result # type: ignore + except Exception as e: # TODO: Need to find specific error (gRPC most likely?) + logger.error( + f"Error running orchestrated ingestion: {e} \n\nAttempting to run without orchestration." + ) + logger.info( + f"Running ingestion without orchestration for file {file_name} and document_id {document_id}." + ) + # TODO - Clean up implementation logic here to be more explicitly `synchronous` + from core.main.orchestration import simple_ingestion_factory + + simple_ingestor = simple_ingestion_factory(self.services.ingestion) + await simple_ingestor["ingest-files"](workflow_input) + return { # type: ignore + "message": "Document created and ingested successfully.", + "document_id": str(document_id), + "task_id": None, + } + + @self.router.patch( + "/documents/{id}/metadata", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Append metadata to a document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.append_metadata( + id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + metadata=[{"key": "new_key", "value": "new_value"}] + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.appendMetadata({ + id: "9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + metadata: [{ key: "new_key", value: "new_value" }], + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def patch_metadata( + id: UUID = Path( + ..., + description="The ID of the document to append metadata to.", + ), + metadata: list[dict] = Body( + ..., + description="Metadata to append to the document.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentResponse: + """Appends metadata to a document. This endpoint allows adding new metadata fields or updating existing ones.""" + request_user_ids = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=request_user_ids, + document_ids=[id], + offset=0, + limit=1, + ) + ) + results = documents_overview_response["results"] + if len(results) == 0: + raise R2RException("Document not found.", 404) + + return await self.services.management.update_document_metadata( + document_id=id, + metadata=metadata, + overwrite=False, + ) + + @self.router.put( + "/documents/{id}/metadata", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Replace metadata of a document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.replace_metadata( + id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + metadata=[{"key": "new_key", "value": "new_value"}] + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.replaceMetadata({ + id: "9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + metadata: [{ key: "new_key", value: "new_value" }], + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def put_metadata( + id: UUID = Path( + ..., + description="The ID of the document to append metadata to.", + ), + metadata: list[dict] = Body( + ..., + description="Metadata to append to the document.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentResponse: + """Replaces metadata in a document. This endpoint allows overwriting existing metadata fields.""" + request_user_ids = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=request_user_ids, + document_ids=[id], + offset=0, + limit=1, + ) + ) + results = documents_overview_response["results"] + if len(results) == 0: + raise R2RException("Document not found.", 404) + + return await self.services.management.update_document_metadata( + document_id=id, + metadata=metadata, + overwrite=True, + ) + + @self.router.post( + "/documents/export", + summary="Export documents to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.documents.export( + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.documents.export({ + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/documents/export" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_documents( + background_tasks: BackgroundTasks, + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_documents( + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.get( + "/documents/download_zip", + dependencies=[Depends(self.rate_limit_dependency)], + response_class=StreamingResponse, + summary="Export multiple documents as zip", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + client.documents.download_zip( + document_ids=["uuid1", "uuid2"], + start_date="2024-01-01", + end_date="2024-12-31" + ) + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/download_zip?document_ids=uuid1,uuid2&start_date=2024-01-01&end_date=2024-12-31" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_files( + document_ids: Optional[list[UUID]] = Query( + None, + description="List of document IDs to include in the export. If not provided, all accessible documents will be included.", + ), + start_date: Optional[datetime] = Query( + None, + description="Filter documents created on or after this date.", + ), + end_date: Optional[datetime] = Query( + None, + description="Filter documents created before this date.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> StreamingResponse: + """Export multiple documents as a zip file. Documents can be + filtered by IDs and/or date range. + + The endpoint allows downloading: + - Specific documents by providing their IDs + - Documents within a date range + - All accessible documents if no filters are provided + + Files are streamed as a zip archive to handle potentially large downloads efficiently. + """ + if not auth_user.is_superuser: + # For non-superusers, verify access to requested documents + if document_ids: + documents_overview = ( + await self.services.management.documents_overview( + user_ids=[auth_user.id], + document_ids=document_ids, + offset=0, + limit=len(document_ids), + ) + ) + if len(documents_overview["results"]) != len(document_ids): + raise R2RException( + status_code=403, + message="You don't have access to one or more requested documents.", + ) + if not document_ids: + raise R2RException( + status_code=403, + message="Non-superusers must provide document IDs to export.", + ) + + ( + zip_name, + zip_content, + zip_size, + ) = await self.services.management.export_files( + document_ids=document_ids, + start_date=start_date, + end_date=end_date, + ) + encoded_filename = quote(zip_name) + + async def stream_file(): + yield zip_content.getvalue() + + return StreamingResponse( + stream_file(), + media_type="application/zip", + headers={ + "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}", + "Content-Length": str(zip_size), + }, + ) + + @self.router.get( + "/documents", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List documents", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.list( + limit=10, + offset=0 + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.list({ + limit: 10, + offset: 0, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_documents( + ids: list[str] = Query( + [], + description="A list of document IDs to retrieve. If not provided, all documents will be returned.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + include_summary_embeddings: bool = Query( + False, + description="Specifies whether or not to include embeddings of each document summary.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentsResponse: + """Returns a paginated list of documents the authenticated user has + access to. + + Results can be filtered by providing specific document IDs. Regular + users will only see documents they own or have access to through + collections. Superusers can see all documents. + + The documents are returned in order of last modification, with most + recent first. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + filter_collection_ids = ( + None if auth_user.is_superuser else auth_user.collection_ids + ) + + document_uuids = [UUID(document_id) for document_id in ids] + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=requesting_user_id, + collection_ids=filter_collection_ids, + document_ids=document_uuids, + offset=offset, + limit=limit, + ) + ) + if not include_summary_embeddings: + for document in documents_overview_response["results"]: + document.summary_embedding = None + + return ( # type: ignore + documents_overview_response["results"], + { + "total_entries": documents_overview_response[ + "total_entries" + ] + }, + ) + + @self.router.get( + "/documents/{id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Retrieve a document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.retrieve( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.retrieve({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_document( + id: UUID = Path( + ..., + description="The ID of the document to retrieve.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentResponse: + """Retrieves detailed information about a specific document by its + ID. + + This endpoint returns the document's metadata, status, and system information. It does not + return the document's content - use the `/documents/{id}/download` endpoint for that. + + Users can only retrieve documents they own or have access to through collections. + Superusers can retrieve any document. + """ + request_user_ids = ( + None if auth_user.is_superuser else [auth_user.id] + ) + filter_collection_ids = ( + None if auth_user.is_superuser else auth_user.collection_ids + ) + + documents_overview_response = await self.services.management.documents_overview( # FIXME: This was using the pagination defaults from before... We need to review if this is as intended. + user_ids=request_user_ids, + collection_ids=filter_collection_ids, + document_ids=[id], + offset=0, + limit=100, + ) + results = documents_overview_response["results"] + if len(results) == 0: + raise R2RException("Document not found.", 404) + + return results[0] + + @self.router.get( + "/documents/{id}/chunks", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List document chunks", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.list_chunks( + id="32b6a70f-a995-5c51-85d2-834f06283a1e" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.listChunks({ + id: "32b6a70f-a995-5c51-85d2-834f06283a1e", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/chunks" \\ + -H "Authorization: Bearer YOUR_API_KEY"\ + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_chunks( + id: UUID = Path( + ..., + description="The ID of the document to retrieve chunks for.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + include_vectors: Optional[bool] = Query( + False, + description="Whether to include vector embeddings in the response.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedChunksResponse: + """Retrieves the text chunks that were generated from a document + during ingestion. Chunks represent semantic sections of the + document and are used for retrieval and analysis. + + Users can only access chunks from documents they own or have access + to through collections. Vector embeddings are only included if + specifically requested. + + Results are returned in chunk sequence order, representing their + position in the original document. + """ + list_document_chunks = ( + await self.services.management.list_document_chunks( + document_id=id, + offset=offset, + limit=limit, + include_vectors=include_vectors or False, + ) + ) + + if not list_document_chunks["results"]: + raise R2RException( + "No chunks found for the given document ID.", 404 + ) + + is_owner = str( + list_document_chunks["results"][0].get("owner_id") + ) == str(auth_user.id) + document_collections = ( + await self.services.management.collections_overview( + offset=0, + limit=-1, + document_ids=[id], + ) + ) + + user_has_access = ( + is_owner + or set(auth_user.collection_ids).intersection( + {ele.id for ele in document_collections["results"]} # type: ignore + ) + != set() + ) + + if not user_has_access and not auth_user.is_superuser: + raise R2RException( + "Not authorized to access this document's chunks.", 403 + ) + + return ( # type: ignore + list_document_chunks["results"], + {"total_entries": list_document_chunks["total_entries"]}, + ) + + @self.router.get( + "/documents/{id}/download", + dependencies=[Depends(self.rate_limit_dependency)], + response_class=StreamingResponse, + summary="Download document content", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.download( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.download({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/download" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_document_file( + id: str = Path(..., description="Document ID"), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> StreamingResponse: + """Downloads the original file content of a document. + + For uploaded files, returns the original file with its proper MIME + type. For text-only documents, returns the content as plain text. + + Users can only download documents they own or have access to + through collections. + """ + try: + document_uuid = UUID(id) + except ValueError: + raise R2RException( + status_code=422, message="Invalid document ID format." + ) from None + + # Retrieve the document's information + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=None, + collection_ids=None, + document_ids=[document_uuid], + offset=0, + limit=1, + ) + ) + + if not documents_overview_response["results"]: + raise R2RException("Document not found.", 404) + + document = documents_overview_response["results"][0] + + is_owner = str(document.owner_id) == str(auth_user.id) + + if not auth_user.is_superuser and not is_owner: + document_collections = ( + await self.services.management.collections_overview( + offset=0, + limit=-1, + document_ids=[document_uuid], + ) + ) + + document_collection_ids = { + str(ele.id) + for ele in document_collections["results"] # type: ignore + } + + user_collection_ids = { + str(cid) for cid in auth_user.collection_ids + } + + has_collection_access = user_collection_ids.intersection( + document_collection_ids + ) + + if not has_collection_access: + raise R2RException( + "Not authorized to access this document.", 403 + ) + + file_tuple = await self.services.management.download_file( + document_uuid + ) + if not file_tuple: + raise R2RException(status_code=404, message="File not found.") + + file_name, file_content, file_size = file_tuple + encoded_filename = quote(file_name) + + mime_type, _ = mimetypes.guess_type(file_name) + if not mime_type: + mime_type = "application/octet-stream" + + async def file_stream(): + chunk_size = 1024 * 1024 # 1MB + while True: + data = file_content.read(chunk_size) + if not data: + break + yield data + + return StreamingResponse( + file_stream(), + media_type=mime_type, + headers={ + "Content-Disposition": f"inline; filename*=UTF-8''{encoded_filename}", + "Content-Length": str(file_size), + }, + ) + + @self.router.delete( + "/documents/by-filter", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete documents by filter", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + client = R2RClient() + # when using auth, do client.login(...) + response = client.documents.delete_by_filter( + filters={"document_type": {"$eq": "txt"}} + ) + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/documents/by-filter?filters=%7B%22document_type%22%3A%7B%22%24eq%22%3A%22text%22%7D%2C%22created_at%22%3A%7B%22%24lt%22%3A%222023-01-01T00%3A00%3A00Z%22%7D%7D" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_document_by_filter( + filters: Json[dict] = Body( + ..., description="JSON-encoded filters" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete documents based on provided filters. + + Allowed operators + include: `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, + `ilike`, `in`, and `nin`. Deletion requests are limited to a + user's own documents. + """ + + filters_dict = { + "$and": [{"owner_id": {"$eq": str(auth_user.id)}}, filters] + } + await ( + self.services.management.delete_documents_and_chunks_by_filter( + filters=filters_dict + ) + ) + + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.delete( + "/documents/{id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete a document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.delete( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.delete({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_document_by_id( + id: UUID = Path(..., description="Document ID"), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete a specific document. All chunks corresponding to the + document are deleted, and all other references to the document are + removed. + + NOTE - Deletions do not yet impact the knowledge graph or other derived data. This feature is planned for a future release. + """ + + filters: dict[str, Any] = {"document_id": {"$eq": str(id)}} + if not auth_user.is_superuser: + filters = { + "$and": [ + {"owner_id": {"$eq": str(auth_user.id)}}, + {"document_id": {"$eq": str(id)}}, + ] + } + + await ( + self.services.management.delete_documents_and_chunks_by_filter( + filters=filters + ) + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.get( + "/documents/{id}/collections", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List document collections", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.list_collections( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", offset=0, limit=10 + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.listCollections({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/collections" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_document_collections( + id: str = Path(..., description="Document ID"), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionsResponse: + """Retrieves all collections that contain the specified document. + This endpoint is restricted to superusers only and provides a + system-wide view of document organization. + + Collections are used to organize documents and manage access control. A document can belong + to multiple collections, and users can access documents through collection membership. + + The results are paginated and ordered by collection creation date, with the most recently + created collections appearing first. + + NOTE - This endpoint is only available to superusers, it will be extended to regular users in a future release. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can get the collections belonging to a document.", + 403, + ) + + collections_response = ( + await self.services.management.collections_overview( + offset=offset, + limit=limit, + document_ids=[UUID(id)], # Convert string ID to UUID + ) + ) + + return collections_response["results"], { # type: ignore + "total_entries": collections_response["total_entries"] + } + + @self.router.post( + "/documents/{id}/extract", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Extract entities and relationships", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.extract( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + ], + }, + ) + @self.base_endpoint + async def extract( + id: UUID = Path( + ..., + description="The ID of the document to extract entities and relationships from.", + ), + settings: Optional[GraphCreationSettings] = Body( + default=None, + description="Settings for the entities and relationships extraction process.", + ), + run_with_orchestration: Optional[bool] = Body( + default=True, + description="Whether to run the entities and relationships extraction process with orchestration.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Extracts entities and relationships from a document. + + The entities and relationships extraction process involves: + + 1. Parsing documents into semantic chunks + + 2. Extracting entities and relationships using LLMs + + 3. Storing the created entities and relationships in the knowledge graph + + 4. Preserving the document's metadata and content, and associating the elements with collections the document belongs to + """ + + settings = settings.dict() if settings else None # type: ignore + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=( + None if auth_user.is_superuser else [auth_user.id] + ), + collection_ids=( + None + if auth_user.is_superuser + else auth_user.collection_ids + ), + document_ids=[id], + offset=0, + limit=1, + ) + )["results"] + if len(documents_overview_response) == 0: + raise R2RException("Document not found.", 404) + + if ( + not auth_user.is_superuser + and auth_user.id != documents_overview_response[0].owner_id + ): + raise R2RException( + "Only a superuser can extract entities and relationships from a document they do not own.", + 403, + ) + + # Apply runtime settings overrides + server_graph_creation_settings = ( + self.providers.database.config.graph_creation_settings + ) + + if settings: + server_graph_creation_settings = update_settings_from_dict( + server_settings=server_graph_creation_settings, + settings_dict=settings, # type: ignore + ) + + if run_with_orchestration: + try: + workflow_input = { + "document_id": str(id), + "graph_creation_settings": server_graph_creation_settings.model_dump_json(), + "user": auth_user.json(), + } + + return await self.providers.orchestration.run_workflow( # type: ignore + "graph-extraction", {"request": workflow_input}, {} + ) + except Exception as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?) + logger.error( + f"Error running orchestrated extraction: {e} \n\nAttempting to run without orchestration." + ) + + from core.main.orchestration import ( + simple_graph_search_results_factory, + ) + + logger.info("Running extract-triples without orchestration.") + simple_graph_search_results = simple_graph_search_results_factory( + self.services.graph + ) + await simple_graph_search_results["graph-extraction"]( + workflow_input + ) + return { # type: ignore + "message": "Graph created successfully.", + "task_id": None, + } + + @self.router.post( + "/documents/{id}/deduplicate", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Deduplicate entities", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + + response = client.documents.deduplicate( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.deduplicate({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/deduplicate" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ], + }, + ) + @self.base_endpoint + async def deduplicate( + id: UUID = Path( + ..., + description="The ID of the document to extract entities and relationships from.", + ), + settings: Optional[GraphCreationSettings] = Body( + default=None, + description="Settings for the entities and relationships extraction process.", + ), + run_with_orchestration: Optional[bool] = Body( + default=True, + description="Whether to run the entities and relationships extraction process with orchestration.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Deduplicates entities from a document.""" + + settings = settings.model_dump() if settings else None # type: ignore + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=( + None if auth_user.is_superuser else [auth_user.id] + ), + collection_ids=( + None + if auth_user.is_superuser + else auth_user.collection_ids + ), + document_ids=[id], + offset=0, + limit=1, + ) + )["results"] + if len(documents_overview_response) == 0: + raise R2RException("Document not found.", 404) + + if ( + not auth_user.is_superuser + and auth_user.id != documents_overview_response[0].owner_id + ): + raise R2RException( + "Only a superuser can run deduplication on a document they do not own.", + 403, + ) + + # Apply runtime settings overrides + server_graph_creation_settings = ( + self.providers.database.config.graph_creation_settings + ) + + if settings: + server_graph_creation_settings = update_settings_from_dict( + server_settings=server_graph_creation_settings, + settings_dict=settings, # type: ignore + ) + + if run_with_orchestration: + try: + workflow_input = { + "document_id": str(id), + } + + return await self.providers.orchestration.run_workflow( # type: ignore + "graph-deduplication", + {"request": workflow_input}, + {}, + ) + except Exception as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?) + logger.error( + f"Error running orchestrated deduplication: {e} \n\nAttempting to run without orchestration." + ) + + from core.main.orchestration import ( + simple_graph_search_results_factory, + ) + + logger.info( + "Running deduplicate-document-entities without orchestration." + ) + simple_graph_search_results = simple_graph_search_results_factory( + self.services.graph + ) + await simple_graph_search_results["graph-deduplication"]( + workflow_input + ) + return { # type: ignore + "message": "Graph created successfully.", + "task_id": None, + } + + @self.router.get( + "/documents/{id}/entities", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Lists the entities from the document", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.extract( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + ], + }, + ) + @self.base_endpoint + async def get_entities( + id: UUID = Path( + ..., + description="The ID of the document to retrieve entities from.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + include_embeddings: Optional[bool] = Query( + False, + description="Whether to include vector embeddings in the response.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEntitiesResponse: + """Retrieves the entities that were extracted from a document. + These represent important semantic elements like people, places, + organizations, concepts, etc. + + Users can only access entities from documents they own or have + access to through collections. Entity embeddings are only included + if specifically requested. + + Results are returned in the order they were extracted from the + document. + """ + # if ( + # not auth_user.is_superuser + # and id not in auth_user.collection_ids + # ): + # raise R2RException( + # "The currently authenticated user does not have access to the specified collection.", + # 403, + # ) + + # First check if the document exists and user has access + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=( + None if auth_user.is_superuser else [auth_user.id] + ), + collection_ids=( + None + if auth_user.is_superuser + else auth_user.collection_ids + ), + document_ids=[id], + offset=0, + limit=1, + ) + ) + + if not documents_overview_response["results"]: + raise R2RException("Document not found.", 404) + + # Get all entities for this document from the document_entity table + ( + entities, + count, + ) = await self.providers.database.graphs_handler.entities.get( + parent_id=id, + store_type=StoreType.DOCUMENTS, + offset=offset, + limit=limit, + include_embeddings=include_embeddings or False, + ) + + return entities, {"total_entries": count} # type: ignore + + @self.router.post( + "/documents/{id}/entities/export", + summary="Export document entities to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.documents.export_entities( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.documents.exportEntities({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/documents/export_entities" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_entities( + background_tasks: BackgroundTasks, + id: UUID = Path( + ..., + description="The ID of the document to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_document_entities( + id=id, + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.get( + "/documents/{id}/relationships", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List document relationships", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.documents.list_relationships( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + offset=0, + limit=100 + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.documents.listRelationships({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + offset: 0, + limit: 100, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/documents/b4ac4dd6-5f27-596e-a55b-7cf242ca30aa/relationships" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_relationships( + id: UUID = Path( + ..., + description="The ID of the document to retrieve relationships for.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + entity_names: Optional[list[str]] = Query( + None, + description="Filter relationships by specific entity names.", + ), + relationship_types: Optional[list[str]] = Query( + None, + description="Filter relationships by specific relationship types.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRelationshipsResponse: + """Retrieves the relationships between entities that were extracted + from a document. These represent connections and interactions + between entities found in the text. + + Users can only access relationships from documents they own or have + access to through collections. Results can be filtered by entity + names and relationship types. + + Results are returned in the order they were extracted from the + document. + """ + # if ( + # not auth_user.is_superuser + # and id not in auth_user.collection_ids + # ): + # raise R2RException( + # "The currently authenticated user does not have access to the specified collection.", + # 403, + # ) + + # First check if the document exists and user has access + documents_overview_response = ( + await self.services.management.documents_overview( + user_ids=( + None if auth_user.is_superuser else [auth_user.id] + ), + collection_ids=( + None + if auth_user.is_superuser + else auth_user.collection_ids + ), + document_ids=[id], + offset=0, + limit=1, + ) + ) + + if not documents_overview_response["results"]: + raise R2RException("Document not found.", 404) + + # Get relationships for this document + ( + relationships, + count, + ) = await self.providers.database.graphs_handler.relationships.get( + parent_id=id, + store_type=StoreType.DOCUMENTS, + entity_names=entity_names, + relationship_types=relationship_types, + offset=offset, + limit=limit, + ) + + return relationships, {"total_entries": count} # type: ignore + + @self.router.post( + "/documents/{id}/relationships/export", + summary="Export document relationships to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.documents.export_entities( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.documents.exportEntities({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/documents/export_entities" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_relationships( + background_tasks: BackgroundTasks, + id: UUID = Path( + ..., + description="The ID of the document to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_document_relationships( + id=id, + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.post( + "/documents/search", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Search document summaries", + ) + @self.base_endpoint + async def search_documents( + query: str = Body( + ..., + description="The search query to perform.", + ), + search_mode: SearchMode = Body( + default=SearchMode.custom, + description=( + "Default value of `custom` allows full control over search settings.\n\n" + "Pre-configured search modes:\n" + "`basic`: A simple semantic-based search.\n" + "`advanced`: A more powerful hybrid search combining semantic and full-text.\n" + "`custom`: Full control via `search_settings`.\n\n" + "If `filters` or `limit` are provided alongside `basic` or `advanced`, " + "they will override the default settings for that mode." + ), + ), + search_settings: SearchSettings = Body( + default_factory=SearchSettings, + description="Settings for document search", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedDocumentSearchResponse: + """Perform a search query on the automatically generated document + summaries in the system. + + This endpoint allows for complex filtering of search results using PostgreSQL-based queries. + Filters can be applied to various fields such as document_id, and internal metadata values. + + + Allowed operators include `eq`, `neq`, `gt`, `gte`, `lt`, `lte`, `like`, `ilike`, `in`, and `nin`. + """ + effective_settings = self._prepare_search_settings( + auth_user, search_mode, search_settings + ) + + query_embedding = ( + await self.providers.embedding.async_get_embedding(query) + ) + results = await self.services.retrieval.search_documents( + query=query, + query_embedding=query_embedding, + settings=effective_settings, + ) + return results # type: ignore + + @staticmethod + async def _process_file(file): + import base64 + + content = await file.read() + + return { + "filename": file.filename, + "content": base64.b64encode(content).decode("utf-8"), + "content_type": file.content_type, + "content_length": len(content), + } diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/examples.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/examples.py new file mode 100644 index 00000000..ba588c3b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/examples.py @@ -0,0 +1,1065 @@ +import textwrap + +""" +This file contains updated OpenAPI examples for the RetrievalRouterV3 class. +These examples are designed to be included in the openapi_extra field for each route. +""" + +# Updated examples for search_app endpoint +search_app_examples = { + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient() + # if using auth, do client.login(...) + + # Basic search + response = client.retrieval.search( + query="What is DeepSeek R1?", + ) + + # Advanced mode with specific filters + response = client.retrieval.search( + query="What is DeepSeek R1?", + search_mode="advanced", + search_settings={ + "filters": {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}}, + "limit": 5 + } + ) + + # Using hybrid search + response = client.retrieval.search( + query="What was Uber's profit in 2020?", + search_settings={ + "use_hybrid_search": True, + "hybrid_settings": { + "full_text_weight": 1.0, + "semantic_weight": 5.0, + "full_text_limit": 200, + "rrf_k": 50 + }, + "filters": {"title": {"$in": ["DeepSeek_R1.pdf"]}}, + } + ) + + # Advanced filtering + results = client.retrieval.search( + query="What are the effects of climate change?", + search_settings={ + "filters": { + "$and":[ + {"document_type": {"$eq": "pdf"}}, + {"metadata.year": {"$gt": 2020}} + ] + }, + "limit": 10 + } + ) + + # Knowledge graph enhanced search + results = client.retrieval.search( + query="What was DeepSeek R1", + graph_search_settings={ + "use_graph_search": True, + "kg_search_type": "local" + } + ) + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // if using auth, do client.login(...) + + // Basic search + const response = await client.retrieval.search({ + query: "What is DeepSeek R1?", + }); + + // With specific filters + const filteredResponse = await client.retrieval.search({ + query: "What is DeepSeek R1?", + searchSettings: { + filters: {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}}, + limit: 5 + } + }); + + // Using hybrid search + const hybridResponse = await client.retrieval.search({ + query: "What was Uber's profit in 2020?", + searchSettings: { + indexMeasure: "l2_distance", + useHybridSearch: true, + hybridSettings: { + fullTextWeight: 1.0, + semanticWeight: 5.0, + fullTextLimit: 200, + }, + filters: {"title": {"$in": ["DeepSeek_R1.pdf"]}}, + } + }); + + // Advanced filtering + const advancedResults = await client.retrieval.search({ + query: "What are the effects of climate change?", + searchSettings: { + filters: { + $and: [ + {document_type: {$eq: "pdf"}}, + {"metadata.year": {$gt: 2020}} + ] + }, + limit: 10 + } + }); + + // Knowledge graph enhanced search + const kgResults = await client.retrieval.search({ + query: "who was aristotle?", + graphSearchSettings: { + useKgSearch: true, + kgSearchType: "local" + } + }); + """ + ), + }, + { + "lang": "Shell", + "source": textwrap.dedent( + """ + # Basic search + curl -X POST "https://api.sciphi.ai/v3/retrieval/search" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "What is DeepSeek R1?" + }' + + # With hybrid search and filters + curl -X POST "https://api.sciphi.ai/v3/retrieval/search" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "What was Uber'\''s profit in 2020?", + "search_settings": { + "use_hybrid_search": true, + "hybrid_settings": { + "full_text_weight": 1.0, + "semantic_weight": 5.0, + "full_text_limit": 200, + "rrf_k": 50 + }, + "filters": {"title": {"$in": ["DeepSeek_R1.pdf"]}}, + "limit": 10, + "chunk_settings": { + "index_measure": "l2_distance", + "probes": 25, + "ef_search": 100 + } + } + }' + + # Knowledge graph enhanced search + curl -X POST "https://api.sciphi.ai/v3/retrieval/search" \\ + -H "Content-Type: application/json" \\ + -d '{ + "query": "who was aristotle?", + "graph_search_settings": { + "use_graph_search": true, + "kg_search_type": "local" + } + }' \\ + -H "Authorization: Bearer YOUR_API_KEY" + """ + ), + }, + ] +} + +# Updated examples for rag_app endpoint +rag_app_examples = { + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + # Basic RAG request + response = client.retrieval.rag( + query="What is DeepSeek R1?", + ) + + # Advanced RAG with custom search settings + response = client.retrieval.rag( + query="What is DeepSeek R1?", + search_settings={ + "use_semantic_search": True, + "filters": {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}}, + "limit": 10, + }, + rag_generation_config={ + "stream": False, + "temperature": 0.7, + "max_tokens": 1500 + } + ) + + # Hybrid search in RAG + results = client.retrieval.rag( + "Who is Jon Snow?", + search_settings={"use_hybrid_search": True} + ) + + # Custom model selection + response = client.retrieval.rag( + "Who was Aristotle?", + rag_generation_config={"model":"anthropic/claude-3-haiku-20240307", "stream": True} + ) + for chunk in response: + print(chunk) + + # Streaming RAG + from r2r import ( + CitationEvent, + FinalAnswerEvent, + MessageEvent, + SearchResultsEvent, + R2RClient, + ) + + result_stream = client.retrieval.rag( + query="What is DeepSeek R1?", + search_settings={"limit": 25}, + rag_generation_config={"stream": True}, + ) + + # Process different event types + for event in result_stream: + if isinstance(event, SearchResultsEvent): + print("Search results:", event.data) + elif isinstance(event, MessageEvent): + print("Partial message:", event.data.delta) + elif isinstance(event, CitationEvent): + print("New citation detected:", event.data.id) + elif isinstance(event, FinalAnswerEvent): + print("Final answer:", event.data.generated_answer) + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // when using auth, do client.login(...) + + // Basic RAG request + const response = await client.retrieval.rag({ + query: "What is DeepSeek R1?", + }); + + // RAG with custom settings + const advancedResponse = await client.retrieval.rag({ + query: "What is DeepSeek R1?", + searchSettings: { + useSemanticSearch: true, + filters: {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}}, + limit: 10, + }, + ragGenerationConfig: { + stream: false, + temperature: 0.7, + maxTokens: 1500 + } + }); + + // Hybrid search in RAG + const hybridResults = await client.retrieval.rag({ + query: "Who is Jon Snow?", + searchSettings: { + useHybridSearch: true + }, + }); + + // Custom model + const customModelResponse = await client.retrieval.rag({ + query: "Who was Aristotle?", + ragGenerationConfig: { + model: 'anthropic/claude-3-haiku-20240307', + temperature: 0.7, + } + }); + + // Streaming RAG + const resultStream = await client.retrieval.rag({ + query: "What is DeepSeek R1?", + searchSettings: { limit: 25 }, + ragGenerationConfig: { stream: true }, + }); + + // Process streaming events + if (Symbol.asyncIterator in resultStream) { + for await (const event of resultStream) { + switch (event.event) { + case "search_results": + console.log("Search results:", event.data); + break; + case "message": + console.log("Partial message delta:", event.data.delta); + break; + case "citation": + console.log("New citation event:", event.data.id); + break; + case "final_answer": + console.log("Final answer:", event.data.generated_answer); + break; + default: + console.log("Unknown or unhandled event:", event); + } + } + } + """ + ), + }, + { + "lang": "Shell", + "source": textwrap.dedent( + """ + # Basic RAG request + curl -X POST "https://api.sciphi.ai/v3/retrieval/rag" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "What is DeepSeek R1?" + }' + + # RAG with custom settings + curl -X POST "https://api.sciphi.ai/v3/retrieval/rag" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "What is DeepSeek R1?", + "search_settings": { + "use_semantic_search": true, + "filters": {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}}, + "limit": 10 + }, + "rag_generation_config": { + "stream": false, + "temperature": 0.7, + "max_tokens": 1500 + } + }' + + # Hybrid search in RAG + curl -X POST "https://api.sciphi.ai/v3/retrieval/rag" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "Who is Jon Snow?", + "search_settings": { + "use_hybrid_search": true, + "filters": {}, + "limit": 10 + } + }' + + # Custom model + curl -X POST "https://api.sciphi.ai/v3/retrieval/rag" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "query": "Who is Jon Snow?", + "rag_generation_config": { + "model": "anthropic/claude-3-haiku-20240307", + "temperature": 0.7 + } + }' + """ + ), + }, + ] +} + +# Updated examples for agent_app endpoint +agent_app_examples = { + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ +from r2r import ( + R2RClient, + ThinkingEvent, + ToolCallEvent, + ToolResultEvent, + CitationEvent, + FinalAnswerEvent, + MessageEvent, +) + +client = R2RClient() +# when using auth, do client.login(...) + +# Basic synchronous request +response = client.retrieval.agent( + message={ + "role": "user", + "content": "Do a deep analysis of the philosophical implications of DeepSeek R1" + }, + rag_tools=["web_search", "web_scrape", "search_file_descriptions", "search_file_knowledge", "get_file_content"], +) + +# Advanced analysis with streaming and extended thinking +streaming_response = client.retrieval.agent( + message={ + "role": "user", + "content": "Do a deep analysis of the philosophical implications of DeepSeek R1" + }, + search_settings={"limit": 20}, + rag_tools=["web_search", "web_scrape", "search_file_descriptions", "search_file_knowledge", "get_file_content"], + rag_generation_config={ + "model": "anthropic/claude-3-7-sonnet-20250219", + "extended_thinking": True, + "thinking_budget": 4096, + "temperature": 1, + "top_p": None, + "max_tokens": 16000, + "stream": True + } +) + +# Process streaming events with emoji only on type change +current_event_type = None +for event in streaming_response: + # Check if the event type has changed + event_type = type(event) + if event_type != current_event_type: + current_event_type = event_type + print() # Add newline before new event type + + # Print emoji based on the new event type + if isinstance(event, ThinkingEvent): + print(f"\n🧠Thinking: ", end="", flush=True) + elif isinstance(event, ToolCallEvent): + print(f"\n🔧 Tool call: ", end="", flush=True) + elif isinstance(event, ToolResultEvent): + print(f"\n📊 Tool result: ", end="", flush=True) + elif isinstance(event, CitationEvent): + print(f"\n📑 Citation: ", end="", flush=True) + elif isinstance(event, MessageEvent): + print(f"\n💬 Message: ", end="", flush=True) + elif isinstance(event, FinalAnswerEvent): + print(f"\n✅ Final answer: ", end="", flush=True) + + # Print the content without the emoji + if isinstance(event, ThinkingEvent): + print(f"{event.data.delta.content[0].payload.value}", end="", flush=True) + elif isinstance(event, ToolCallEvent): + print(f"{event.data.name}({event.data.arguments})") + elif isinstance(event, ToolResultEvent): + print(f"{event.data.content[:60]}...") + elif isinstance(event, CitationEvent): + print(f"{event.data.id}") + elif isinstance(event, MessageEvent): + print(f"{event.data.delta.content[0].payload.value}", end="", flush=True) + elif isinstance(event, FinalAnswerEvent): + print(f"{event.data.generated_answer[:100]}...") + print(f" Citations: {len(event.data.citations)} sources referenced") + +# Conversation with multiple turns (synchronous) +conversation = client.conversations.create() + +# First message in conversation +results_1 = client.retrieval.agent( + query="What does DeepSeek R1 imply for the future of AI?", + rag_generation_config={ + "model": "anthropic/claude-3-7-sonnet-20250219", + "extended_thinking": True, + "thinking_budget": 4096, + "temperature": 1, + "top_p": None, + "max_tokens": 16000, + "stream": True + }, + conversation_id=conversation.results.id +) + +# Follow-up query in the same conversation +results_2 = client.retrieval.agent( + query="How does it compare to other reasoning models?", + rag_generation_config={ + "model": "anthropic/claude-3-7-sonnet-20250219", + "extended_thinking": True, + "thinking_budget": 4096, + "temperature": 1, + "top_p": None, + "max_tokens": 16000, + "stream": True + }, + conversation_id=conversation.results.id +) + +# Access the final results +print(f"First response: {results_1.generated_answer[:100]}...") +print(f"Follow-up response: {results_2.generated_answer[:100]}...") +""" + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // when using auth, do client.login(...) + + async function main() { + // Basic synchronous request + const ragResponse = await client.retrieval.agent({ + message: { + role: "user", + content: "Do a deep analysis of the philosophical implications of DeepSeek R1" + }, + ragTools: ["web_search", "web_scrape", "search_file_descriptions", "search_file_knowledge", "get_file_content"] + }); + + // Advanced analysis with streaming and extended thinking + const streamingResponse = await client.retrieval.agent({ + message: { + role: "user", + content: "Do a deep analysis of the philosophical implications of DeepSeek R1" + }, + searchSettings: {limit: 20}, + ragTools: ["web_search", "web_scrape", "search_file_descriptions", "search_file_knowledge", "get_file_content"], + ragGenerationConfig: { + model: "anthropic/claude-3-7-sonnet-20250219", + extendedThinking: true, + thinkingBudget: 4096, + temperature: 1, + maxTokens: 16000, + stream: true + } + }); + + // Process streaming events with emoji only on type change + if (Symbol.asyncIterator in streamingResponse) { + let currentEventType = null; + + for await (const event of streamingResponse) { + // Check if event type has changed + const eventType = event.event; + if (eventType !== currentEventType) { + currentEventType = eventType; + console.log(); // Add newline before new event type + + // Print emoji based on the new event type + switch(eventType) { + case "thinking": + process.stdout.write(`🧠Thinking: `); + break; + case "tool_call": + process.stdout.write(`🔧 Tool call: `); + break; + case "tool_result": + process.stdout.write(`📊 Tool result: `); + break; + case "citation": + process.stdout.write(`📑 Citation: `); + break; + case "message": + process.stdout.write(`💬 Message: `); + break; + case "final_answer": + process.stdout.write(`✅ Final answer: `); + break; + } + } + + // Print content based on event type + switch(eventType) { + case "thinking": + process.stdout.write(`${event.data.delta.content[0].payload.value}`); + break; + case "tool_call": + console.log(`${event.data.name}(${JSON.stringify(event.data.arguments)})`); + break; + case "tool_result": + console.log(`${event.data.content.substring(0, 60)}...`); + break; + case "citation": + console.log(`${event.data.id}`); + break; + case "message": + process.stdout.write(`${event.data.delta.content[0].payload.value}`); + break; + case "final_answer": + console.log(`${event.data.generated_answer.substring(0, 100)}...`); + console.log(` Citations: ${event.data.citations.length} sources referenced`); + break; + } + } + } + + // Conversation with multiple turns (synchronous) + const conversation = await client.conversations.create(); + + // First message in conversation + const results1 = await client.retrieval.agent({ + query: "What does DeepSeek R1 imply for the future of AI?", + ragGenerationConfig: { + model: "anthropic/claude-3-7-sonnet-20250219", + extendedThinking: true, + thinkingBudget: 4096, + temperature: 1, + maxTokens: 16000, + stream: true + }, + conversationId: conversation.results.id + }); + + // Follow-up query in the same conversation + const results2 = await client.retrieval.agent({ + query: "How does it compare to other reasoning models?", + ragGenerationConfig: { + model: "anthropic/claude-3-7-sonnet-20250219", + extendedThinking: true, + thinkingBudget: 4096, + temperature: 1, + maxTokens: 16000, + stream: true + }, + conversationId: conversation.results.id + }); + + // Log the results + console.log(`First response: ${results1.generated_answer.substring(0, 100)}...`); + console.log(`Follow-up response: ${results2.generated_answer.substring(0, 100)}...`); + } + + main(); + """ + ), + }, + { + "lang": "Shell", + "source": textwrap.dedent( + """ + # Basic request + curl -X POST "https://api.sciphi.ai/v3/retrieval/agent" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "message": { + "role": "user", + "content": "What were the key contributions of Aristotle to logic?" + }, + "search_settings": { + "use_semantic_search": true, + "filters": {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}} + }, + "rag_tools": ["search_file_knowledge", "content", "web_search"] + }' + + # Advanced analysis with extended thinking + curl -X POST "https://api.sciphi.ai/v3/retrieval/agent" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "message": { + "role": "user", + "content": "Do a deep analysis of the philosophical implications of DeepSeek R1" + }, + "search_settings": {"limit": 20}, + "research_tools": ["rag", "reasoning", "critique", "python_executor"], + "rag_generation_config": { + "model": "anthropic/claude-3-7-sonnet-20250219", + "extended_thinking": true, + "thinking_budget": 4096, + "temperature": 1, + "top_p": null, + "max_tokens": 16000, + "stream": true + } + }' + + # Conversation continuation + curl -X POST "https://api.sciphi.ai/v3/retrieval/agent" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "message": { + "role": "user", + "content": "How does it compare to other reasoning models?" + }, + "conversation_id": "YOUR_CONVERSATION_ID" + }' + """ + ), + }, + ] +} + +# Updated examples for completion endpoint +completion_examples = { + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.completion( + messages=[ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "What is the capital of France?"}, + {"role": "assistant", "content": "The capital of France is Paris."}, + {"role": "user", "content": "What about Italy?"} + ], + generation_config={ + "model": "openai/gpt-4o-mini", + "temperature": 0.7, + "max_tokens": 150, + "stream": False + } + ) + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // when using auth, do client.login(...) + + async function main() { + const response = await client.completion({ + messages: [ + { role: "system", content: "You are a helpful assistant." }, + { role: "user", content: "What is the capital of France?" }, + { role: "assistant", content: "The capital of France is Paris." }, + { role: "user", content: "What about Italy?" } + ], + generationConfig: { + model: "openai/gpt-4o-mini", + temperature: 0.7, + maxTokens: 150, + stream: false + } + }); + } + + main(); + """ + ), + }, + { + "lang": "Shell", + "source": textwrap.dedent( + """ + curl -X POST "https://api.sciphi.ai/v3/retrieval/completion" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "messages": [ + {"role": "system", "content": "You are a helpful assistant."}, + {"role": "user", "content": "What is the capital of France?"}, + {"role": "assistant", "content": "The capital of France is Paris."}, + {"role": "user", "content": "What about Italy?"} + ], + "generation_config": { + "model": "openai/gpt-4o-mini", + "temperature": 0.7, + "max_tokens": 150, + "stream": false + } + }' + """ + ), + }, + ] +} + +# Updated examples for embedding endpoint +embedding_examples = { + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.retrieval.embedding( + text="What is DeepSeek R1?", + ) + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // when using auth, do client.login(...) + + async function main() { + const response = await client.retrieval.embedding({ + text: "What is DeepSeek R1?", + }); + } + + main(); + """ + ), + }, + { + "lang": "Shell", + "source": textwrap.dedent( + """ + curl -X POST "https://api.sciphi.ai/v3/retrieval/embedding" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "text": "What is DeepSeek R1?", + }' + """ + ), + }, + ] +} + +# Updated rag_app docstring +rag_app_docstring = """ +Execute a RAG (Retrieval-Augmented Generation) query. + +This endpoint combines search results with language model generation to produce accurate, +contextually-relevant responses based on your document corpus. + +**Features:** +- Combines vector search, optional knowledge graph integration, and LLM generation +- Automatically cites sources with unique citation identifiers +- Supports both streaming and non-streaming responses +- Compatible with various LLM providers (OpenAI, Anthropic, etc.) +- Web search integration for up-to-date information + +**Search Configuration:** +All search parameters from the search endpoint apply here, including filters, hybrid search, and graph-enhanced search. + +**Generation Configuration:** +Fine-tune the language model's behavior with `rag_generation_config`: +```json +{ + "model": "openai/gpt-4o-mini", // Model to use + "temperature": 0.7, // Control randomness (0-1) + "max_tokens": 1500, // Maximum output length + "stream": true // Enable token streaming +} +``` + +**Model Support:** +- OpenAI models (default) +- Anthropic Claude models (requires ANTHROPIC_API_KEY) +- Local models via Ollama +- Any provider supported by LiteLLM + +**Streaming Responses:** +When `stream: true` is set, the endpoint returns Server-Sent Events with the following types: +- `search_results`: Initial search results from your documents +- `message`: Partial tokens as they're generated +- `citation`: Citation metadata when sources are referenced +- `final_answer`: Complete answer with structured citations + +**Example Response:** +```json +{ + "generated_answer": "DeepSeek-R1 is a model that demonstrates impressive performance...[1]", + "search_results": { ... }, + "citations": [ + { + "id": "cit.123456", + "object": "citation", + "payload": { ... } + } + ] +} +``` +""" + +# Updated agent_app docstring +agent_app_docstring = """ +Engage with an intelligent agent for information retrieval, analysis, and research. + +This endpoint offers two operating modes: +- **RAG mode**: Standard retrieval-augmented generation for answering questions based on knowledge base +- **Research mode**: Advanced capabilities for deep analysis, reasoning, and computation + +### RAG Mode (Default) + +The RAG mode provides fast, knowledge-based responses using: +- Semantic and hybrid search capabilities +- Document-level and chunk-level content retrieval +- Optional web search integration +- Source citation and evidence-based responses + +### Research Mode + +The Research mode builds on RAG capabilities and adds: +- A dedicated reasoning system for complex problem-solving +- Critique capabilities to identify potential biases or logical fallacies +- Python execution for computational analysis +- Multi-step reasoning for deeper exploration of topics + +### Available Tools + +**RAG Tools:** +- `search_file_knowledge`: Semantic/hybrid search on your ingested documents +- `search_file_descriptions`: Search over file-level metadata +- `content`: Fetch entire documents or chunk structures +- `web_search`: Query external search APIs for up-to-date information +- `web_scrape`: Scrape and extract content from specific web pages + +**Research Tools:** +- `rag`: Leverage the underlying RAG agent for information retrieval +- `reasoning`: Call a dedicated model for complex analytical thinking +- `critique`: Analyze conversation history to identify flaws and biases +- `python_executor`: Execute Python code for complex calculations and analysis + +### Streaming Output + +When streaming is enabled, the agent produces different event types: +- `thinking`: Shows the model's step-by-step reasoning (when extended_thinking=true) +- `tool_call`: Shows when the agent invokes a tool +- `tool_result`: Shows the result of a tool call +- `citation`: Indicates when a citation is added to the response +- `message`: Streams partial tokens of the response +- `final_answer`: Contains the complete generated answer and structured citations + +### Conversations + +Maintain context across multiple turns by including `conversation_id` in each request. +After your first call, store the returned `conversation_id` and include it in subsequent calls. +""" + +# Updated completion_docstring +completion_docstring = """ +Generate completions for a list of messages. + +This endpoint uses the language model to generate completions for the provided messages. +The generation process can be customized using the generation_config parameter. + +The messages list should contain alternating user and assistant messages, with an optional +system message at the start. Each message should have a 'role' and 'content'. + +**Generation Configuration:** +Fine-tune the language model's behavior with `generation_config`: +```json +{ + "model": "openai/gpt-4o-mini", // Model to use + "temperature": 0.7, // Control randomness (0-1) + "max_tokens": 1500, // Maximum output length + "stream": true // Enable token streaming +} +``` + +**Multiple LLM Support:** +- OpenAI models (default) +- Anthropic Claude models (requires ANTHROPIC_API_KEY) +- Local models via Ollama +- Any provider supported by LiteLLM +""" + +# Updated embedding_docstring +embedding_docstring = """ +Generate embeddings for the provided text using the specified model. + +This endpoint uses the language model to generate embeddings for the provided text. +The model parameter specifies the model to use for generating embeddings. + +Embeddings are numerical representations of text that capture semantic meaning, +allowing for similarity comparisons and other vector operations. + +**Uses:** +- Semantic search +- Document clustering +- Text similarity analysis +- Content recommendation +""" + +# # Example implementation to update the routers in the RetrievalRouterV3 class +# def update_retrieval_router(router_class): +# """ +# Update the RetrievalRouterV3 class with the improved docstrings and examples. + +# This function demonstrates how the updated examples and docstrings would be +# integrated into the actual router class. +# """ +# # Update search_app endpoint +# router_class.search_app.__doc__ = search_app_docstring +# router_class.search_app.openapi_extra = search_app_examples + +# # Update rag_app endpoint +# router_class.rag_app.__doc__ = rag_app_docstring +# router_class.rag_app.openapi_extra = rag_app_examples + +# # Update agent_app endpoint +# router_class.agent_app.__doc__ = agent_app_docstring +# router_class.agent_app.openapi_extra = agent_app_examples + +# # Update completion endpoint +# router_class.completion.__doc__ = completion_docstring +# router_class.completion.openapi_extra = completion_examples + +# # Update embedding endpoint +# router_class.embedding.__doc__ = embedding_docstring +# router_class.embedding.openapi_extra = embedding_examples + +# return router_class + +# Example showing how the updated router would be integrated +""" +from your_module import RetrievalRouterV3 + +# Apply the updated docstrings and examples +router = RetrievalRouterV3(providers, services, config) +router = update_retrieval_router(router) + +# Now the router has the improved docstrings and examples +""" + +EXAMPLES = { + "search": search_app_examples, + "rag": rag_app_examples, + "agent": agent_app_examples, + "completion": completion_examples, + "embedding": embedding_examples, +} diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/graph_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/graph_router.py new file mode 100644 index 00000000..244d76cf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/graph_router.py @@ -0,0 +1,2051 @@ +import logging +import textwrap +from typing import Optional, cast +from uuid import UUID + +from fastapi import Body, Depends, Path, Query +from fastapi.background import BackgroundTasks +from fastapi.responses import FileResponse + +from core.base import GraphConstructionStatus, R2RException, Workflow +from core.base.abstractions import DocumentResponse, StoreType +from core.base.api.models import ( + GenericBooleanResponse, + GenericMessageResponse, + WrappedBooleanResponse, + WrappedCommunitiesResponse, + WrappedCommunityResponse, + WrappedEntitiesResponse, + WrappedEntityResponse, + WrappedGenericMessageResponse, + WrappedGraphResponse, + WrappedGraphsResponse, + WrappedRelationshipResponse, + WrappedRelationshipsResponse, +) +from core.utils import ( + generate_default_user_collection_id, + update_settings_from_dict, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() + + +class GraphRouter(BaseRouterV3): + def __init__( + self, + providers: R2RProviders, + services: R2RServices, + config: R2RConfig, + ): + logging.info("Initializing GraphRouter") + super().__init__(providers, services, config) + self._register_workflows() + + def _register_workflows(self): + workflow_messages = {} + if self.providers.orchestration.config.provider == "hatchet": + workflow_messages["graph-extraction"] = ( + "Document extraction task queued successfully." + ) + workflow_messages["graph-clustering"] = ( + "Graph enrichment task queued successfully." + ) + workflow_messages["graph-deduplication"] = ( + "Entity deduplication task queued successfully." + ) + else: + workflow_messages["graph-extraction"] = ( + "Document entities and relationships extracted successfully." + ) + workflow_messages["graph-clustering"] = ( + "Graph communities created successfully." + ) + workflow_messages["graph-deduplication"] = ( + "Entity deduplication completed successfully." + ) + + self.providers.orchestration.register_workflows( + Workflow.GRAPH, + self.services.graph, + workflow_messages, + ) + + async def _get_collection_id( + self, collection_id: Optional[UUID], auth_user + ) -> UUID: + """Helper method to get collection ID, using default if none + provided.""" + if collection_id is None: + return generate_default_user_collection_id(auth_user.id) + return collection_id + + def _setup_routes(self): + @self.router.get( + "/graphs", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List graphs", + openapi_extra={ + "x-codeSamples": [ + { # TODO: Verify + "lang": "Python", + "source": textwrap.dedent( + """ + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.list() + """ + ), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent( + """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.list({}); + } + + main(); + """ + ), + }, + ] + }, + ) + @self.base_endpoint + async def list_graphs( + collection_ids: list[str] = Query( + [], + description="A list of graph IDs to retrieve. If not provided, all graphs will be returned.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGraphsResponse: + """Returns a paginated list of graphs the authenticated user has + access to. + + Results can be filtered by providing specific graph IDs. Regular + users will only see graphs they own or have access to. Superusers + can see all graphs. + + The graphs are returned in order of last modification, with most + recent first. + """ + requesting_user_id = ( + None if auth_user.is_superuser else [auth_user.id] + ) + + graph_uuids = [UUID(graph_id) for graph_id in collection_ids] + + list_graphs_response = await self.services.graph.list_graphs( + # user_ids=requesting_user_id, + graph_ids=graph_uuids, + offset=offset, + limit=limit, + ) + + return ( # type: ignore + list_graphs_response["results"], + {"total_entries": list_graphs_response["total_entries"]}, + ) + + @self.router.get( + "/graphs/{collection_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Retrieve graph details", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.get( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.retrieve({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/graphs/d09dedb1-b2ab-48a5-b950-6e1f464d83e7" \\ + -H "Authorization: Bearer YOUR_API_KEY" """), + }, + ] + }, + ) + @self.base_endpoint + async def get_graph( + collection_id: UUID = Path(...), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGraphResponse: + """Retrieves detailed information about a specific graph by ID.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the specified collection associated with the given graph.", + 403, + ) + + list_graphs_response = await self.services.graph.list_graphs( + # user_ids=None, + graph_ids=[collection_id], + offset=0, + limit=1, + ) + return list_graphs_response["results"][0] # type: ignore + + @self.router.post( + "/graphs/{collection_id}/communities/build", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def build_communities( + collection_id: UUID = Path( + ..., description="The unique identifier of the collection" + ), + graph_enrichment_settings: Optional[dict] = Body( + default=None, + description="Settings for the graph enrichment process.", + ), + run_with_orchestration: Optional[bool] = Body(True), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Creates communities in the graph by analyzing entity + relationships and similarities. + + Communities are created through the following process: + 1. Analyzes entity relationships and metadata to build a similarity graph + 2. Applies advanced community detection algorithms (e.g. Leiden) to identify densely connected groups + 3. Creates hierarchical community structure with multiple granularity levels + 4. Generates natural language summaries and statistical insights for each community + + The resulting communities can be used to: + - Understand high-level graph structure and organization + - Identify key entity groupings and their relationships + - Navigate and explore the graph at different levels of detail + - Generate insights about entity clusters and their characteristics + + The community detection process is configurable through settings like: + - Community detection algorithm parameters + - Summary generation prompt + """ + collections_overview_response = ( + await self.services.management.collections_overview( + user_ids=[auth_user.id], + collection_ids=[collection_id], + offset=0, + limit=1, + ) + )["results"] + if len(collections_overview_response) == 0: # type: ignore + raise R2RException("Collection not found.", 404) + + # Check user permissions for graph + if ( + not auth_user.is_superuser + and collections_overview_response[0].owner_id != auth_user.id # type: ignore + ): + raise R2RException( + "Only superusers can `build communities` for a graph they do not own.", + 403, + ) + + # If no collection ID is provided, use the default user collection + # id = generate_default_user_collection_id(auth_user.id) + + # Apply runtime settings overrides + server_graph_enrichment_settings = ( + self.providers.database.config.graph_enrichment_settings + ) + if graph_enrichment_settings: + server_graph_enrichment_settings = update_settings_from_dict( + server_graph_enrichment_settings, graph_enrichment_settings + ) + + workflow_input = { + "collection_id": str(collection_id), + "graph_enrichment_settings": server_graph_enrichment_settings.model_dump_json(), + "user": auth_user.json(), + } + + if run_with_orchestration: + try: + return await self.providers.orchestration.run_workflow( # type: ignore + "graph-clustering", {"request": workflow_input}, {} + ) + return GenericMessageResponse( + message="Graph communities created successfully." + ) # type: ignore + + except Exception as e: # TODO: Need to find specific error (gRPC most likely?) + logger.error( + f"Error running orchestrated community building: {e} \n\nAttempting to run without orchestration." + ) + from core.main.orchestration import ( + simple_graph_search_results_factory, + ) + + logger.info("Running build-communities without orchestration.") + simple_graph_search_results = simple_graph_search_results_factory( + self.services.graph + ) + await simple_graph_search_results["graph-clustering"]( + workflow_input + ) + return { # type: ignore + "message": "Graph communities created successfully.", + "task_id": None, + } + + @self.router.post( + "/graphs/{collection_id}/reset", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Reset a graph back to the initial state.", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.reset( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.reset({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/graphs/d09dedb1-b2ab-48a5-b950-6e1f464d83e7/reset" \\ + -H "Authorization: Bearer YOUR_API_KEY" """), + }, + ] + }, + ) + @self.base_endpoint + async def reset( + collection_id: UUID = Path(...), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Deletes a graph and all its associated data. + + This endpoint permanently removes the specified graph along with + all entities and relationships that belong to only this graph. The + original source entities and relationships extracted from + underlying documents are not deleted and are managed through the + document lifecycle. + """ + if not auth_user.is_superuser: + raise R2RException("Only superusers can reset a graph", 403) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + await self.services.graph.reset_graph(id=collection_id) + # await _pull(collection_id, auth_user) + return GenericBooleanResponse(success=True) # type: ignore + + # update graph + @self.router.post( + "/graphs/{collection_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Update graph", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.update( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + graph={ + "name": "New Name", + "description": "New Description" + } + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.update({ + collection_id: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + name: "New Name", + description: "New Description", + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_graph( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to update", + ), + name: Optional[str] = Body( + None, description="The name of the graph" + ), + description: Optional[str] = Body( + None, description="An optional description of the graph" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGraphResponse: + """Update an existing graphs's configuration. + + This endpoint allows updating the name and description of an + existing collection. The user must have appropriate permissions to + modify the collection. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can update graph details", 403 + ) + + if ( + not auth_user.is_superuser + and id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.update_graph( # type: ignore + collection_id, + name=name, + description=description, + ) + + @self.router.get( + "/graphs/{collection_id}/entities", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.list_entities(collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.listEntities({ + collection_id: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + }); + } + + main(); + """), + }, + ], + }, + ) + @self.base_endpoint + async def get_entities( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to list entities from.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEntitiesResponse: + """Lists all entities in the graph with pagination support.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + entities, count = await self.services.graph.get_entities( + parent_id=collection_id, + offset=offset, + limit=limit, + ) + + return entities, { # type: ignore + "total_entries": count, + } + + @self.router.post( + "/graphs/{collection_id}/entities/export", + summary="Export graph entities to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.graphs.export_entities( + collection_id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.graphs.exportEntities({ + collectionId: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/graphs/export_entities" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_entities( + background_tasks: BackgroundTasks, + collection_id: UUID = Path( + ..., + description="The ID of the collection to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_graph_entities( + id=collection_id, + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.post( + "/graphs/{collection_id}/entities", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def create_entity( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to add the entity to.", + ), + name: str = Body( + ..., description="The name of the entity to create." + ), + description: str = Body( + ..., description="The description of the entity to create." + ), + category: Optional[str] = Body( + None, description="The category of the entity to create." + ), + metadata: Optional[dict] = Body( + None, description="The metadata of the entity to create." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEntityResponse: + """Creates a new entity in the graph.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.create_entity( # type: ignore + name=name, + description=description, + parent_id=collection_id, + category=category, + metadata=metadata, + ) + + @self.router.post( + "/graphs/{collection_id}/relationships", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def create_relationship( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to add the relationship to.", + ), + subject: str = Body( + ..., description="The subject of the relationship to create." + ), + subject_id: UUID = Body( + ..., + description="The ID of the subject of the relationship to create.", + ), + predicate: str = Body( + ..., description="The predicate of the relationship to create." + ), + object: str = Body( + ..., description="The object of the relationship to create." + ), + object_id: UUID = Body( + ..., + description="The ID of the object of the relationship to create.", + ), + description: str = Body( + ..., + description="The description of the relationship to create.", + ), + weight: float = Body( + 1.0, description="The weight of the relationship to create." + ), + metadata: Optional[dict] = Body( + None, description="The metadata of the relationship to create." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRelationshipResponse: + """Creates a new relationship in the graph.""" + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can create relationships.", 403 + ) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + return await self.services.graph.create_relationship( # type: ignore + subject=subject, + subject_id=subject_id, + predicate=predicate, + object=object, + object_id=object_id, + description=description, + weight=weight, + metadata=metadata, + parent_id=collection_id, + ) + + @self.router.post( + "/graphs/{collection_id}/relationships/export", + summary="Export graph relationships to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.graphs.export_entities( + collection_id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.graphs.exportEntities({ + collectionId: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/graphs/export_relationships" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_relationships( + background_tasks: BackgroundTasks, + collection_id: UUID = Path( + ..., + description="The ID of the document to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_graph_relationships( + id=collection_id, + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.get( + "/graphs/{collection_id}/entities/{entity_id}", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.get_entity( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + entity_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.get_entity({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + entityId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_entity( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph containing the entity.", + ), + entity_id: UUID = Path( + ..., description="The ID of the entity to retrieve." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEntityResponse: + """Retrieves a specific entity by its ID.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + result = await self.providers.database.graphs_handler.entities.get( + parent_id=collection_id, + store_type=StoreType.GRAPHS, + offset=0, + limit=1, + entity_ids=[entity_id], + ) + if len(result) == 0 or len(result[0]) == 0: + raise R2RException("Entity not found", 404) + return result[0][0] + + @self.router.post( + "/graphs/{collection_id}/entities/{entity_id}", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def update_entity( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph containing the entity.", + ), + entity_id: UUID = Path( + ..., description="The ID of the entity to update." + ), + name: Optional[str] = Body( + ..., description="The updated name of the entity." + ), + description: Optional[str] = Body( + None, description="The updated description of the entity." + ), + category: Optional[str] = Body( + None, description="The updated category of the entity." + ), + metadata: Optional[dict] = Body( + None, description="The updated metadata of the entity." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEntityResponse: + """Updates an existing entity in the graph.""" + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can update graph entities.", 403 + ) + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.update_entity( # type: ignore + entity_id=entity_id, + name=name, + category=category, + description=description, + metadata=metadata, + ) + + @self.router.delete( + "/graphs/{collection_id}/entities/{entity_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Remove an entity", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.remove_entity( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + entity_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.removeEntity({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + entityId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_entity( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to remove the entity from.", + ), + entity_id: UUID = Path( + ..., + description="The ID of the entity to remove from the graph.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Removes an entity from the graph.""" + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can delete graph details.", 403 + ) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + await self.services.graph.delete_entity( + parent_id=collection_id, + entity_id=entity_id, + ) + + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.get( + "/graphs/{collection_id}/relationships", + dependencies=[Depends(self.rate_limit_dependency)], + description="Lists all relationships in the graph with pagination support.", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.list_relationships(collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.listRelationships({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + }); + } + + main(); + """), + }, + ], + }, + ) + @self.base_endpoint + async def get_relationships( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to list relationships from.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRelationshipsResponse: + """Lists all relationships in the graph with pagination support.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + relationships, count = await self.services.graph.get_relationships( + parent_id=collection_id, + offset=offset, + limit=limit, + ) + + return relationships, { # type: ignore + "total_entries": count, + } + + @self.router.get( + "/graphs/{collection_id}/relationships/{relationship_id}", + dependencies=[Depends(self.rate_limit_dependency)], + description="Retrieves a specific relationship by its ID.", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.get_relationship( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + relationship_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.getRelationship({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + relationshipId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ], + }, + ) + @self.base_endpoint + async def get_relationship( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph containing the relationship.", + ), + relationship_id: UUID = Path( + ..., description="The ID of the relationship to retrieve." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRelationshipResponse: + """Retrieves a specific relationship by its ID.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + results = ( + await self.providers.database.graphs_handler.relationships.get( + parent_id=collection_id, + store_type=StoreType.GRAPHS, + offset=0, + limit=1, + relationship_ids=[relationship_id], + ) + ) + if len(results) == 0 or len(results[0]) == 0: + raise R2RException("Relationship not found", 404) + return results[0][0] + + @self.router.post( + "/graphs/{collection_id}/relationships/{relationship_id}", + dependencies=[Depends(self.rate_limit_dependency)], + ) + @self.base_endpoint + async def update_relationship( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph containing the relationship.", + ), + relationship_id: UUID = Path( + ..., description="The ID of the relationship to update." + ), + subject: Optional[str] = Body( + ..., description="The updated subject of the relationship." + ), + subject_id: Optional[UUID] = Body( + ..., description="The updated subject ID of the relationship." + ), + predicate: Optional[str] = Body( + ..., description="The updated predicate of the relationship." + ), + object: Optional[str] = Body( + ..., description="The updated object of the relationship." + ), + object_id: Optional[UUID] = Body( + ..., description="The updated object ID of the relationship." + ), + description: Optional[str] = Body( + None, + description="The updated description of the relationship.", + ), + weight: Optional[float] = Body( + None, description="The updated weight of the relationship." + ), + metadata: Optional[dict] = Body( + None, description="The updated metadata of the relationship." + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRelationshipResponse: + """Updates an existing relationship in the graph.""" + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can update graph details", 403 + ) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.update_relationship( # type: ignore + relationship_id=relationship_id, + subject=subject, + subject_id=subject_id, + predicate=predicate, + object=object, + object_id=object_id, + description=description, + weight=weight, + metadata=metadata, + ) + + @self.router.delete( + "/graphs/{collection_id}/relationships/{relationship_id}", + dependencies=[Depends(self.rate_limit_dependency)], + description="Removes a relationship from the graph.", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.delete_relationship( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + relationship_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.deleteRelationship({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + relationshipId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ], + }, + ) + @self.base_endpoint + async def delete_relationship( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to remove the relationship from.", + ), + relationship_id: UUID = Path( + ..., + description="The ID of the relationship to remove from the graph.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Removes a relationship from the graph.""" + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can delete a relationship.", 403 + ) + + if ( + not auth_user.is_superuser + and collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + await self.services.graph.delete_relationship( + parent_id=collection_id, + relationship_id=relationship_id, + ) + + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/graphs/{collection_id}/communities", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Create a new community", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.create_community( + collection_id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + name="My Community", + summary="A summary of the community", + findings=["Finding 1", "Finding 2"], + rating=5, + rating_explanation="This is a rating explanation", + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.createCommunity({ + collectionId: "9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + name: "My Community", + summary: "A summary of the community", + findings: ["Finding 1", "Finding 2"], + rating: 5, + ratingExplanation: "This is a rating explanation", + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_community( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to create the community in.", + ), + name: str = Body(..., description="The name of the community"), + summary: str = Body(..., description="A summary of the community"), + findings: Optional[list[str]] = Body( + default=[], description="Findings about the community" + ), + rating: Optional[float] = Body( + default=5, ge=1, le=10, description="Rating between 1 and 10" + ), + rating_explanation: Optional[str] = Body( + default="", description="Explanation for the rating" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCommunityResponse: + """Creates a new community in the graph. + + While communities are typically built automatically via the /graphs/{id}/communities/build endpoint, + this endpoint allows you to manually create your own communities. + + This can be useful when you want to: + - Define custom groupings of entities based on domain knowledge + - Add communities that weren't detected by the automatic process + - Create hierarchical organization structures + - Tag groups of entities with specific metadata + + The created communities will be integrated with any existing automatically detected communities + in the graph's community structure. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only superusers can create a community.", 403 + ) + + if ( + not auth_user.is_superuser + and collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.create_community( # type: ignore + parent_id=collection_id, + name=name, + summary=summary, + findings=findings, + rating=rating, + rating_explanation=rating_explanation, + ) + + @self.router.get( + "/graphs/{collection_id}/communities", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List communities", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.list_communities(collection_id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.listCommunities({ + collectionId: "9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_communities( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to get communities for.", + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCommunitiesResponse: + """Lists all communities in the graph with pagination support.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + communities, count = await self.services.graph.get_communities( + parent_id=collection_id, + offset=offset, + limit=limit, + ) + + return communities, { # type: ignore + "total_entries": count, + } + + @self.router.get( + "/graphs/{collection_id}/communities/{community_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Retrieve a community", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.get_community(collection_id="9fbe403b-c11c-5aae-8ade-ef22980c3ad1") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.getCommunity({ + collectionId: "9fbe403b-c11c-5aae-8ade-ef22980c3ad1", + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_community( + collection_id: UUID = Path( + ..., + description="The ID of the collection to get communities for.", + ), + community_id: UUID = Path( + ..., + description="The ID of the community to get.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCommunityResponse: + """Retrieves a specific community by its ID.""" + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + results = ( + await self.providers.database.graphs_handler.communities.get( + parent_id=collection_id, + community_ids=[community_id], + store_type=StoreType.GRAPHS, + offset=0, + limit=1, + ) + ) + if len(results) == 0 or len(results[0]) == 0: + raise R2RException("Community not found", 404) + return results[0][0] + + @self.router.delete( + "/graphs/{collection_id}/communities/{community_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete a community", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.delete_community( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + community_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.graphs.deleteCommunity({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + communityId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_community( + collection_id: UUID = Path( + ..., + description="The collection ID corresponding to the graph to delete the community from.", + ), + community_id: UUID = Path( + ..., + description="The ID of the community to delete.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + if ( + not auth_user.is_superuser + and collection_id not in auth_user.graph_ids + ): + raise R2RException( + "Only superusers can delete communities", 403 + ) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + await self.services.graph.delete_community( + parent_id=collection_id, + community_id=community_id, + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/graphs/{collection_id}/communities/export", + summary="Export document communities to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.graphs.export_communities( + collection_id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + output_path="export.csv", + columns=["id", "title", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.graphs.exportCommunities({ + collectionId: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa", + outputPath: "export.csv", + columns: ["id", "title", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/graphs/export_communities" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "title", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_communities( + background_tasks: BackgroundTasks, + collection_id: UUID = Path( + ..., + description="The ID of the document to export entities from.", + ), + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export documents as a downloadable CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can export data.", + 403, + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_graph_communities( + id=collection_id, + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="documents_export.csv", + ) + + @self.router.post( + "/graphs/{collection_id}/communities/{community_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Update community", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.update_community( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + community_update={ + "metadata": { + "topic": "Technology", + "description": "Tech companies and products" + } + } + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + async function main() { + const response = await client.graphs.updateCommunity({ + collectionId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + communityId: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7", + communityUpdate: { + metadata: { + topic: "Technology", + description: "Tech companies and products" + } + } + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_community( + collection_id: UUID = Path(...), + community_id: UUID = Path(...), + name: Optional[str] = Body(None), + summary: Optional[str] = Body(None), + findings: Optional[list[str]] = Body(None), + rating: Optional[float] = Body(default=None, ge=1, le=10), + rating_explanation: Optional[str] = Body(None), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCommunityResponse: + """Updates an existing community in the graph.""" + if ( + not auth_user.is_superuser + and collection_id not in auth_user.graph_ids + ): + raise R2RException( + "Only superusers can update communities.", 403 + ) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + return await self.services.graph.update_community( # type: ignore + community_id=community_id, + name=name, + summary=summary, + findings=findings, + rating=rating, + rating_explanation=rating_explanation, + ) + + @self.router.post( + "/graphs/{collection_id}/pull", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Pull latest entities to the graph", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + response = client.graphs.pull( + collection_id="d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + async function main() { + const response = await client.graphs.pull({ + collection_id: "d09dedb1-b2ab-48a5-b950-6e1f464d83e7" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def pull( + collection_id: UUID = Path( + ..., description="The ID of the graph to initialize." + ), + force: Optional[bool] = Body( + False, + description="If true, forces a re-pull of all entities and relationships.", + ), + # document_ids: list[UUID] = Body( + # ..., description="List of document IDs to add to the graph." + # ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Adds documents to a graph by copying their entities and + relationships. + + This endpoint: + 1. Copies document entities to the graphs_entities table + 2. Copies document relationships to the graphs_relationships table + 3. Associates the documents with the graph + + When a document is added: + - Its entities and relationships are copied to graph-specific tables + - Existing entities/relationships are updated by merging their properties + - The document ID is recorded in the graph's document_ids array + + Documents added to a graph will contribute their knowledge to: + - Graph analysis and querying + - Community detection + - Knowledge graph enrichment + + The user must have access to both the graph and the documents being added. + """ + + collections_overview_response = ( + await self.services.management.collections_overview( + user_ids=[auth_user.id], + collection_ids=[collection_id], + offset=0, + limit=1, + ) + )["results"] + if len(collections_overview_response) == 0: # type: ignore + raise R2RException("Collection not found.", 404) + + # Check user permissions for graph + if ( + not auth_user.is_superuser + and collections_overview_response[0].owner_id != auth_user.id # type: ignore + ): + raise R2RException("Only superusers can `pull` a graph.", 403) + + if ( + # not auth_user.is_superuser + collection_id not in auth_user.collection_ids + ): + raise R2RException( + "The currently authenticated user does not have access to the collection associated with the given graph.", + 403, + ) + + list_graphs_response = await self.services.graph.list_graphs( + # user_ids=None, + graph_ids=[collection_id], + offset=0, + limit=1, + ) + if len(list_graphs_response["results"]) == 0: # type: ignore + raise R2RException("Graph not found", 404) + collection_id = list_graphs_response["results"][0].collection_id # type: ignore + documents: list[DocumentResponse] = [] + document_req = await self.providers.database.collections_handler.documents_in_collection( + collection_id, offset=0, limit=100 + ) + results = cast(list[DocumentResponse], document_req["results"]) + documents.extend(results) + + while len(results) == 100: + document_req = await self.providers.database.collections_handler.documents_in_collection( + collection_id, offset=len(documents), limit=100 + ) + results = cast(list[DocumentResponse], document_req["results"]) + documents.extend(results) + + success = False + + for document in documents: + entities = ( + await self.providers.database.graphs_handler.entities.get( + parent_id=document.id, + store_type=StoreType.DOCUMENTS, + offset=0, + limit=100, + ) + ) + has_document = ( + await self.providers.database.graphs_handler.has_document( + collection_id, document.id + ) + ) + if has_document: + logger.info( + f"Document {document.id} is already in graph {collection_id}, skipping." + ) + continue + if len(entities[0]) == 0: + if not force: + logger.warning( + f"Document {document.id} has no entities, extraction may not have been called, skipping." + ) + continue + else: + logger.warning( + f"Document {document.id} has no entities, but force=True, continuing." + ) + + success = ( + await self.providers.database.graphs_handler.add_documents( + id=collection_id, + document_ids=[document.id], + ) + ) + if not success: + logger.warning( + f"No documents were added to graph {collection_id}, marking as failed." + ) + + if success: + await self.providers.database.documents_handler.set_workflow_status( + id=collection_id, + status_type="graph_sync_status", + status=GraphConstructionStatus.SUCCESS, + ) + + return GenericBooleanResponse(success=success) # type: ignore diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/indices_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/indices_router.py new file mode 100644 index 00000000..29b75226 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/indices_router.py @@ -0,0 +1,576 @@ +import logging +import textwrap +from typing import Optional + +from fastapi import Body, Depends, Path, Query + +from core.base import IndexConfig, R2RException +from core.base.abstractions import VectorTableName +from core.base.api.models import ( + VectorIndexResponse, + VectorIndicesResponse, + WrappedGenericMessageResponse, + WrappedVectorIndexResponse, + WrappedVectorIndicesResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +logger = logging.getLogger() + + +class IndicesRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing IndicesRouter") + super().__init__(providers, services, config) + + def _setup_routes(self): + ## TODO - Allow developer to pass the index id with the request + @self.router.post( + "/indices", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Create Vector Index", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + # Create an HNSW index for efficient similarity search + result = client.indices.create( + config={ + "table_name": "chunks", # The table containing vector embeddings + "index_method": "hnsw", # Hierarchical Navigable Small World graph + "index_measure": "cosine_distance", # Similarity measure + "index_arguments": { + "m": 16, # Number of connections per layer + "ef_construction": 64,# Size of dynamic candidate list for construction + "ef": 40, # Size of dynamic candidate list for search + }, + "index_name": "my_document_embeddings_idx", + "index_column": "embedding", + "concurrently": True # Build index without blocking table writes + }, + run_with_orchestration=True # Run as orchestrated task for large indices + ) + + # Create an IVF-Flat index for balanced performance + result = client.indices.create( + config={ + "table_name": "chunks", + "index_method": "ivf_flat", # Inverted File with Flat storage + "index_measure": "l2_distance", + "index_arguments": { + "lists": 100, # Number of cluster centroids + "probe": 10, # Number of clusters to search + }, + "index_name": "my_ivf_embeddings_idx", + "index_column": "embedding", + "concurrently": True + } + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.indicies.create({ + config: { + tableName: "vectors", + indexMethod: "hnsw", + indexMeasure: "cosine_distance", + indexArguments: { + m: 16, + ef_construction: 64, + ef: 40 + }, + indexName: "my_document_embeddings_idx", + indexColumn: "embedding", + concurrently: true + }, + runWithOrchestration: true + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + # Create HNSW Index + curl -X POST "https://api.example.com/indices" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "config": { + "table_name": "vectors", + "index_method": "hnsw", + "index_measure": "cosine_distance", + "index_arguments": { + "m": 16, + "ef_construction": 64, + "ef": 40 + }, + "index_name": "my_document_embeddings_idx", + "index_column": "embedding", + "concurrently": true + }, + "run_with_orchestration": true + }' + + # Create IVF-Flat Index + curl -X POST "https://api.example.com/indices" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -d '{ + "config": { + "table_name": "vectors", + "index_method": "ivf_flat", + "index_measure": "l2_distance", + "index_arguments": { + "lists": 100, + "probe": 10 + }, + "index_name": "my_ivf_embeddings_idx", + "index_column": "embedding", + "concurrently": true + } + }' + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_index( + config: IndexConfig, + run_with_orchestration: Optional[bool] = Body( + True, + description="Whether to run index creation as an orchestrated task (recommended for large indices)", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Create a new vector similarity search index in over the target + table. Allowed tables include 'vectors', 'entity', + 'document_collections'. Vectors correspond to the chunks of text + that are indexed for similarity search, whereas entity and + document_collections are created during knowledge graph + construction. + + This endpoint creates a database index optimized for efficient similarity search over vector embeddings. + It supports two main indexing methods: + + 1. HNSW (Hierarchical Navigable Small World): + - Best for: High-dimensional vectors requiring fast approximate nearest neighbor search + - Pros: Very fast search, good recall, memory-resident for speed + - Cons: Slower index construction, more memory usage + - Key parameters: + * m: Number of connections per layer (higher = better recall but more memory) + * ef_construction: Build-time search width (higher = better recall but slower build) + * ef: Query-time search width (higher = better recall but slower search) + + 2. IVF-Flat (Inverted File with Flat Storage): + - Best for: Balance between build speed, search speed, and recall + - Pros: Faster index construction, less memory usage + - Cons: Slightly slower search than HNSW + - Key parameters: + * lists: Number of clusters (usually sqrt(n) where n is number of vectors) + * probe: Number of nearest clusters to search + + Supported similarity measures: + - cosine_distance: Best for comparing semantic similarity + - l2_distance: Best for comparing absolute distances + - ip_distance: Best for comparing raw dot products + + Notes: + - Index creation can be resource-intensive for large datasets + - Use run_with_orchestration=True for large indices to prevent timeouts + - The 'concurrently' option allows other operations while building + - Index names must be unique per table + """ + # TODO: Implement index creation logic + logger.info( + f"Creating vector index for {config.table_name} with method {config.index_method}, measure {config.index_measure}, concurrently {config.concurrently}" + ) + + result = await self.providers.orchestration.run_workflow( + "create-vector-index", + { + "request": { + "table_name": config.table_name, + "index_method": config.index_method, + "index_measure": config.index_measure, + "index_name": config.index_name, + "index_column": config.index_column, + "index_arguments": config.index_arguments, + "concurrently": config.concurrently, + }, + }, + options={ + "additional_metadata": {}, + }, + ) + + return result # type: ignore + + @self.router.get( + "/indices", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List Vector Indices", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + + # List all indices + indices = client.indices.list( + offset=0, + limit=10 + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.indicies.list({ + offset: 0, + limit: 10, + filters: { table_name: "vectors" } + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/indices?offset=0&limit=10" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" + + # With filters + curl -X GET "https://api.example.com/indices?offset=0&limit=10&filters={\"table_name\":\"vectors\"}" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_indices( + # filters: list[str] = Query([]), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedVectorIndicesResponse: + """List existing vector similarity search indices with pagination + support. + + Returns details about each index including: + - Name and table name + - Indexing method and parameters + - Size and row count + - Creation timestamp and last updated + - Performance statistics (if available) + + The response can be filtered using the filter_by parameter to narrow down results + based on table name, index method, or other attributes. + """ + # TODO: Implement index listing logic + indices_data = ( + await self.providers.database.chunks_handler.list_indices( + offset=offset, limit=limit + ) + ) + + formatted_indices = VectorIndicesResponse( + indices=[ + VectorIndexResponse(index=index_data) + for index_data in indices_data["indices"] + ] + ) + + return ( # type: ignore + formatted_indices, + {"total_entries": indices_data["total_entries"]}, + ) + + @self.router.get( + "/indices/{table_name}/{index_name}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Get Vector Index Details", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + + # Get detailed information about a specific index + index = client.indices.retrieve("index_1") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.indicies.retrieve({ + indexName: "index_1", + tableName: "vectors" + }); + + console.log(response); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/indices/vectors/index_1" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_index( + table_name: VectorTableName = Path( + ..., + description="The table of vector embeddings to delete (e.g. `vectors`, `entity`, `document_collections`)", + ), + index_name: str = Path( + ..., description="The name of the index to delete" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedVectorIndexResponse: + """Get detailed information about a specific vector index. + + Returns comprehensive information about the index including: + - Configuration details (method, measure, parameters) + - Current size and row count + - Build progress (if still under construction) + - Performance statistics: + * Average query time + * Memory usage + * Cache hit rates + * Recent query patterns + - Maintenance information: + * Last vacuum + * Fragmentation level + * Recommended optimizations + """ + # TODO: Implement get index logic + indices = ( + await self.providers.database.chunks_handler.list_indices( + filters={ + "index_name": index_name, + "table_name": table_name, + }, + limit=1, + offset=0, + ) + ) + if len(indices["indices"]) != 1: + raise R2RException( + f"Index '{index_name}' not found", status_code=404 + ) + return {"index": indices["indices"][0]} # type: ignore + + # TODO - Implement update index + # @self.router.post( + # "/indices/{name}", + # summary="Update Vector Index", + # openapi_extra={ + # "x-codeSamples": [ + # { + # "lang": "Python", + # "source": """ + # from r2r import R2RClient + + # client = R2RClient() + + # # Update HNSW index parameters + # result = client.indices.update( + # "550e8400-e29b-41d4-a716-446655440000", + # config={ + # "index_arguments": { + # "ef": 80, # Increase search quality + # "m": 24 # Increase connections per layer + # }, + # "concurrently": True + # }, + # run_with_orchestration=True + # )""", + # }, + # { + # "lang": "Shell", + # "source": """ + # curl -X PUT "https://api.example.com/indices/550e8400-e29b-41d4-a716-446655440000" \\ + # -H "Content-Type: application/json" \\ + # -H "Authorization: Bearer YOUR_API_KEY" \\ + # -d '{ + # "config": { + # "index_arguments": { + # "ef": 80, + # "m": 24 + # }, + # "concurrently": true + # }, + # "run_with_orchestration": true + # }'""", + # }, + # ] + # }, + # ) + # @self.base_endpoint + # async def update_index( + # id: UUID = Path(...), + # config: IndexConfig = Body(...), + # run_with_orchestration: Optional[bool] = Body(True), + # auth_user=Depends(self.providers.auth.auth_wrapper()), + # ): # -> WrappedUpdateIndexResponse: + # """ + # Update an existing index's configuration. + # """ + # # TODO: Implement index update logic + # pass + + @self.router.delete( + "/indices/{table_name}/{index_name}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete Vector Index", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + + # Delete an index with orchestration for cleanup + result = client.indices.delete( + index_name="index_1", + table_name="vectors", + run_with_orchestration=True + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.indicies.delete({ + indexName: "index_1" + tableName: "vectors" + }); + + console.log(response); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/indices/index_1" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_index( + table_name: VectorTableName = Path( + default=..., + description="The table of vector embeddings to delete (e.g. `vectors`, `entity`, `document_collections`)", + ), + index_name: str = Path( + ..., description="The name of the index to delete" + ), + # concurrently: bool = Body( + # default=True, + # description="Whether to delete the index concurrently (recommended for large indices)", + # ), + # run_with_orchestration: Optional[bool] = Body(True), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Delete an existing vector similarity search index. + + This endpoint removes the specified index from the database. Important considerations: + + - Deletion is permanent and cannot be undone + - Underlying vector data remains intact + - Queries will fall back to sequential scan + - Running queries during deletion may be slower + - Use run_with_orchestration=True for large indices to prevent timeouts + - Consider index dependencies before deletion + + The operation returns immediately but cleanup may continue in background. + """ + logger.info( + f"Deleting vector index {index_name} from table {table_name}" + ) + + return await self.providers.orchestration.run_workflow( # type: ignore + "delete-vector-index", + { + "request": { + "index_name": index_name, + "table_name": table_name, + "concurrently": True, + }, + }, + options={ + "additional_metadata": {}, + }, + ) diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/prompts_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/prompts_router.py new file mode 100644 index 00000000..55512143 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/prompts_router.py @@ -0,0 +1,387 @@ +import logging +import textwrap +from typing import Optional + +from fastapi import Body, Depends, Path, Query + +from core.base import R2RException +from core.base.api.models import ( + GenericBooleanResponse, + GenericMessageResponse, + WrappedBooleanResponse, + WrappedGenericMessageResponse, + WrappedPromptResponse, + WrappedPromptsResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + + +class PromptsRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing PromptsRouter") + super().__init__(providers, services, config) + + def _setup_routes(self): + @self.router.post( + "/prompts", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Create a new prompt", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.prompts.create( + name="greeting_prompt", + template="Hello, {name}!", + input_types={"name": "string"} + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.prompts.create({ + name: "greeting_prompt", + template: "Hello, {name}!", + inputTypes: { name: "string" }, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/prompts" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{"name": "greeting_prompt", "template": "Hello, {name}!", "input_types": {"name": "string"}}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_prompt( + name: str = Body(..., description="The name of the prompt"), + template: str = Body( + ..., description="The template string for the prompt" + ), + input_types: dict[str, str] = Body( + default={}, + description="A dictionary mapping input names to their types", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Create a new prompt with the given configuration. + + This endpoint allows superusers to create a new prompt with a + specified name, template, and input types. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can create prompts.", + 403, + ) + result = await self.services.management.add_prompt( + name, template, input_types + ) + return GenericMessageResponse(message=result) # type: ignore + + @self.router.get( + "/prompts", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List all prompts", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.prompts.list() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.prompts.list(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/v3/prompts" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_prompts( + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedPromptsResponse: + """List all available prompts. + + This endpoint retrieves a list of all prompts in the system. Only + superusers can access this endpoint. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can list prompts.", + 403, + ) + get_prompts_response = ( + await self.services.management.get_all_prompts() + ) + + return ( # type: ignore + get_prompts_response["results"], + { + "total_entries": get_prompts_response["total_entries"], + }, + ) + + @self.router.post( + "/prompts/{name}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Get a specific prompt", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.prompts.get( + "greeting_prompt", + inputs={"name": "John"}, + prompt_override="Hi, {name}!" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.prompts.retrieve({ + name: "greeting_prompt", + inputs: { name: "John" }, + promptOverride: "Hi, {name}!", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/prompts/greeting_prompt?inputs=%7B%22name%22%3A%22John%22%7D&prompt_override=Hi%2C%20%7Bname%7D!" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_prompt( + name: str = Path(..., description="Prompt name"), + inputs: Optional[dict[str, str]] = Body( + None, description="Prompt inputs" + ), + prompt_override: Optional[str] = Query( + None, description="Prompt override" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedPromptResponse: + """Get a specific prompt by name, optionally with inputs and + override. + + This endpoint retrieves a specific prompt and allows for optional + inputs and template override. Only superusers can access this + endpoint. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can retrieve prompts.", + 403, + ) + result = await self.services.management.get_prompt( + name, inputs, prompt_override + ) + return result # type: ignore + + @self.router.put( + "/prompts/{name}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Update an existing prompt", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.prompts.update( + "greeting_prompt", + template="Greetings, {name}!", + input_types={"name": "string", "age": "integer"} + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.prompts.update({ + name: "greeting_prompt", + template: "Greetings, {name}!", + inputTypes: { name: "string", age: "integer" }, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X PUT "https://api.example.com/v3/prompts/greeting_prompt" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{"template": "Greetings, {name}!", "input_types": {"name": "string", "age": "integer"}}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def update_prompt( + name: str = Path(..., description="Prompt name"), + template: Optional[str] = Body( + None, description="Updated prompt template" + ), + input_types: dict[str, str] = Body( + default={}, + description="A dictionary mapping input names to their types", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Update an existing prompt's template and/or input types. + + This endpoint allows superusers to update the template and input + types of an existing prompt. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can update prompts.", + 403, + ) + result = await self.services.management.update_prompt( + name, template, input_types + ) + return GenericMessageResponse(message=result) # type: ignore + + @self.router.delete( + "/prompts/{name}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete a prompt", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.prompts.delete("greeting_prompt") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.prompts.delete({ + name: "greeting_prompt", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/v3/prompts/greeting_prompt" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_prompt( + name: str = Path(..., description="Prompt name"), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete a prompt by name. + + This endpoint allows superusers to delete an existing prompt. + """ + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can delete prompts.", + 403, + ) + await self.services.management.delete_prompt(name) + return GenericBooleanResponse(success=True) # type: ignore diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/retrieval_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/retrieval_router.py new file mode 100644 index 00000000..28749319 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/retrieval_router.py @@ -0,0 +1,639 @@ +import logging +from typing import Any, Literal, Optional +from uuid import UUID + +from fastapi import Body, Depends +from fastapi.responses import StreamingResponse + +from core.base import ( + GenerationConfig, + Message, + R2RException, + SearchMode, + SearchSettings, + select_search_filters, +) +from core.base.api.models import ( + WrappedAgentResponse, + WrappedCompletionResponse, + WrappedEmbeddingResponse, + WrappedLLMChatCompletion, + WrappedRAGResponse, + WrappedSearchResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 +from .examples import EXAMPLES + +logger = logging.getLogger(__name__) + + +def merge_search_settings( + base: SearchSettings, overrides: SearchSettings +) -> SearchSettings: + # Convert both to dict + base_dict = base.model_dump() + overrides_dict = overrides.model_dump(exclude_unset=True) + + # Update base_dict with values from overrides_dict + # This ensures that any field set in overrides takes precedence + for k, v in overrides_dict.items(): + base_dict[k] = v + + # Construct a new SearchSettings from the merged dict + return SearchSettings(**base_dict) + + +class RetrievalRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing RetrievalRouter") + super().__init__(providers, services, config) + + def _register_workflows(self): + pass + + def _prepare_search_settings( + self, + auth_user: Any, + search_mode: SearchMode, + search_settings: Optional[SearchSettings], + ) -> SearchSettings: + """Prepare the effective search settings based on the provided + search_mode, optional user-overrides in search_settings, and applied + filters.""" + if search_mode != SearchMode.custom: + # Start from mode defaults + effective_settings = SearchSettings.get_default(search_mode.value) + if search_settings: + # Merge user-provided overrides + effective_settings = merge_search_settings( + effective_settings, search_settings + ) + else: + # Custom mode: use provided settings or defaults + effective_settings = search_settings or SearchSettings() + + # Apply user-specific filters + effective_settings.filters = select_search_filters( + auth_user, effective_settings + ) + return effective_settings + + def _setup_routes(self): + @self.router.post( + "/retrieval/search", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Search R2R", + openapi_extra=EXAMPLES["search"], + ) + @self.base_endpoint + async def search_app( + query: str = Body( + ..., + description="Search query to find relevant documents", + ), + search_mode: SearchMode = Body( + default=SearchMode.custom, + description=( + "Default value of `custom` allows full control over search settings.\n\n" + "Pre-configured search modes:\n" + "`basic`: A simple semantic-based search.\n" + "`advanced`: A more powerful hybrid search combining semantic and full-text.\n" + "`custom`: Full control via `search_settings`.\n\n" + "If `filters` or `limit` are provided alongside `basic` or `advanced`, " + "they will override the default settings for that mode." + ), + ), + search_settings: Optional[SearchSettings] = Body( + None, + description=( + "The search configuration object. If `search_mode` is `custom`, " + "these settings are used as-is. For `basic` or `advanced`, these settings will override the default mode configuration.\n\n" + "Common overrides include `filters` to narrow results and `limit` to control how many results are returned." + ), + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedSearchResponse: + """Perform a search query against vector and/or graph-based + databases. + + **Search Modes:** + - `basic`: Defaults to semantic search. Simple and easy to use. + - `advanced`: Combines semantic search with full-text search for more comprehensive results. + - `custom`: Complete control over how search is performed. Provide a full `SearchSettings` object. + + **Filters:** + Apply filters directly inside `search_settings.filters`. For example: + ```json + { + "filters": {"document_id": {"$eq": "e43864f5-a36f-548e-aacd-6f8d48b30c7f"}} + } + ``` + Supported operators: `$eq`, `$neq`, `$gt`, `$gte`, `$lt`, `$lte`, `$like`, `$ilike`, `$in`, `$nin`. + + **Hybrid Search:** + Enable hybrid search by setting `use_hybrid_search: true` in search_settings. This combines semantic search with + keyword-based search for improved results. Configure with `hybrid_settings`: + ```json + { + "use_hybrid_search": true, + "hybrid_settings": { + "full_text_weight": 1.0, + "semantic_weight": 5.0, + "full_text_limit": 200, + "rrf_k": 50 + } + } + ``` + + **Graph-Enhanced Search:** + Knowledge graph integration is enabled by default. Control with `graph_search_settings`: + ```json + { + "graph_search_settings": { + "use_graph_search": true, + "kg_search_type": "local" + } + } + ``` + + **Advanced Filtering:** + Use complex filters to narrow down results by metadata fields or document properties: + ```json + { + "filters": { + "$and":[ + {"document_type": {"$eq": "pdf"}}, + {"metadata.year": {"$gt": 2020}} + ] + } + } + ``` + + **Results:** + The response includes vector search results and optional graph search results. + Each result contains the matched text, document ID, and relevance score. + + """ + if query == "": + raise R2RException("Query cannot be empty", 400) + effective_settings = self._prepare_search_settings( + auth_user, search_mode, search_settings + ) + results = await self.services.retrieval.search( + query=query, + search_settings=effective_settings, + ) + return results # type: ignore + + @self.router.post( + "/retrieval/rag", + dependencies=[Depends(self.rate_limit_dependency)], + summary="RAG Query", + response_model=None, + openapi_extra=EXAMPLES["rag"], + ) + @self.base_endpoint + async def rag_app( + query: str = Body(...), + search_mode: SearchMode = Body( + default=SearchMode.custom, + description=( + "Default value of `custom` allows full control over search settings.\n\n" + "Pre-configured search modes:\n" + "`basic`: A simple semantic-based search.\n" + "`advanced`: A more powerful hybrid search combining semantic and full-text.\n" + "`custom`: Full control via `search_settings`.\n\n" + "If `filters` or `limit` are provided alongside `basic` or `advanced`, " + "they will override the default settings for that mode." + ), + ), + search_settings: Optional[SearchSettings] = Body( + None, + description=( + "The search configuration object. If `search_mode` is `custom`, " + "these settings are used as-is. For `basic` or `advanced`, these settings will override the default mode configuration.\n\n" + "Common overrides include `filters` to narrow results and `limit` to control how many results are returned." + ), + ), + rag_generation_config: GenerationConfig = Body( + default_factory=GenerationConfig, + description="Configuration for RAG generation", + ), + task_prompt: Optional[str] = Body( + default=None, + description="Optional custom prompt to override default", + ), + include_title_if_available: bool = Body( + default=False, + description="Include document titles in responses when available", + ), + include_web_search: bool = Body( + default=False, + description="Include web search results provided to the LLM.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedRAGResponse: + """Execute a RAG (Retrieval-Augmented Generation) query. + + This endpoint combines search results with language model generation to produce accurate, + contextually-relevant responses based on your document corpus. + + **Features:** + - Combines vector search, optional knowledge graph integration, and LLM generation + - Automatically cites sources with unique citation identifiers + - Supports both streaming and non-streaming responses + - Compatible with various LLM providers (OpenAI, Anthropic, etc.) + - Web search integration for up-to-date information + + **Search Configuration:** + All search parameters from the search endpoint apply here, including filters, hybrid search, and graph-enhanced search. + + **Generation Configuration:** + Fine-tune the language model's behavior with `rag_generation_config`: + ```json + { + "model": "openai/gpt-4o-mini", // Model to use + "temperature": 0.7, // Control randomness (0-1) + "max_tokens": 1500, // Maximum output length + "stream": true // Enable token streaming + } + ``` + + **Model Support:** + - OpenAI models (default) + - Anthropic Claude models (requires ANTHROPIC_API_KEY) + - Local models via Ollama + - Any provider supported by LiteLLM + + **Streaming Responses:** + When `stream: true` is set, the endpoint returns Server-Sent Events with the following types: + - `search_results`: Initial search results from your documents + - `message`: Partial tokens as they're generated + - `citation`: Citation metadata when sources are referenced + - `final_answer`: Complete answer with structured citations + + **Example Response:** + ```json + { + "generated_answer": "DeepSeek-R1 is a model that demonstrates impressive performance...[1]", + "search_results": { ... }, + "citations": [ + { + "id": "cit.123456", + "object": "citation", + "payload": { ... } + } + ] + } + ``` + """ + + if "model" not in rag_generation_config.__fields_set__: + rag_generation_config.model = self.config.app.quality_llm + + effective_settings = self._prepare_search_settings( + auth_user, search_mode, search_settings + ) + + response = await self.services.retrieval.rag( + query=query, + search_settings=effective_settings, + rag_generation_config=rag_generation_config, + task_prompt=task_prompt, + include_title_if_available=include_title_if_available, + include_web_search=include_web_search, + ) + + if rag_generation_config.stream: + # ========== Streaming path ========== + async def stream_generator(): + try: + async for chunk in response: + if len(chunk) > 1024: + for i in range(0, len(chunk), 1024): + yield chunk[i : i + 1024] + else: + yield chunk + except GeneratorExit: + # Clean up if needed, then return + return + + return StreamingResponse( + stream_generator(), media_type="text/event-stream" + ) # type: ignore + else: + # ========== Non-streaming path ========== + return response + + @self.router.post( + "/retrieval/agent", + dependencies=[Depends(self.rate_limit_dependency)], + summary="RAG-powered Conversational Agent", + openapi_extra=EXAMPLES["agent"], + ) + @self.base_endpoint + async def agent_app( + message: Optional[Message] = Body( + None, + description="Current message to process", + ), + messages: Optional[list[Message]] = Body( + None, + deprecated=True, + description="List of messages (deprecated, use message instead)", + ), + search_mode: SearchMode = Body( + default=SearchMode.custom, + description="Pre-configured search modes: basic, advanced, or custom.", + ), + search_settings: Optional[SearchSettings] = Body( + None, + description="The search configuration object for retrieving context.", + ), + # Generation configurations + rag_generation_config: GenerationConfig = Body( + default_factory=GenerationConfig, + description="Configuration for RAG generation in 'rag' mode", + ), + research_generation_config: Optional[GenerationConfig] = Body( + None, + description="Configuration for generation in 'research' mode. If not provided but mode='research', rag_generation_config will be used with appropriate model overrides.", + ), + # Tool configurations + rag_tools: Optional[ + list[ + Literal[ + "web_search", + "web_scrape", + "search_file_descriptions", + "search_file_knowledge", + "get_file_content", + ] + ] + ] = Body( + None, + description="List of tools to enable for RAG mode. Available tools: search_file_knowledge, get_file_content, web_search, web_scrape, search_file_descriptions", + ), + research_tools: Optional[ + list[ + Literal["rag", "reasoning", "critique", "python_executor"] + ] + ] = Body( + None, + description="List of tools to enable for Research mode. Available tools: rag, reasoning, critique, python_executor", + ), + # Backward compatibility + tools: Optional[list[str]] = Body( + None, + deprecated=True, + description="List of tools to execute (deprecated, use rag_tools or research_tools instead)", + ), + # Other parameters + task_prompt: Optional[str] = Body( + default=None, + description="Optional custom prompt to override default", + ), + # Backward compatibility + task_prompt_override: Optional[str] = Body( + default=None, + deprecated=True, + description="Optional custom prompt to override default", + ), + include_title_if_available: bool = Body( + default=True, + description="Pass document titles from search results into the LLM context window.", + ), + conversation_id: Optional[UUID] = Body( + default=None, + description="ID of the conversation", + ), + max_tool_context_length: Optional[int] = Body( + default=32_768, + description="Maximum length of returned tool context", + ), + use_system_context: Optional[bool] = Body( + default=True, + description="Use extended prompt for generation", + ), + mode: Optional[Literal["rag", "research"]] = Body( + default="rag", + description="Mode to use for generation: 'rag' for standard retrieval or 'research' for deep analysis with reasoning capabilities", + ), + needs_initial_conversation_name: Optional[bool] = Body( + default=None, + description="If true, the system will automatically assign a conversation name if not already specified previously.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedAgentResponse: + """ + Engage with an intelligent agent for information retrieval, analysis, and research. + + This endpoint offers two operating modes: + - **RAG mode**: Standard retrieval-augmented generation for answering questions based on knowledge base + - **Research mode**: Advanced capabilities for deep analysis, reasoning, and computation + + ### RAG Mode (Default) + + The RAG mode provides fast, knowledge-based responses using: + - Semantic and hybrid search capabilities + - Document-level and chunk-level content retrieval + - Optional web search integration + - Source citation and evidence-based responses + + ### Research Mode + + The Research mode builds on RAG capabilities and adds: + - A dedicated reasoning system for complex problem-solving + - Critique capabilities to identify potential biases or logical fallacies + - Python execution for computational analysis + - Multi-step reasoning for deeper exploration of topics + + ### Available Tools + + **RAG Tools:** + - `search_file_knowledge`: Semantic/hybrid search on your ingested documents + - `search_file_descriptions`: Search over file-level metadata + - `content`: Fetch entire documents or chunk structures + - `web_search`: Query external search APIs for up-to-date information + - `web_scrape`: Scrape and extract content from specific web pages + + **Research Tools:** + - `rag`: Leverage the underlying RAG agent for information retrieval + - `reasoning`: Call a dedicated model for complex analytical thinking + - `critique`: Analyze conversation history to identify flaws and biases + - `python_executor`: Execute Python code for complex calculations and analysis + + ### Streaming Output + + When streaming is enabled, the agent produces different event types: + - `thinking`: Shows the model's step-by-step reasoning (when extended_thinking=true) + - `tool_call`: Shows when the agent invokes a tool + - `tool_result`: Shows the result of a tool call + - `citation`: Indicates when a citation is added to the response + - `message`: Streams partial tokens of the response + - `final_answer`: Contains the complete generated answer and structured citations + + ### Conversations + + Maintain context across multiple turns by including `conversation_id` in each request. + After your first call, store the returned `conversation_id` and include it in subsequent calls. + If no conversation name has already been set for the conversation, the system will automatically assign one. + + """ + # Handle backward compatibility for task_prompt + task_prompt = task_prompt or task_prompt_override + # Handle model selection based on mode + if "model" not in rag_generation_config.__fields_set__: + if mode == "rag": + rag_generation_config.model = self.config.app.quality_llm + elif mode == "research": + rag_generation_config.model = self.config.app.planning_llm + + # Prepare search settings + effective_settings = self._prepare_search_settings( + auth_user, search_mode, search_settings + ) + + # Handle tool configuration and backward compatibility + if tools: # Handle deprecated tools parameter + logger.warning( + "The 'tools' parameter is deprecated. Use 'rag_tools' or 'research_tools' based on mode." + ) + rag_tools = tools # type: ignore + + # Determine effective generation config + effective_generation_config = rag_generation_config + if mode == "research" and research_generation_config: + effective_generation_config = research_generation_config + + try: + response = await self.services.retrieval.agent( + message=message, + messages=messages, + search_settings=effective_settings, + rag_generation_config=rag_generation_config, + research_generation_config=research_generation_config, + task_prompt=task_prompt, + include_title_if_available=include_title_if_available, + max_tool_context_length=max_tool_context_length or 32_768, + conversation_id=( + str(conversation_id) if conversation_id else None # type: ignore + ), + use_system_context=use_system_context + if use_system_context is not None + else True, + rag_tools=rag_tools, # type: ignore + research_tools=research_tools, # type: ignore + mode=mode, + needs_initial_conversation_name=needs_initial_conversation_name, + ) + + if effective_generation_config.stream: + + async def stream_generator(): + try: + async for chunk in response: + if len(chunk) > 1024: + for i in range(0, len(chunk), 1024): + yield chunk[i : i + 1024] + else: + yield chunk + except GeneratorExit: + # Clean up if needed, then return + return + + return StreamingResponse( # type: ignore + stream_generator(), media_type="text/event-stream" + ) + else: + return response + except Exception as e: + logger.error(f"Error in agent_app: {e}") + raise R2RException(str(e), 500) from e + + @self.router.post( + "/retrieval/completion", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Generate Message Completions", + openapi_extra=EXAMPLES["completion"], + ) + @self.base_endpoint + async def completion( + messages: list[Message] = Body( + ..., + description="List of messages to generate completion for", + example=[ + { + "role": "system", + "content": "You are a helpful assistant.", + }, + { + "role": "user", + "content": "What is the capital of France?", + }, + { + "role": "assistant", + "content": "The capital of France is Paris.", + }, + {"role": "user", "content": "What about Italy?"}, + ], + ), + generation_config: GenerationConfig = Body( + default_factory=GenerationConfig, + description="Configuration for text generation", + example={ + "model": "openai/gpt-4o-mini", + "temperature": 0.7, + "max_tokens": 150, + "stream": False, + }, + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + response_model=WrappedCompletionResponse, + ) -> WrappedLLMChatCompletion: + """Generate completions for a list of messages. + + This endpoint uses the language model to generate completions for + the provided messages. The generation process can be customized + using the generation_config parameter. + + The messages list should contain alternating user and assistant + messages, with an optional system message at the start. Each + message should have a 'role' and 'content'. + """ + + return await self.services.retrieval.completion( + messages=messages, # type: ignore + generation_config=generation_config, + ) + + @self.router.post( + "/retrieval/embedding", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Generate Embeddings", + openapi_extra=EXAMPLES["embedding"], + ) + @self.base_endpoint + async def embedding( + text: str = Body( + ..., + description="Text to generate embeddings for", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedEmbeddingResponse: + """Generate embeddings for the provided text using the specified + model. + + This endpoint uses the language model to generate embeddings for + the provided text. The model parameter specifies the model to use + for generating embeddings. + """ + + return await self.services.retrieval.embedding( + text=text, + ) diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/system_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/system_router.py new file mode 100644 index 00000000..682be750 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/system_router.py @@ -0,0 +1,186 @@ +import logging +import textwrap +from datetime import datetime, timezone + +import psutil +from fastapi import Depends + +from core.base import R2RException +from core.base.api.models import ( + GenericMessageResponse, + WrappedGenericMessageResponse, + WrappedServerStatsResponse, + WrappedSettingsResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + + +class SystemRouter(BaseRouterV3): + def __init__( + self, + providers: R2RProviders, + services: R2RServices, + config: R2RConfig, + ): + logging.info("Initializing SystemRouter") + super().__init__(providers, services, config) + self.start_time = datetime.now(timezone.utc) + + def _setup_routes(self): + @self.router.get( + "/health", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.system.health() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.system.health(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/health"\\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + """), + }, + ] + }, + ) + @self.base_endpoint + async def health_check() -> WrappedGenericMessageResponse: + return GenericMessageResponse(message="ok") # type: ignore + + @self.router.get( + "/system/settings", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.system.settings() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.system.settings(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/system/settings" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + """), + }, + ] + }, + ) + @self.base_endpoint + async def app_settings( + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedSettingsResponse: + if not auth_user.is_superuser: + raise R2RException( + "Only a superuser can call the `system/settings` endpoint.", + 403, + ) + return await self.services.management.app_settings() + + @self.router.get( + "/system/status", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # when using auth, do client.login(...) + + result = client.system.status() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.system.status(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/system/status" \\ + -H "Content-Type: application/json" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + """), + }, + ] + }, + ) + @self.base_endpoint + async def server_stats( + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedServerStatsResponse: + if not auth_user.is_superuser: + raise R2RException( + "Only an authorized user can call the `system/status` endpoint.", + 403, + ) + return { # type: ignore + "start_time": self.start_time.isoformat(), + "uptime_seconds": ( + datetime.now(timezone.utc) - self.start_time + ).total_seconds(), + "cpu_usage": psutil.cpu_percent(), + "memory_usage": psutil.virtual_memory().percent, + } diff --git a/.venv/lib/python3.12/site-packages/core/main/api/v3/users_router.py b/.venv/lib/python3.12/site-packages/core/main/api/v3/users_router.py new file mode 100644 index 00000000..686f0013 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/main/api/v3/users_router.py @@ -0,0 +1,1721 @@ +import logging +import os +import textwrap +import urllib.parse +from typing import Optional +from uuid import UUID + +import requests +from fastapi import Body, Depends, HTTPException, Path, Query +from fastapi.background import BackgroundTasks +from fastapi.responses import FileResponse +from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm +from google.auth.transport import requests as google_requests +from google.oauth2 import id_token +from pydantic import EmailStr + +from core.base import R2RException +from core.base.api.models import ( + GenericBooleanResponse, + GenericMessageResponse, + WrappedAPIKeyResponse, + WrappedAPIKeysResponse, + WrappedBooleanResponse, + WrappedCollectionsResponse, + WrappedGenericMessageResponse, + WrappedLimitsResponse, + WrappedLoginResponse, + WrappedTokenResponse, + WrappedUserResponse, + WrappedUsersResponse, +) + +from ...abstractions import R2RProviders, R2RServices +from ...config import R2RConfig +from .base_router import BaseRouterV3 + +oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") + + +class UsersRouter(BaseRouterV3): + def __init__( + self, providers: R2RProviders, services: R2RServices, config: R2RConfig + ): + logging.info("Initializing UsersRouter") + super().__init__(providers, services, config) + self.google_client_id = os.environ.get("GOOGLE_CLIENT_ID") + self.google_client_secret = os.environ.get("GOOGLE_CLIENT_SECRET") + self.google_redirect_uri = os.environ.get("GOOGLE_REDIRECT_URI") + + self.github_client_id = os.environ.get("GITHUB_CLIENT_ID") + self.github_client_secret = os.environ.get("GITHUB_CLIENT_SECRET") + self.github_redirect_uri = os.environ.get("GITHUB_REDIRECT_URI") + + def _setup_routes(self): + @self.router.post( + "/users", + # dependencies=[Depends(self.rate_limit_dependency)], + response_model=WrappedUserResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + new_user = client.users.create( + email="jane.doe@example.com", + password="secure_password123" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.create({ + email: "jane.doe@example.com", + password: "secure_password123" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users" \\ + -H "Content-Type: application/json" \\ + -d '{ + "email": "jane.doe@example.com", + "password": "secure_password123" + }'"""), + }, + ] + }, + ) + @self.base_endpoint + async def register( + email: EmailStr = Body(..., description="User's email address"), + password: str = Body(..., description="User's password"), + name: str | None = Body( + None, description="The name for the new user" + ), + bio: str | None = Body( + None, description="The bio for the new user" + ), + profile_picture: str | None = Body( + None, description="Updated user profile picture" + ), + # auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUserResponse: + """Register a new user with the given email and password.""" + + # TODO: Do we really want this validation? The default password for the superuser would not pass... + def validate_password(password: str) -> bool: + if len(password) < 10: + return False + if not any(c.isupper() for c in password): + return False + if not any(c.islower() for c in password): + return False + if not any(c.isdigit() for c in password): + return False + if not any(c in "!@#$%^&*" for c in password): + return False + return True + + # if not validate_password(password): + # raise R2RException( + # f"Password must be at least 10 characters long and contain at least one uppercase letter, one lowercase letter, one digit, and one special character from '!@#$%^&*'.", + # 400, + # ) + + registration_response = await self.services.auth.register( + email=email, + password=password, + name=name, + bio=bio, + profile_picture=profile_picture, + ) + + return registration_response # type: ignore + + @self.router.post( + "/users/export", + summary="Export users to CSV", + dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient("http://localhost:7272") + # when using auth, do client.login(...) + + response = client.users.export( + output_path="export.csv", + columns=["id", "name", "created_at"], + include_header=True, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient("http://localhost:7272"); + + function main() { + await client.users.export({ + outputPath: "export.csv", + columns: ["id", "name", "created_at"], + includeHeader: true, + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "http://127.0.0.1:7272/v3/users/export" \ + -H "Authorization: Bearer YOUR_API_KEY" \ + -H "Content-Type: application/json" \ + -H "Accept: text/csv" \ + -d '{ "columns": ["id", "name", "created_at"], "include_header": true }' \ + --output export.csv + """), + }, + ] + }, + ) + @self.base_endpoint + async def export_users( + background_tasks: BackgroundTasks, + columns: Optional[list[str]] = Body( + None, description="Specific columns to export" + ), + filters: Optional[dict] = Body( + None, description="Filters to apply to the export" + ), + include_header: Optional[bool] = Body( + True, description="Whether to include column headers" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> FileResponse: + """Export users as a CSV file.""" + + if not auth_user.is_superuser: + raise R2RException( + status_code=403, + message="Only a superuser can export data.", + ) + + ( + csv_file_path, + temp_file, + ) = await self.services.management.export_users( + columns=columns, + filters=filters, + include_header=include_header + if include_header is not None + else True, + ) + + background_tasks.add_task(temp_file.close) + + return FileResponse( + path=csv_file_path, + media_type="text/csv", + filename="users_export.csv", + ) + + @self.router.post( + "/users/verify-email", + # dependencies=[Depends(self.rate_limit_dependency)], + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + tokens = client.users.verify_email( + email="jane.doe@example.com", + verification_code="1lklwal!awdclm" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.verifyEmail({ + email: jane.doe@example.com", + verificationCode: "1lklwal!awdclm" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/login" \\ + -H "Content-Type: application/x-www-form-urlencoded" \\ + -d "email=jane.doe@example.com&verification_code=1lklwal!awdclm" + """), + }, + ] + }, + ) + @self.base_endpoint + async def verify_email( + email: EmailStr = Body(..., description="User's email address"), + verification_code: str = Body( + ..., description="Email verification code" + ), + ) -> WrappedGenericMessageResponse: + """Verify a user's email address.""" + user = ( + await self.providers.database.users_handler.get_user_by_email( + email + ) + ) + if user and user.is_verified: + raise R2RException( + status_code=400, + message="This email is already verified. Please log in.", + ) + + result = await self.services.auth.verify_email( + email, verification_code + ) + return GenericMessageResponse(message=result["message"]) # type: ignore + + @self.router.post( + "/users/send-verification-email", + dependencies=[ + Depends(self.providers.auth.auth_wrapper(public=True)) + ], + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + tokens = client.users.send_verification_email( + email="jane.doe@example.com", + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.sendVerificationEmail({ + email: jane.doe@example.com", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/send-verification-email" \\ + -H "Content-Type: application/x-www-form-urlencoded" \\ + -d "email=jane.doe@example.com" + """), + }, + ] + }, + ) + @self.base_endpoint + async def send_verification_email( + email: EmailStr = Body(..., description="User's email address"), + ) -> WrappedGenericMessageResponse: + """Send a user's email a verification code.""" + user = ( + await self.providers.database.users_handler.get_user_by_email( + email + ) + ) + if user and user.is_verified: + raise R2RException( + status_code=400, + message="This email is already verified. Please log in.", + ) + + await self.services.auth.send_verification_email(email=email) + return GenericMessageResponse( + message="A verification email has been sent." + ) # type: ignore + + @self.router.post( + "/users/login", + # dependencies=[Depends(self.rate_limit_dependency)], + response_model=WrappedTokenResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + tokens = client.users.login( + email="jane.doe@example.com", + password="secure_password123" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.login({ + email: jane.doe@example.com", + password: "secure_password123" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/login" \\ + -H "Content-Type: application/x-www-form-urlencoded" \\ + -d "username=jane.doe@example.com&password=secure_password123" + """), + }, + ] + }, + ) + @self.base_endpoint + async def login( + form_data: OAuth2PasswordRequestForm = Depends(), + ) -> WrappedLoginResponse: + """Authenticate a user and provide access tokens.""" + return await self.services.auth.login( # type: ignore + form_data.username, form_data.password + ) + + @self.router.post( + "/users/logout", + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + result = client.users.logout() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.logout(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/logout" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def logout( + token: str = Depends(oauth2_scheme), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Log out the current user.""" + result = await self.services.auth.logout(token) + return GenericMessageResponse(message=result["message"]) # type: ignore + + @self.router.post( + "/users/refresh-token", + # dependencies=[Depends(self.rate_limit_dependency)], + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + new_tokens = client.users.refresh_token() + # New tokens are automatically stored in the client"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.refreshAccessToken(); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/refresh-token" \\ + -H "Content-Type: application/json" \\ + -d '{ + "refresh_token": "YOUR_REFRESH_TOKEN" + }'"""), + }, + ] + }, + ) + @self.base_endpoint + async def refresh_token( + refresh_token: str = Body(..., description="Refresh token"), + ) -> WrappedTokenResponse: + """Refresh the access token using a refresh token.""" + result = await self.services.auth.refresh_access_token( + refresh_token=refresh_token + ) + return result # type: ignore + + @self.router.post( + "/users/change-password", + dependencies=[Depends(self.rate_limit_dependency)], + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + result = client.users.change_password( + current_password="old_password123", + new_password="new_secure_password456" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.changePassword({ + currentPassword: "old_password123", + newPassword: "new_secure_password456" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/change-password" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{ + "current_password": "old_password123", + "new_password": "new_secure_password456" + }'"""), + }, + ] + }, + ) + @self.base_endpoint + async def change_password( + current_password: str = Body(..., description="Current password"), + new_password: str = Body(..., description="New password"), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedGenericMessageResponse: + """Change the authenticated user's password.""" + result = await self.services.auth.change_password( + auth_user, current_password, new_password + ) + return GenericMessageResponse(message=result["message"]) # type: ignore + + @self.router.post( + "/users/request-password-reset", + dependencies=[ + Depends(self.providers.auth.auth_wrapper(public=True)) + ], + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + result = client.users.request_password_reset( + email="jane.doe@example.com" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.requestPasswordReset({ + email: jane.doe@example.com", + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/request-password-reset" \\ + -H "Content-Type: application/json" \\ + -d '{ + "email": "jane.doe@example.com" + }'"""), + }, + ] + }, + ) + @self.base_endpoint + async def request_password_reset( + email: EmailStr = Body(..., description="User's email address"), + ) -> WrappedGenericMessageResponse: + """Request a password reset for a user.""" + result = await self.services.auth.request_password_reset(email) + return GenericMessageResponse(message=result["message"]) # type: ignore + + @self.router.post( + "/users/reset-password", + dependencies=[ + Depends(self.providers.auth.auth_wrapper(public=True)) + ], + response_model=WrappedGenericMessageResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + result = client.users.reset_password( + reset_token="reset_token_received_via_email", + new_password="new_secure_password789" + )"""), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.resetPassword({ + resestToken: "reset_token_received_via_email", + newPassword: "new_secure_password789" + }); + } + + main(); + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/v3/users/reset-password" \\ + -H "Content-Type: application/json" \\ + -d '{ + "reset_token": "reset_token_received_via_email", + "new_password": "new_secure_password789" + }'"""), + }, + ] + }, + ) + @self.base_endpoint + async def reset_password( + reset_token: str = Body(..., description="Password reset token"), + new_password: str = Body(..., description="New password"), + ) -> WrappedGenericMessageResponse: + """Reset a user's password using a reset token.""" + result = await self.services.auth.confirm_password_reset( + reset_token, new_password + ) + return GenericMessageResponse(message=result["message"]) # type: ignore + + @self.router.get( + "/users", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List Users", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # List users with filters + users = client.users.list( + offset=0, + limit=100, + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.list(); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/users?offset=0&limit=100&username=john&email=john@example.com&is_active=true&is_superuser=false" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_users( + ids: list[str] = Query( + [], description="List of user IDs to filter by" + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUsersResponse: + """List all users with pagination and filtering options. + + Only accessible by superusers. + """ + + if not auth_user.is_superuser: + raise R2RException( + status_code=403, + message="Only a superuser can call the `users_overview` endpoint.", + ) + + user_uuids = [UUID(user_id) for user_id in ids] + + users_overview_response = ( + await self.services.management.users_overview( + user_ids=user_uuids, offset=offset, limit=limit + ) + ) + return users_overview_response["results"], { # type: ignore + "total_entries": users_overview_response["total_entries"] + } + + @self.router.get( + "/users/me", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Get the Current User", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Get user details + users = client.users.me() + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.retrieve(); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/users/me" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_current_user( + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUserResponse: + """Get detailed information about the currently authenticated + user.""" + return auth_user + + @self.router.get( + "/users/{id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Get User Details", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Get user details + users = client.users.retrieve( + id="b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.retrieve({ + id: "b4ac4dd6-5f27-596e-a55b-7cf242ca30aa" + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_user( + id: UUID = Path( + ..., example="550e8400-e29b-41d4-a716-446655440000" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUserResponse: + """Get detailed information about a specific user. + + Users can only access their own information unless they are + superusers. + """ + if not auth_user.is_superuser and auth_user.id != id: + raise R2RException( + "Only a superuser can call the get `user` endpoint for other users.", + 403, + ) + + users_overview_response = ( + await self.services.management.users_overview( + offset=0, + limit=1, + user_ids=[id], + ) + ) + + return users_overview_response["results"][0] + + @self.router.delete( + "/users/{id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete User", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Delete user + client.users.delete(id="550e8400-e29b-41d4-a716-446655440000", password="secure_password123") + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.delete({ + id: "550e8400-e29b-41d4-a716-446655440000", + password: "secure_password123" + }); + } + + main(); + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_user( + id: UUID = Path( + ..., example="550e8400-e29b-41d4-a716-446655440000" + ), + password: Optional[str] = Body( + None, description="User's current password" + ), + delete_vector_data: Optional[bool] = Body( + False, + description="Whether to delete the user's vector data", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete a specific user. + + Users can only delete their own account unless they are superusers. + """ + if not auth_user.is_superuser and auth_user.id != id: + raise R2RException( + "Only a superuser can delete other users.", + 403, + ) + + await self.services.auth.delete_user( + user_id=id, + password=password, + delete_vector_data=delete_vector_data or False, + is_superuser=auth_user.is_superuser, + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.get( + "/users/{id}/collections", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Get User Collections", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Get user collections + collections = client.user.list_collections( + "550e8400-e29b-41d4-a716-446655440000", + offset=0, + limit=100 + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.listCollections({ + id: "550e8400-e29b-41d4-a716-446655440000", + offset: 0, + limit: 100 + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/collections?offset=0&limit=100" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def get_user_collections( + id: UUID = Path( + ..., example="550e8400-e29b-41d4-a716-446655440000" + ), + offset: int = Query( + 0, + ge=0, + description="Specifies the number of objects to skip. Defaults to 0.", + ), + limit: int = Query( + 100, + ge=1, + le=1000, + description="Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.", + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedCollectionsResponse: + """Get all collections associated with a specific user. + + Users can only access their own collections unless they are + superusers. + """ + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "The currently authenticated user does not have access to the specified collection.", + 403, + ) + user_collection_response = ( + await self.services.management.collections_overview( + offset=offset, + limit=limit, + user_ids=[id], + ) + ) + return user_collection_response["results"], { # type: ignore + "total_entries": user_collection_response["total_entries"] + } + + @self.router.post( + "/users/{id}/collections/{collection_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Add User to Collection", + response_model=WrappedBooleanResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Add user to collection + client.users.add_to_collection( + id="550e8400-e29b-41d4-a716-446655440000", + collection_id="750e8400-e29b-41d4-a716-446655440000" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.addToCollection({ + id: "550e8400-e29b-41d4-a716-446655440000", + collectionId: "750e8400-e29b-41d4-a716-446655440000" + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/collections/750e8400-e29b-41d4-a716-446655440000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def add_user_to_collection( + id: UUID = Path( + ..., example="550e8400-e29b-41d4-a716-446655440000" + ), + collection_id: UUID = Path( + ..., example="750e8400-e29b-41d4-a716-446655440000" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "The currently authenticated user does not have access to the specified collection.", + 403, + ) + + # TODO - Do we need a check on user access to the collection? + await self.services.management.add_user_to_collection( # type: ignore + id, collection_id + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.delete( + "/users/{id}/collections/{collection_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Remove User from Collection", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Remove user from collection + client.users.remove_from_collection( + id="550e8400-e29b-41d4-a716-446655440000", + collection_id="750e8400-e29b-41d4-a716-446655440000" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.removeFromCollection({ + id: "550e8400-e29b-41d4-a716-446655440000", + collectionId: "750e8400-e29b-41d4-a716-446655440000" + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/collections/750e8400-e29b-41d4-a716-446655440000" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """), + }, + ] + }, + ) + @self.base_endpoint + async def remove_user_from_collection( + id: UUID = Path( + ..., example="550e8400-e29b-41d4-a716-446655440000" + ), + collection_id: UUID = Path( + ..., example="750e8400-e29b-41d4-a716-446655440000" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Remove a user from a collection. + + Requires either superuser status or access to the collection. + """ + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "The currently authenticated user does not have access to the specified collection.", + 403, + ) + + # TODO - Do we need a check on user access to the collection? + await self.services.management.remove_user_from_collection( # type: ignore + id, collection_id + ) + return GenericBooleanResponse(success=True) # type: ignore + + @self.router.post( + "/users/{id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Update User", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + # Update user + updated_user = client.update_user( + "550e8400-e29b-41d4-a716-446655440000", + name="John Doe" + ) + """), + }, + { + "lang": "JavaScript", + "source": textwrap.dedent(""" + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + + function main() { + const response = await client.users.update({ + id: "550e8400-e29b-41d4-a716-446655440000", + name: "John Doe" + }); + } + + main(); + """), + }, + { + "lang": "Shell", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000" \\ + -H "Authorization: Bearer YOUR_API_KEY" \\ + -H "Content-Type: application/json" \\ + -d '{ + "id": "550e8400-e29b-41d4-a716-446655440000", + "name": "John Doe", + }' + """), + }, + ] + }, + ) + # TODO - Modify update user to have synced params with user object + @self.base_endpoint + async def update_user( + id: UUID = Path(..., description="ID of the user to update"), + email: EmailStr | None = Body( + None, description="Updated email address" + ), + is_superuser: bool | None = Body( + None, description="Updated superuser status" + ), + name: str | None = Body(None, description="Updated user name"), + bio: str | None = Body(None, description="Updated user bio"), + profile_picture: str | None = Body( + None, description="Updated profile picture URL" + ), + limits_overrides: dict = Body( + None, + description="Updated limits overrides", + ), + metadata: dict[str, str | None] | None = None, + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedUserResponse: + """Update user information. + + Users can only update their own information unless they are + superusers. Superuser status can only be modified by existing + superusers. + """ + + if is_superuser is not None and not auth_user.is_superuser: + raise R2RException( + "Only superusers can update the superuser status of a user", + 403, + ) + + if not auth_user.is_superuser and auth_user.id != id: + raise R2RException( + "Only superusers can update other users' information", + 403, + ) + + if not auth_user.is_superuser and limits_overrides is not None: + raise R2RException( + "Only superusers can update other users' limits overrides", + 403, + ) + + # Pass `metadata` to our auth or management service so it can do a + # partial (Stripe-like) merge of metadata. + return await self.services.auth.update_user( # type: ignore + user_id=id, + email=email, + is_superuser=is_superuser, + name=name, + bio=bio, + profile_picture=profile_picture, + limits_overrides=limits_overrides, + new_metadata=metadata, + ) + + @self.router.post( + "/users/{id}/api-keys", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Create User API Key", + response_model=WrappedAPIKeyResponse, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + result = client.users.create_api_key( + id="550e8400-e29b-41d4-a716-446655440000", + name="My API Key", + description="API key for accessing the app", + ) + # result["api_key"] contains the newly created API key + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X POST "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/api-keys" \\ + -H "Authorization: Bearer YOUR_API_TOKEN" \\ + -d '{"name": "My API Key", "description": "API key for accessing the app"}' + """), + }, + ] + }, + ) + @self.base_endpoint + async def create_user_api_key( + id: UUID = Path( + ..., description="ID of the user for whom to create an API key" + ), + name: Optional[str] = Body( + None, description="Name of the API key" + ), + description: Optional[str] = Body( + None, description="Description of the API key" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedAPIKeyResponse: + """Create a new API key for the specified user. + + Only superusers or the user themselves may create an API key. + """ + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "Only the user themselves or a superuser can create API keys for this user.", + 403, + ) + + api_key = await self.services.auth.create_user_api_key( + id, name=name, description=description + ) + return api_key # type: ignore + + @self.router.get( + "/users/{id}/api-keys", + dependencies=[Depends(self.rate_limit_dependency)], + summary="List User API Keys", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + keys = client.users.list_api_keys( + id="550e8400-e29b-41d4-a716-446655440000" + ) + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X GET "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/api-keys" \\ + -H "Authorization: Bearer YOUR_API_TOKEN" + """), + }, + ] + }, + ) + @self.base_endpoint + async def list_user_api_keys( + id: UUID = Path( + ..., description="ID of the user whose API keys to list" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedAPIKeysResponse: + """List all API keys for the specified user. + + Only superusers or the user themselves may list the API keys. + """ + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "Only the user themselves or a superuser can list API keys for this user.", + 403, + ) + + keys = ( + await self.providers.database.users_handler.get_user_api_keys( + id + ) + ) + return keys, {"total_entries": len(keys)} # type: ignore + + @self.router.delete( + "/users/{id}/api-keys/{key_id}", + dependencies=[Depends(self.rate_limit_dependency)], + summary="Delete User API Key", + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": textwrap.dedent(""" + from r2r import R2RClient + from uuid import UUID + + client = R2RClient() + # client.login(...) + + response = client.users.delete_api_key( + id="550e8400-e29b-41d4-a716-446655440000", + key_id="d9c562d4-3aef-43e8-8f08-0cf7cd5e0a25" + ) + """), + }, + { + "lang": "cURL", + "source": textwrap.dedent(""" + curl -X DELETE "https://api.example.com/users/550e8400-e29b-41d4-a716-446655440000/api-keys/d9c562d4-3aef-43e8-8f08-0cf7cd5e0a25" \\ + -H "Authorization: Bearer YOUR_API_TOKEN" + """), + }, + ] + }, + ) + @self.base_endpoint + async def delete_user_api_key( + id: UUID = Path(..., description="ID of the user"), + key_id: UUID = Path( + ..., description="ID of the API key to delete" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedBooleanResponse: + """Delete a specific API key for the specified user. + + Only superusers or the user themselves may delete the API key. + """ + if auth_user.id != id and not auth_user.is_superuser: + raise R2RException( + "Only the user themselves or a superuser can delete this API key.", + 403, + ) + + success = ( + await self.providers.database.users_handler.delete_api_key( + id, key_id + ) + ) + if not success: + raise R2RException( + "API key not found or could not be deleted", 400 + ) + return {"success": True} # type: ignore + + @self.router.get( + "/users/{id}/limits", + summary="Fetch User Limits", + responses={ + 200: { + "description": "Returns system default limits, user overrides, and final effective settings." + }, + 403: { + "description": "If the requesting user is neither the same user nor a superuser." + }, + 404: {"description": "If the user ID does not exist."}, + }, + openapi_extra={ + "x-codeSamples": [ + { + "lang": "Python", + "source": """ + from r2r import R2RClient + + client = R2RClient() + # client.login(...) + + user_limits = client.users.get_limits("550e8400-e29b-41d4-a716-446655440000") + """, + }, + { + "lang": "JavaScript", + "source": """ + const { r2rClient } = require("r2r-js"); + + const client = new r2rClient(); + // await client.users.login(...) + + async function main() { + const userLimits = await client.users.getLimits({ + id: "550e8400-e29b-41d4-a716-446655440000" + }); + console.log(userLimits); + } + + main(); + """, + }, + { + "lang": "cURL", + "source": """ + curl -X GET "https://api.example.com/v3/users/550e8400-e29b-41d4-a716-446655440000/limits" \\ + -H "Authorization: Bearer YOUR_API_KEY" + """, + }, + ] + }, + ) + @self.base_endpoint + async def get_user_limits( + id: UUID = Path( + ..., description="ID of the user to fetch limits for" + ), + auth_user=Depends(self.providers.auth.auth_wrapper()), + ) -> WrappedLimitsResponse: + """Return the system default limits, user-level overrides, and + final "effective" limit settings for the specified user. + + Only superusers or the user themself may fetch these values. + """ + if (auth_user.id != id) and (not auth_user.is_superuser): + raise R2RException( + "Only the user themselves or a superuser can view these limits.", + status_code=403, + ) + + # This calls the new helper you created in ManagementService + limits_info = await self.services.management.get_all_user_limits( + id + ) + return limits_info # type: ignore + + @self.router.get("/users/oauth/google/authorize") + @self.base_endpoint + async def google_authorize() -> WrappedGenericMessageResponse: + """Redirect user to Google's OAuth 2.0 consent screen.""" + state = "some_random_string_or_csrf_token" # Usually you store a random state in session/Redis + scope = "openid email profile" + + # Build the Google OAuth URL + params = { + "client_id": self.google_client_id, + "redirect_uri": self.google_redirect_uri, + "response_type": "code", + "scope": scope, + "state": state, + "access_type": "offline", # to get refresh token if needed + "prompt": "consent", # Force consent each time if you want + } + google_auth_url = f"https://accounts.google.com/o/oauth2/v2/auth?{urllib.parse.urlencode(params)}" + return GenericMessageResponse(message=google_auth_url) # type: ignore + + @self.router.get("/users/oauth/google/callback") + @self.base_endpoint + async def google_callback( + code: str = Query(...), state: str = Query(...) + ) -> WrappedLoginResponse: + """Google's callback that will receive the `code` and `state`. + + We then exchange code for tokens, verify, and log the user in. + """ + # 1. Exchange `code` for tokens + token_data = requests.post( + "https://oauth2.googleapis.com/token", + data={ + "code": code, + "client_id": self.google_client_id, + "client_secret": self.google_client_secret, + "redirect_uri": self.google_redirect_uri, + "grant_type": "authorization_code", + }, + ).json() + if "error" in token_data: + raise HTTPException( + status_code=400, + detail=f"Failed to get token: {token_data}", + ) + + # 2. Verify the ID token + id_token_str = token_data["id_token"] + try: + # google_auth.transport.requests.Request() is a session for verifying + id_info = id_token.verify_oauth2_token( + id_token_str, + google_requests.Request(), + self.google_client_id, + ) + except ValueError as e: + raise HTTPException( + status_code=400, + detail=f"Token verification failed: {str(e)}", + ) from e + + # id_info will contain "sub", "email", etc. + google_id = id_info["sub"] + email = id_info.get("email") + email = email or f"{google_id}@google_oauth.fake" + + # 3. Now call our R2RAuthProvider method that handles "oauth-based" user creation or login + return await self.providers.auth.oauth_callback_handler( # type: ignore + provider="google", + oauth_id=google_id, + email=email, + ) + + @self.router.get("/users/oauth/github/authorize") + @self.base_endpoint + async def github_authorize() -> WrappedGenericMessageResponse: + """Redirect user to GitHub's OAuth consent screen.""" + state = "some_random_string_or_csrf_token" + scope = "read:user user:email" + + params = { + "client_id": self.github_client_id, + "redirect_uri": self.github_redirect_uri, + "scope": scope, + "state": state, + } + github_auth_url = f"https://github.com/login/oauth/authorize?{urllib.parse.urlencode(params)}" + return GenericMessageResponse(message=github_auth_url) # type: ignore + + @self.router.get("/users/oauth/github/callback") + @self.base_endpoint + async def github_callback( + code: str = Query(...), state: str = Query(...) + ) -> WrappedLoginResponse: + """GitHub callback route to exchange code for an access_token, then + fetch user info from GitHub's API, then do the same 'oauth-based' + login or registration.""" + # 1. Exchange code for access_token + token_resp = requests.post( + "https://github.com/login/oauth/access_token", + data={ + "client_id": self.github_client_id, + "client_secret": self.github_client_secret, + "code": code, + "redirect_uri": self.github_redirect_uri, + "state": state, + }, + headers={"Accept": "application/json"}, + ) + token_data = token_resp.json() + if "error" in token_data: + raise HTTPException( + status_code=400, + detail=f"Failed to get token: {token_data}", + ) + access_token = token_data["access_token"] + + # 2. Use the access_token to fetch user info + user_info_resp = requests.get( + "https://api.github.com/user", + headers={"Authorization": f"Bearer {access_token}"}, + ).json() + + github_id = str( + user_info_resp["id"] + ) # GitHub user ID is typically an integer + # fetch email (sometimes you need to call /user/emails endpoint if user sets email private) + email = user_info_resp.get("email") + email = email or f"{github_id}@github_oauth.fake" + # 3. Pass to your auth provider + return await self.providers.auth.oauth_callback_handler( # type: ignore + provider="github", + oauth_id=github_id, + email=email, + ) |
