about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py706
1 files changed, 706 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py b/.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py
new file mode 100644
index 00000000..b8a254ee
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py
@@ -0,0 +1,706 @@
+import json
+from datetime import datetime
+from io import BytesIO
+from pathlib import Path
+from typing import Any, Optional
+from uuid import UUID
+
+import aiofiles
+
+from shared.api.models import (
+    WrappedBooleanResponse,
+    WrappedChunksResponse,
+    WrappedCollectionsResponse,
+    WrappedDocumentResponse,
+    WrappedDocumentSearchResponse,
+    WrappedDocumentsResponse,
+    WrappedEntitiesResponse,
+    WrappedGenericMessageResponse,
+    WrappedIngestionResponse,
+    WrappedRelationshipsResponse,
+)
+
+from ..models import IngestionMode, SearchMode, SearchSettings
+
+
class DocumentsSDK:
    """SDK for interacting with documents in the v3 API."""

    def __init__(self, client):
        # Underlying HTTP client. Methods below use its async `_make_request`;
        # the streaming export helpers additionally use `session`, `base_url`,
        # and `_get_auth_header`.
        self.client = client
+
+    async def create(
+        self,
+        file_path: Optional[str] = None,
+        raw_text: Optional[str] = None,
+        chunks: Optional[list[str]] = None,
+        id: Optional[str | UUID] = None,
+        ingestion_mode: Optional[str] = None,
+        collection_ids: Optional[list[str | UUID]] = None,
+        metadata: Optional[dict] = None,
+        ingestion_config: Optional[dict | IngestionMode] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedIngestionResponse:
+        """Create a new document from either a file or content.
+
+        Args:
+            file_path (Optional[str]): The file to upload, if any
+            content (Optional[str]): Optional text content to upload, if no file path is provided
+            id (Optional[str | UUID]): Optional ID to assign to the document
+            collection_ids (Optional[list[str | UUID]]): Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.
+            metadata (Optional[dict]): Optional metadata to assign to the document
+            ingestion_config (Optional[dict]): Optional ingestion configuration to use
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            WrappedIngestionResponse
+        """
+        if not file_path and not raw_text and not chunks:
+            raise ValueError(
+                "Either `file_path`, `raw_text` or `chunks` must be provided"
+            )
+        if (
+            (file_path and raw_text)
+            or (file_path and chunks)
+            or (raw_text and chunks)
+        ):
+            raise ValueError(
+                "Only one of `file_path`, `raw_text` or `chunks` may be provided"
+            )
+
+        data: dict[str, Any] = {}
+        files = None
+
+        if id:
+            data["id"] = str(id)
+        if metadata:
+            data["metadata"] = json.dumps(metadata)
+        if ingestion_config:
+            if isinstance(ingestion_config, IngestionMode):
+                ingestion_config = {"mode": ingestion_config.value}
+            app_config: dict[str, Any] = (
+                {}
+                if isinstance(ingestion_config, dict)
+                else ingestion_config["app"]
+            )
+            ingestion_config = dict(ingestion_config)
+            ingestion_config["app"] = app_config
+            data["ingestion_config"] = json.dumps(ingestion_config)
+        if collection_ids:
+            collection_ids = [
+                str(collection_id) for collection_id in collection_ids
+            ]  # type: ignore
+            data["collection_ids"] = json.dumps(collection_ids)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+        if ingestion_mode is not None:
+            data["ingestion_mode"] = ingestion_mode
+        if file_path:
+            # Create a new file instance that will remain open during the request
+            file_instance = open(file_path, "rb")
+            files = [
+                (
+                    "file",
+                    (file_path, file_instance, "application/octet-stream"),
+                )
+            ]
+            try:
+                response_dict = await self.client._make_request(
+                    "POST",
+                    "documents",
+                    data=data,
+                    files=files,
+                    version="v3",
+                )
+            finally:
+                # Ensure we close the file after the request is complete
+                file_instance.close()
+        elif raw_text:
+            data["raw_text"] = raw_text  # type: ignore
+            response_dict = await self.client._make_request(
+                "POST",
+                "documents",
+                data=data,
+                version="v3",
+            )
+        else:
+            data["chunks"] = json.dumps(chunks)
+            response_dict = await self.client._make_request(
+                "POST",
+                "documents",
+                data=data,
+                version="v3",
+            )
+
+        return WrappedIngestionResponse(**response_dict)
+
+    async def append_metadata(
+        self,
+        id: str | UUID,
+        metadata: list[dict],
+    ) -> WrappedDocumentResponse:
+        """Append metadata to a document.
+
+        Args:
+            id (str | UUID): ID of document to append metadata to
+            metadata (list[dict]): Metadata to append
+
+        Returns:
+            WrappedDocumentResponse
+        """
+        data = json.dumps(metadata)
+        response_dict = await self.client._make_request(
+            "PATCH",
+            f"documents/{str(id)}/metadata",
+            data=data,
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    async def replace_metadata(
+        self,
+        id: str | UUID,
+        metadata: list[dict],
+    ) -> WrappedDocumentResponse:
+        """Replace metadata for a document.
+
+        Args:
+            id (str | UUID): ID of document to replace metadata for
+            metadata (list[dict]): The metadata that will replace the existing metadata
+        """
+        data = json.dumps(metadata)
+        response_dict = await self.client._make_request(
+            "PUT",
+            f"documents/{str(id)}/metadata",
+            data=data,
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    async def retrieve(
+        self,
+        id: str | UUID,
+    ) -> WrappedDocumentResponse:
+        """Get a specific document by ID.
+
+        Args:
+            id (str | UUID): ID of document to retrieve
+
+        Returns:
+            WrappedDocumentResponse
+        """
+        response_dict = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}",
+            version="v3",
+        )
+
+        return WrappedDocumentResponse(**response_dict)
+
+    async def download(
+        self,
+        id: str | UUID,
+    ) -> BytesIO:
+        response = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/download",
+            version="v3",
+        )
+        if not isinstance(response, BytesIO):
+            raise ValueError("Expected BytesIO response")
+        return response
+
+    async def download_zip(
+        self,
+        document_ids: Optional[list[str | UUID]] = None,
+        start_date: Optional[datetime] = None,
+        end_date: Optional[datetime] = None,
+        output_path: Optional[str | Path] = None,
+    ) -> BytesIO | None:
+        """Download multiple documents as a zip file."""
+        params: dict[str, Any] = {}
+        if document_ids:
+            params["document_ids"] = [str(doc_id) for doc_id in document_ids]
+        if start_date:
+            params["start_date"] = start_date.isoformat()
+        if end_date:
+            params["end_date"] = end_date.isoformat()
+
+        response = await self.client._make_request(
+            "GET",
+            "documents/download_zip",
+            params=params,
+            version="v3",
+        )
+
+        if not isinstance(response, BytesIO):
+            raise ValueError("Expected BytesIO response")
+
+        if output_path:
+            output_path = (
+                Path(output_path)
+                if isinstance(output_path, str)
+                else output_path
+            )
+            async with aiofiles.open(output_path, "wb") as f:
+                await f.write(response.getvalue())
+            return None
+
+        return response
+
+    async def export(
+        self,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export documents to a CSV file, streaming the results directly to
+        disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        async with aiofiles.open(output_path, "wb") as f:
+            async with self.client.session.post(
+                f"{self.client.base_url}/v3/documents/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            ) as response:
+                if response.status != 200:
+                    raise ValueError(
+                        f"Export failed with status {response.status}",
+                        response,
+                    )
+
+                async for chunk in response.content.iter_chunks():
+                    if chunk:
+                        await f.write(chunk[0])
+
+    async def export_entities(
+        self,
+        id: str | UUID,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export documents to a CSV file, streaming the results directly to
+        disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        # Prepare request data
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        async with aiofiles.open(output_path, "wb") as f:
+            async with self.client.session.post(
+                f"{self.client.base_url}/v3/documents/{str(id)}/entities/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            ) as response:
+                if response.status != 200:
+                    raise ValueError(
+                        f"Export failed with status {response.status}",
+                        response,
+                    )
+
+                async for chunk in response.content.iter_chunks():
+                    if chunk:
+                        await f.write(chunk[0])
+
+    async def export_relationships(
+        self,
+        id: str | UUID,
+        output_path: str | Path,
+        columns: Optional[list[str]] = None,
+        filters: Optional[dict] = None,
+        include_header: bool = True,
+    ) -> None:
+        """Export document relationships to a CSV file, streaming the results
+        directly to disk.
+
+        Args:
+            output_path (str | Path): Local path where the CSV file should be saved
+            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
+            filters (Optional[dict]): Optional filters to apply when selecting documents
+            include_header (bool): Whether to include column headers in the CSV (default: True)
+
+        Returns:
+            None
+        """
+        # Convert path to string if it's a Path object
+        output_path = (
+            str(output_path) if isinstance(output_path, Path) else output_path
+        )
+
+        # Prepare request data
+        data: dict[str, Any] = {"include_header": include_header}
+        if columns:
+            data["columns"] = columns
+        if filters:
+            data["filters"] = filters
+
+        # Stream response directly to file
+        async with aiofiles.open(output_path, "wb") as f:
+            async with self.client.session.post(
+                f"{self.client.base_url}/v3/documents/{str(id)}/relationships/export",
+                json=data,
+                headers={
+                    "Accept": "text/csv",
+                    **self.client._get_auth_header(),
+                },
+            ) as response:
+                if response.status != 200:
+                    raise ValueError(
+                        f"Export failed with status {response.status}",
+                        response,
+                    )
+
+                async for chunk in response.content.iter_chunks():
+                    if chunk:
+                        await f.write(chunk[0])
+
+    async def delete(
+        self,
+        id: str | UUID,
+    ) -> WrappedBooleanResponse:
+        """Delete a specific document.
+
+        Args:
+            id (str | UUID): ID of document to delete
+
+        Returns:
+            WrappedBooleanResponse
+        """
+        response_dict = await self.client._make_request(
+            "DELETE",
+            f"documents/{str(id)}",
+            version="v3",
+        )
+
+        return WrappedBooleanResponse(**response_dict)
+
+    async def list_chunks(
+        self,
+        id: str | UUID,
+        include_vectors: Optional[bool] = False,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedChunksResponse:
+        """Get chunks for a specific document.
+
+        Args:
+            id (str | UUID): ID of document to retrieve chunks for
+            include_vectors (Optional[bool]): Whether to include vector embeddings in the response
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedChunksResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+            "include_vectors": include_vectors,
+        }
+        response_dict = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/chunks",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedChunksResponse(**response_dict)
+
+    async def list_collections(
+        self,
+        id: str | UUID,
+        include_vectors: Optional[bool] = False,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedCollectionsResponse:
+        """List collections for a specific document.
+
+        Args:
+            id (str | UUID): ID of document to retrieve collections for
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedCollectionsResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+        }
+
+        response_dict = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/collections",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedCollectionsResponse(**response_dict)
+
+    async def delete_by_filter(
+        self,
+        filters: dict,
+    ) -> WrappedBooleanResponse:
+        """Delete documents based on filters.
+
+        Args:
+            filters (dict): Filters to apply when selecting documents to delete
+
+        Returns:
+            WrappedBooleanResponse
+        """
+        filters_json = json.dumps(filters)
+        response_dict = await self.client._make_request(
+            "DELETE",
+            "documents/by-filter",
+            data=filters_json,
+            version="v3",
+        )
+
+        return WrappedBooleanResponse(**response_dict)
+
+    async def extract(
+        self,
+        id: str | UUID,
+        settings: Optional[dict] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedGenericMessageResponse:
+        """Extract entities and relationships from a document.
+
+        Args:
+            id (str, UUID): ID of document to extract from
+            settings (Optional[dict]): Settings for extraction process
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            WrappedGenericMessageResponse
+        """
+        data: dict[str, Any] = {}
+        if settings:
+            data["settings"] = json.dumps(settings)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+
+        response_dict = await self.client._make_request(
+            "POST",
+            f"documents/{str(id)}/extract",
+            params=data,
+            version="v3",
+        )
+        return WrappedGenericMessageResponse(**response_dict)
+
+    async def list_entities(
+        self,
+        id: str | UUID,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+        include_embeddings: Optional[bool] = False,
+    ) -> WrappedEntitiesResponse:
+        """List entities extracted from a document.
+
+        Args:
+            id (str | UUID): ID of document to get entities from
+            offset (Optional[int]): Number of items to skip
+            limit (Optional[int]): Max number of items to return
+            include_embeddings (Optional[bool]): Whether to include embeddings
+
+        Returns:
+            WrappedEntitiesResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+            "include_embeddings": include_embeddings,
+        }
+        response_dict = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/entities",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedEntitiesResponse(**response_dict)
+
+    async def list_relationships(
+        self,
+        id: str | UUID,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+        entity_names: Optional[list[str]] = None,
+        relationship_types: Optional[list[str]] = None,
+    ) -> WrappedRelationshipsResponse:
+        """List relationships extracted from a document.
+
+        Args:
+            id (str | UUID): ID of document to get relationships from
+            offset (Optional[int]): Number of items to skip
+            limit (Optional[int]): Max number of items to return
+            entity_names (Optional[list[str]]): Filter by entity names
+            relationship_types (Optional[list[str]]): Filter by relationship types
+
+        Returns:
+            WrappedRelationshipsResponse
+        """
+        params: dict[str, Any] = {
+            "offset": offset,
+            "limit": limit,
+        }
+        if entity_names:
+            params["entity_names"] = entity_names
+        if relationship_types:
+            params["relationship_types"] = relationship_types
+
+        response_dict = await self.client._make_request(
+            "GET",
+            f"documents/{str(id)}/relationships",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedRelationshipsResponse(**response_dict)
+
+    async def list(
+        self,
+        ids: Optional[list[str | UUID]] = None,
+        offset: Optional[int] = 0,
+        limit: Optional[int] = 100,
+    ) -> WrappedDocumentsResponse:
+        """List documents with pagination.
+
+        Args:
+            ids (Optional[list[str | UUID]]): Optional list of document IDs to filter by
+            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
+            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
+
+        Returns:
+            WrappedDocumentsResponse
+        """
+        params = {
+            "offset": offset,
+            "limit": limit,
+        }
+        if ids:
+            params["ids"] = [str(doc_id) for doc_id in ids]  # type: ignore
+
+        response_dict = await self.client._make_request(
+            "GET",
+            "documents",
+            params=params,
+            version="v3",
+        )
+
+        return WrappedDocumentsResponse(**response_dict)
+
+    async def search(
+        self,
+        query: str,
+        search_mode: Optional[str | SearchMode] = "custom",
+        search_settings: Optional[dict | SearchSettings] = None,
+    ) -> WrappedDocumentSearchResponse:
+        """Conduct a vector and/or graph search.
+
+        Args:
+            query (str): The query to search for.
+            search_settings (Optional[dict, SearchSettings]]): Vector search settings.
+
+        Returns:
+            WrappedDocumentSearchResponse
+        """
+        if search_settings and not isinstance(search_settings, dict):
+            search_settings = search_settings.model_dump()
+        data: dict[str, Any] = {
+            "query": query,
+            "search_settings": search_settings,
+        }
+        if search_mode:
+            data["search_mode"] = search_mode
+
+        response_dict = await self.client._make_request(
+            "POST",
+            "documents/search",
+            json=data,
+            version="v3",
+        )
+
+        return WrappedDocumentSearchResponse(**response_dict)
+
+    async def deduplicate(
+        self,
+        id: str | UUID,
+        settings: Optional[dict] = None,
+        run_with_orchestration: Optional[bool] = True,
+    ) -> WrappedGenericMessageResponse:
+        """Deduplicate entities and relationships from a document.
+
+        Args:
+            id (str, UUID): ID of document to extract from
+            settings (Optional[dict]): Settings for extraction process
+            run_with_orchestration (Optional[bool]): Whether to run with orchestration
+
+        Returns:
+            WrappedGenericMessageResponse
+        """
+        data: dict[str, Any] = {}
+        if settings:
+            data["settings"] = json.dumps(settings)
+        if run_with_orchestration is not None:
+            data["run_with_orchestration"] = str(run_with_orchestration)
+
+        response_dict = await self.client._make_request(
+            "POST",
+            f"documents/{str(id)}/deduplicate",
+            params=data,
+            version="v3",
+        )
+
+        return WrappedGenericMessageResponse(**response_dict)