about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py 761
1 files changed, 761 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py b/.venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py
new file mode 100644
index 00000000..0c9ad3bb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sdk/sync_methods/documents.py
@@ -0,0 +1,761 @@
+import json
+import os
+import tempfile
+import uuid
+from datetime import datetime
+from io import BytesIO
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse
+from uuid import UUID
+
+import requests
+
+from shared.api.models import (
+    WrappedBooleanResponse,
+    WrappedChunksResponse,
+    WrappedCollectionsResponse,
+    WrappedDocumentResponse,
+    WrappedDocumentSearchResponse,
+    WrappedDocumentsResponse,
+    WrappedEntitiesResponse,
+    WrappedGenericMessageResponse,
+    WrappedIngestionResponse,
+    WrappedRelationshipsResponse,
+)
+
+from ..models import IngestionMode, SearchMode, SearchSettings
+
+
+class DocumentsSDK:
+    """SDK for interacting with documents in the v3 API."""
+
+    def __init__(self, client):
+        """Store the client object that the document methods send requests through.
+
+        Args:
+            client: Object providing `_make_request`; the export methods also
+                use its `client`, `base_url` and `_get_auth_header` members.
+        """
+        self.client = client
+
+    def create(
+        self,
+        file_path: Optional[str] = None,
+        raw_text: Optional[str] = None,
+        chunks: Optional[list[str]] = None,
+        id: Optional[str | UUID] = None,
+        ingestion_mode: Optional[str] = None,
+        collection_ids: Optional[list[str | UUID]] = None,
+        metadata: Optional[dict] = None,
+        ingestion_config: Optional[dict | IngestionMode] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedIngestionResponse:
+        """Create a new document from either a file or content.
+
+        Args:
+            file_path (Optional[str]): The file to upload, if any
+            content (Optional[str]): Optional text content to upload, if no file path is provided
+            id (Optional[str | UUID]): Optional ID to assign to the document
+            collection_ids (Optional[list[str | UUID]]): Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.
+            metadata (Optional[dict]): Optional metadata to assign to the document
+            ingestion_config (Optional[dict]): Optional ingestion configuration to use
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            WrappedIngestionResponse
+        """
+        if not file_path and not raw_text and not chunks:
+            raise ValueError(
+                "Either `file_path`, `raw_text` or `chunks` must be provided"
+            )
+        if (
+            (file_path and raw_text)
+            or (file_path and chunks)
+            or (raw_text and chunks)
+        ):
+            raise ValueError(
+                "Only one of `file_path`, `raw_text` or `chunks` may be provided"
+            )
+
+        data: dict[str, Any] = {}
+        files = None
+
+        if id:
+            data["id"] = str(id)
+        if metadata:
+            data["metadata"] = json.dumps(metadata)
+        if ingestion_config:
+            if isinstance(ingestion_config, IngestionMode):
+                ingestion_config = {"mode": ingestion_config.value}
+            app_config: dict[str, Any] = (
+                {}
+                if isinstance(ingestion_config, dict)
+                else ingestion_config["app"]
+            )
+            ingestion_config = dict(ingestion_config)
+            ingestion_config["app"] = app_config
+            data["ingestion_config"] = json.dumps(ingestion_config)
+        if collection_ids:
+            collection_ids = [
+                str(collection_id) for collection_id in collection_ids
+            ]  # type: ignore
+            data["collection_ids"] = json.dumps(collection_ids)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+        if ingestion_mode is not None:
+            data["ingestion_mode"] = ingestion_mode
+        if file_path:
+            # Create a new file instance that will remain open during the request
+            file_instance = open(file_path, "rb")
+            files = [
+                (
+                    "file",
+                    (file_path, file_instance, "application/octet-stream"),
+                )
+            ]
+            try:
+                response_dict = self.client._make_request(
+                    "POST",
+                    "documents",
+                    data=data,
+                    files=files,
+                    version="v3",
+                )
+            finally:
+                # Ensure we close the file after the request is complete
+                file_instance.close()
+        elif raw_text:
+            data["raw_text"] = raw_text  # type: ignore
+            response_dict = self.client._make_request(
+                "POST",
+                "documents",
+                data=data,
+                version="v3",
+            )
+        else:
+            data["chunks"] = json.dumps(chunks)
+            response_dict = self.client._make_request(
+                "POST",
+                "documents",
+                data=data,
+                version="v3",
+            )
+
+        return WrappedIngestionResponse(**response_dict)
+
+    def append_metadata(
+        self,
+        id: str | UUID,
+        metadata: list[dict],
+    ) -> WrappedDocumentResponse:
+        """Append metadata to a document.
+
+        Args:
+            id (str | UUID): ID of document to append metadata to
+            metadata (list[dict]): Metadata to append
+
+        Returns:
+            WrappedDocumentResponse
+        """
+        data = json.dumps(metadata)
+        response_dict = self.client._make_request(
+            "PATCH",
+            f"documents/{str(id)}/metadata",
+            data=data,
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    def replace_metadata(
+        self,
+        id: str | UUID,
+        metadata: list[dict],
+    ) -> WrappedDocumentResponse:
+        """Replace metadata for a document.
+
+        Args:
+            id (str | UUID): ID of document to replace metadata for
+            metadata (list[dict]): The metadata that will replace the existing metadata
+        """
+        data = json.dumps(metadata)
+        response_dict = self.client._make_request(
+            "PUT",
+            f"documents/{str(id)}/metadata",
+            data=data,
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    def retrieve(
+        self,
+        id: str | UUID,
+    ) -> WrappedDocumentResponse:
+        """Get a specific document by ID.
+
+        Args:
+            id (str | UUID): ID of document to retrieve
+
+        Returns:
+            WrappedDocumentResponse
+        """
+        response_dict = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}",
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    def download(
+        self,
+        id: str | UUID,
+    ) -> BytesIO:
+        response = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/download",
+            version="v3",
+        )
+        if not isinstance(response, BytesIO):
+            raise ValueError("Expected BytesIO response")
+        return response
+
+    def download_zip(
+        self,
+        document_ids: Optional[list[str | UUID]] = None,
+        start_date: Optional[datetime] = None,
+        end_date: Optional[datetime] = None,
+        output_path: Optional[str | Path] = None,
+    ) -> BytesIO | None:
+        """Download multiple documents as a zip file."""
+        params: dict[str, Any] = {}
+        if document_ids:
+            params["document_ids"] = [str(doc_id) for doc_id in document_ids]
+        if start_date:
+            params["start_date"] = start_date.isoformat()
+        if end_date:
+            params["end_date"] = end_date.isoformat()
+
+        response = self.client._make_request(
+            "GET",
+            "documents/download_zip",
+            params=params,
+            version="v3",
+        )
+
+        if not isinstance(response, BytesIO):
+            raise ValueError("Expected BytesIO response")
+
+        if output_path:
+            output_path = (
+                Path(output_path)
+                if isinstance(output_path, str)
+                else output_path
+            )
+            with open(output_path, "wb") as f:
+                f.write(response.getvalue())
+            return None
+
+        return response
+
+    def export(
+        self,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export documents to a CSV file, streaming the results directly to
+        disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        with open(output_path, "wb") as f:
+            response = self.client.client.post(
+                f"{self.client.base_url}/v3/documents/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            )
+            if response.status_code != 200:
+                raise ValueError(
+                    f"Export failed with status {response.status_code}",
+                    response,
+                )
+
+            for chunk in response.iter_bytes():
+                if chunk:
+                    f.write(chunk)
+
+    def export_entities(
+        self,
+        id: str | UUID,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export documents to a CSV file, streaming the results directly to
+        disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        # Prepare request data
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        with open(output_path, "wb") as f:
+            response = self.client.client.post(
+                f"{self.client.base_url}/v3/documents/{str(id)}/entities/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            )
+            if response.status_code != 200:
+                raise ValueError(
+                    f"Export failed with status {response.status_code}",
+                    response,
+                )
+
+            for chunk in response.iter_bytes():
+                if chunk:
+                    f.write(chunk)
+
+    def export_relationships(
+        self,
+        id: str | UUID,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export document relationships to a CSV file, streaming the results
+        directly to disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        # Prepare request data
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        with open(output_path, "wb") as f:
+            response = self.client.client.post(
+                f"{self.client.base_url}/v3/documents/{str(id)}/relationships/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            )
+            if response.status_code != 200:
+                raise ValueError(
+                    f"Export failed with status {response.status_code}",
+                    response,
+                )
+
+            for chunk in response.iter_bytes():
+                if chunk:
+                    f.write(chunk)
+
+    def delete(
+        self,
+        id: str | UUID,
+    ) -> WrappedBooleanResponse:
+        """Delete a specific document.
+
+        Args:
+            id (str | UUID): ID of document to delete
+
+        Returns:
+            WrappedBooleanResponse
+        """
+        response_dict = self.client._make_request(
+            "DELETE",
+            f"documents/{str(id)}",
+            version="v3",
+        )
+
+        return WrappedBooleanResponse(**response_dict)
+
+    def list_chunks(
+        self,
+        id: str | UUID,
+        include_vectors: Optional[bool] = False,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedChunksResponse:
+        """Get chunks for a specific document.
+
+        Args:
+            id (str | UUID): ID of document to retrieve chunks for
+            include_vectors (Optional[bool]): Whether to include vector embeddings in the response
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedChunksResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+            "include_vectors": include_vectors,
+        }
+        response_dict = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/chunks",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedChunksResponse(**response_dict)
+
+    def list_collections(
+        self,
+        id: str | UUID,
+        include_vectors: Optional[bool] = False,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedCollectionsResponse:
+        """List collections for a specific document.
+
+        Args:
+            id (str | UUID): ID of document to retrieve collections for
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedCollectionsResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+        }
+
+        response_dict = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/collections",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedCollectionsResponse(**response_dict)
+
+    def delete_by_filter(
+        self,
+        filters: dict,
+    ) -> WrappedBooleanResponse:
+        """Delete documents based on filters.
+
+        Args:
+            filters (dict): Filters to apply when selecting documents to delete
+
+        Returns:
+            WrappedBooleanResponse
+        """
+        filters_json = json.dumps(filters)
+        response_dict = self.client._make_request(
+            "DELETE",
+            "documents/by-filter",
+            data=filters_json,
+            version="v3",
+        )
+
+        return WrappedBooleanResponse(**response_dict)
+
+    def extract(
+        self,
+        id: str | UUID,
+        settings: Optional[dict] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedGenericMessageResponse:
+        """Extract entities and relationships from a document.
+
+        Args:
+            id (str, UUID): ID of document to extract from
+            settings (Optional[dict]): Settings for extraction process
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            WrappedGenericMessageResponse
+        """
+        data: dict[str, Any] = {}
+        if settings:
+            data["settings"] = json.dumps(settings)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+
+        response_dict = self.client._make_request(
+            "POST",
+            f"documents/{str(id)}/extract",
+            params=data,
+            version="v3",
+        )
+        return WrappedGenericMessageResponse(**response_dict)
+
+    def list_entities(
+        self,
+        id: str | UUID,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+        include_embeddings: Optional[bool] = False,
+    ) -> WrappedEntitiesResponse:
+        """List entities extracted from a document.
+
+        Args:
+            id (str | UUID): ID of document to get entities from
+            offset (Optional[int]): Number of items to skip
+            limit (Optional[int]): Max number of items to return
+            include_embeddings (Optional[bool]): Whether to include embeddings
+
+        Returns:
+            WrappedEntitiesResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+            "include_embeddings": include_embeddings,
+        }
+        response_dict = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/entities",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedEntitiesResponse(**response_dict)
+
+    def list_relationships(
+        self,
+        id: str | UUID,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+        entity_names: Optional[list[str]] = None,
+        relationship_types: Optional[list[str]] = None,
+    ) -> WrappedRelationshipsResponse:
+        """List relationships extracted from a document.
+
+        Args:
+            id (str | UUID): ID of document to get relationships from
+            offset (Optional[int]): Number of items to skip
+            limit (Optional[int]): Max number of items to return
+            entity_names (Optional[list[str]]): Filter by entity names
+            relationship_types (Optional[list[str]]): Filter by relationship types
+
+        Returns:
+            WrappedRelationshipsResponse
+        """
+        params: dict[str, Any] = {
+            "offset": offset,
+            "limit": limit,
+        }
+        if entity_names:
+            params["entity_names"] = entity_names
+        if relationship_types:
+            params["relationship_types"] = relationship_types
+
+        response_dict = self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/relationships",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedRelationshipsResponse(**response_dict)
+
+    def list(
+        self,
+        ids: Optional[list[str | UUID]] = None,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedDocumentsResponse:
+        """List documents with pagination.
+
+        Args:
+            ids (Optional[list[str | UUID]]): Optional list of document IDs to filter by
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedDocumentsResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+        }
+        if ids:
+            params["ids"] = [str(doc_id) for doc_id in ids]  # type: ignore
+
+        response_dict = self.client._make_request(
+            "GET",
+            "documents",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedDocumentsResponse(**response_dict)
+
+    def search(
+        self,
+        query: str,
+        search_mode: Optional[str | SearchMode] = "custom",
+        search_settings: Optional[dict | SearchSettings] = None,
+    ) -> WrappedDocumentSearchResponse:
+        """Conduct a vector and/or graph search.
+
+        Args:
+            query (str): The query to search for.
+            search_settings (Optional[dict, SearchSettings]]): Vector search settings.
+
+        Returns:
+            WrappedDocumentSearchResponse
+        """
+        if search_settings and not isinstance(search_settings, dict):
+            search_settings = search_settings.model_dump()
+        data: dict[str, Any] = {
+            "query": query,
+            "search_settings": search_settings,
+        }
+        if search_mode:
+            data["search_mode"] = search_mode
+
+        response_dict = self.client._make_request(
+            "POST",
+            "documents/search",
+            json=data,
+            version="v3",
+        )
+
+        return WrappedDocumentSearchResponse(**response_dict)
+
+    def deduplicate(
+        self,
+        id: str | UUID,
+        settings: Optional[dict] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedGenericMessageResponse:
+        """Deduplicate entities and relationships from a document.
+
+        Args:
+            id (str, UUID): ID of document to extract from
+            settings (Optional[dict]): Settings for extraction process
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            dict: Extraction results or cost estimate
+        """
+        data: dict[str, Any] = {}
+        if settings:
+            data["settings"] = json.dumps(settings)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+
+        response_dict = self.client._make_request(
+            "POST",
+            f"documents/{str(id)}/deduplicate",
+            params=data,
+            version="v3",
+        )
+
+        return WrappedGenericMessageResponse(**response_dict)
+
+    def create_sample(self, hi_res: bool = False) -> WrappedIngestionResponse:
+        """Ingest a sample document into R2R.
+
+        This method downloads a sample file from a predefined URL, saves it
+        as a temporary file, and ingests it using the `create` method. The
+        temporary file is removed after ingestion.
+
+        Returns:
+            WrappedIngestionResponse: The response from the ingestion request.
+        """
+        # Define the sample file URL
+        sample_file_url = "https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/DeepSeek_R1.pdf"
+        # Parse the URL to extract the filename
+        parsed_url = urlparse(sample_file_url)
+        filename = os.path.basename(parsed_url.path)
+        # Determine whether the file is a PDF (this can affect how we write the file)
+
+        # Create a temporary file.
+        # We use binary mode ("wb") for both PDFs and text files because the `create`
+        # method will open the file in binary mode.
+        temp_file = tempfile.NamedTemporaryFile(
+            mode="wb", delete=False, suffix=f"_{filename}"
+        )
+        try:
+            response = requests.get(sample_file_url)
+            response.raise_for_status()
+            # Write the downloaded content to the temporary file.
+            # (For text files, using response.content avoids any potential encoding issues
+            # when the file is later opened in binary mode.)
+            temp_file.write(response.content)
+            temp_file.close()
+
+            # Prepare metadata and generate a stable document ID based on the URL
+            metadata = {"title": filename}
+            doc_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, sample_file_url))
+
+            # Call the SDK's create method to ingest the file.
+            ingestion_response = self.create(
+                file_path=temp_file.name,
+                metadata=metadata,
+                id=doc_id,
+                ingestion_mode="hi-res" if hi_res else None,
+            )
+            return ingestion_response
+        finally:
+            # Remove the temporary file regardless of whether ingestion succeeded.
+            try:
+                os.unlink(temp_file.name)
+            except Exception:
+                pass