import json
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Optional
from uuid import UUID

import aiofiles

from shared.api.models import (
    WrappedBooleanResponse,
    WrappedChunksResponse,
    WrappedCollectionsResponse,
    WrappedDocumentResponse,
    WrappedDocumentSearchResponse,
    WrappedDocumentsResponse,
    WrappedEntitiesResponse,
    WrappedGenericMessageResponse,
    WrappedIngestionResponse,
    WrappedRelationshipsResponse,
)

from ..models import IngestionMode, SearchMode, SearchSettings


class DocumentsSDK:
    """SDK for interacting with documents in the v3 API."""

    def __init__(self, client):
        # The transport client; must expose `_make_request`, `session`,
        # `base_url` and `_get_auth_header` (used by the export helpers).
        self.client = client

    async def create(
        self,
        file_path: Optional[str] = None,
        raw_text: Optional[str] = None,
        chunks: Optional[list[str]] = None,
        id: Optional[str | UUID] = None,
        ingestion_mode: Optional[str] = None,
        collection_ids: Optional[list[str | UUID]] = None,
        metadata: Optional[dict] = None,
        ingestion_config: Optional[dict | IngestionMode] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedIngestionResponse:
        """Create a new document from a file, raw text, or pre-built chunks.

        Exactly one of `file_path`, `raw_text` or `chunks` must be provided.

        Args:
            file_path (Optional[str]): The file to upload, if any
            raw_text (Optional[str]): Optional text content to upload, if no
                file path is provided
            chunks (Optional[list[str]]): Pre-chunked text content, if neither
                a file nor raw text is provided
            id (Optional[str | UUID]): Optional ID to assign to the document
            ingestion_mode (Optional[str]): Optional ingestion mode to use
            collection_ids (Optional[list[str | UUID]]): Collection IDs to
                associate with the document. If none are provided, the
                document will be assigned to the user's default collection.
            metadata (Optional[dict]): Optional metadata to assign to the
                document
            ingestion_config (Optional[dict | IngestionMode]): Optional
                ingestion configuration to use
            run_with_orchestration (Optional[bool]): Whether to run with
                orchestration

        Returns:
            WrappedIngestionResponse

        Raises:
            ValueError: If zero, or more than one, of `file_path`,
                `raw_text` and `chunks` are provided.
        """
        if not file_path and not raw_text and not chunks:
            raise ValueError(
                "Either `file_path`, `raw_text` or `chunks` must be provided"
            )
        if (
            (file_path and raw_text)
            or (file_path and chunks)
            or (raw_text and chunks)
        ):
            raise ValueError(
                "Only one of `file_path`, `raw_text` or `chunks` may be provided"
            )

        data: dict[str, Any] = {}
        files = None

        if id:
            data["id"] = str(id)
        if metadata:
            data["metadata"] = json.dumps(metadata)
        if ingestion_config:
            if isinstance(ingestion_config, IngestionMode):
                ingestion_config = {"mode": ingestion_config.value}
            # BUGFIX: the old code computed
            #   {} if isinstance(ingestion_config, dict) else ingestion_config["app"]
            # but at this point ingestion_config is always a dict, so the
            # expression was always {} and silently clobbered any
            # caller-supplied "app" sub-config. Preserve it instead.
            ingestion_config = dict(ingestion_config)
            ingestion_config["app"] = ingestion_config.get("app") or {}
            data["ingestion_config"] = json.dumps(ingestion_config)
        if collection_ids:
            collection_ids = [
                str(collection_id) for collection_id in collection_ids
            ]  # type: ignore
            data["collection_ids"] = json.dumps(collection_ids)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)
        if ingestion_mode is not None:
            data["ingestion_mode"] = ingestion_mode

        if file_path:
            # The context manager keeps the file open for the duration of
            # the request and guarantees it is closed afterwards, even if
            # the request raises.
            with open(file_path, "rb") as file_instance:
                files = [
                    (
                        "file",
                        (file_path, file_instance, "application/octet-stream"),
                    )
                ]
                response_dict = await self.client._make_request(
                    "POST",
                    "documents",
                    data=data,
                    files=files,
                    version="v3",
                )
        elif raw_text:
            data["raw_text"] = raw_text  # type: ignore
            response_dict = await self.client._make_request(
                "POST",
                "documents",
                data=data,
                version="v3",
            )
        else:
            data["chunks"] = json.dumps(chunks)
            response_dict = await self.client._make_request(
                "POST",
                "documents",
                data=data,
                version="v3",
            )

        return WrappedIngestionResponse(**response_dict)

    async def append_metadata(
        self,
        id: str | UUID,
        metadata: list[dict],
    ) -> WrappedDocumentResponse:
        """Append metadata to a document.

        Args:
            id (str | UUID): ID of document to append metadata to
            metadata (list[dict]): Metadata to append

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = await self.client._make_request(
            "PATCH",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)

    async def replace_metadata(
        self,
        id: str | UUID,
        metadata: list[dict],
    ) -> WrappedDocumentResponse:
        """Replace metadata for a document.

        Args:
            id (str | UUID): ID of document to replace metadata for
            metadata (list[dict]): The metadata that will replace the
                existing metadata

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = await self.client._make_request(
            "PUT",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)

    async def retrieve(
        self,
        id: str | UUID,
    ) -> WrappedDocumentResponse:
        """Get a specific document by ID.

        Args:
            id (str | UUID): ID of document to retrieve

        Returns:
            WrappedDocumentResponse
        """
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}",
            version="v3",
        )
        return WrappedDocumentResponse(**response_dict)

    async def download(
        self,
        id: str | UUID,
    ) -> BytesIO:
        """Download a document's original file content.

        Args:
            id (str | UUID): ID of document to download

        Returns:
            BytesIO: In-memory buffer with the file bytes

        Raises:
            ValueError: If the transport did not return a BytesIO payload.
        """
        response = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/download",
            version="v3",
        )
        if not isinstance(response, BytesIO):
            raise ValueError("Expected BytesIO response")
        return response

    async def download_zip(
        self,
        document_ids: Optional[list[str | UUID]] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        output_path: Optional[str | Path] = None,
    ) -> BytesIO | None:
        """Download multiple documents as a zip file.

        Args:
            document_ids (Optional[list[str | UUID]]): Restrict the export
                to these document IDs
            start_date (Optional[datetime]): Only include documents created
                on or after this timestamp
            end_date (Optional[datetime]): Only include documents created
                on or before this timestamp
            output_path (Optional[str | Path]): If given, the zip is written
                to this path and None is returned

        Returns:
            BytesIO | None: The zip contents, or None when written to
            `output_path`.

        Raises:
            ValueError: If the transport did not return a BytesIO payload.
        """
        params: dict[str, Any] = {}
        if document_ids:
            params["document_ids"] = [str(doc_id) for doc_id in document_ids]
        if start_date:
            params["start_date"] = start_date.isoformat()
        if end_date:
            params["end_date"] = end_date.isoformat()

        response = await self.client._make_request(
            "GET",
            "documents/download_zip",
            params=params,
            version="v3",
        )
        if not isinstance(response, BytesIO):
            raise ValueError("Expected BytesIO response")

        if output_path:
            output_path = (
                Path(output_path)
                if isinstance(output_path, str)
                else output_path
            )
            async with aiofiles.open(output_path, "wb") as f:
                await f.write(response.getvalue())
            return None

        return response

    async def _export_to_file(
        self,
        endpoint: str,
        output_path: str | Path,
        columns: Optional[list[str]],
        filters: Optional[dict],
        include_header: bool,
    ) -> None:
        """Stream a CSV export endpoint's response directly to a local file.

        Shared implementation for `export`, `export_entities` and
        `export_relationships`.

        Args:
            endpoint (str): v3 endpoint path (e.g. "documents/export")
            output_path (str | Path): Local path for the CSV file
            columns (Optional[list[str]]): Specific columns to export
            filters (Optional[dict]): Filters applied server-side
            include_header (bool): Whether to include CSV column headers

        Raises:
            ValueError: If the server responds with a non-200 status.
        """
        # Convert path to string if it's a Path object
        output_path = (
            str(output_path) if isinstance(output_path, Path) else output_path
        )

        # Prepare request data
        data: dict[str, Any] = {"include_header": include_header}
        if columns:
            data["columns"] = columns
        if filters:
            data["filters"] = filters

        # Stream response directly to file
        async with aiofiles.open(output_path, "wb") as f:
            async with self.client.session.post(
                f"{self.client.base_url}/v3/{endpoint}",
                json=data,
                headers={
                    "Accept": "text/csv",
                    **self.client._get_auth_header(),
                },
            ) as response:
                if response.status != 200:
                    raise ValueError(
                        f"Export failed with status {response.status}",
                        response,
                    )
                async for chunk in response.content.iter_chunks():
                    if chunk:
                        await f.write(chunk[0])

    async def export(
        self,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export documents to a CSV file, streaming the results directly to
        disk.

        Args:
            output_path (str | Path): Local path where the CSV file should
                be saved
            columns (Optional[list[str]]): Specific columns to export.
                If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when
                selecting documents
            include_header (bool): Whether to include column headers in the
                CSV (default: True)

        Returns:
            None
        """
        await self._export_to_file(
            "documents/export", output_path, columns, filters, include_header
        )

    async def export_entities(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export a document's entities to a CSV file, streaming the results
        directly to disk.

        Args:
            id (str | UUID): ID of the document whose entities to export
            output_path (str | Path): Local path where the CSV file should
                be saved
            columns (Optional[list[str]]): Specific columns to export.
                If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when
                selecting entities
            include_header (bool): Whether to include column headers in the
                CSV (default: True)

        Returns:
            None
        """
        await self._export_to_file(
            f"documents/{str(id)}/entities/export",
            output_path,
            columns,
            filters,
            include_header,
        )

    async def export_relationships(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export document relationships to a CSV file, streaming the
        results directly to disk.

        Args:
            id (str | UUID): ID of the document whose relationships to export
            output_path (str | Path): Local path where the CSV file should
                be saved
            columns (Optional[list[str]]): Specific columns to export.
                If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when
                selecting relationships
            include_header (bool): Whether to include column headers in the
                CSV (default: True)

        Returns:
            None
        """
        await self._export_to_file(
            f"documents/{str(id)}/relationships/export",
            output_path,
            columns,
            filters,
            include_header,
        )

    async def delete(
        self,
        id: str | UUID,
    ) -> WrappedBooleanResponse:
        """Delete a specific document.

        Args:
            id (str | UUID): ID of document to delete

        Returns:
            WrappedBooleanResponse
        """
        response_dict = await self.client._make_request(
            "DELETE",
            f"documents/{str(id)}",
            version="v3",
        )
        return WrappedBooleanResponse(**response_dict)

    async def list_chunks(
        self,
        id: str | UUID,
        include_vectors: Optional[bool] = False,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedChunksResponse:
        """Get chunks for a specific document.

        Args:
            id (str | UUID): ID of document to retrieve chunks for
            include_vectors (Optional[bool]): Whether to include vector
                embeddings in the response
            offset (int, optional): Specifies the number of objects to skip.
                Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects
                to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedChunksResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_vectors": include_vectors,
        }
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/chunks",
            params=params,
            version="v3",
        )
        return WrappedChunksResponse(**response_dict)

    async def list_collections(
        self,
        id: str | UUID,
        include_vectors: Optional[bool] = False,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedCollectionsResponse:
        """List collections for a specific document.

        Args:
            id (str | UUID): ID of document to retrieve collections for
            include_vectors (Optional[bool]): Accepted for signature
                compatibility; not forwarded to the server.
            offset (int, optional): Specifies the number of objects to skip.
                Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects
                to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedCollectionsResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
        }
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/collections",
            params=params,
            version="v3",
        )
        return WrappedCollectionsResponse(**response_dict)

    async def delete_by_filter(
        self,
        filters: dict,
    ) -> WrappedBooleanResponse:
        """Delete documents based on filters.

        Args:
            filters (dict): Filters to apply when selecting documents to
                delete

        Returns:
            WrappedBooleanResponse
        """
        filters_json = json.dumps(filters)
        response_dict = await self.client._make_request(
            "DELETE",
            "documents/by-filter",
            data=filters_json,
            version="v3",
        )
        return WrappedBooleanResponse(**response_dict)

    async def extract(
        self,
        id: str | UUID,
        settings: Optional[dict] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Extract entities and relationships from a document.

        Args:
            id (str | UUID): ID of document to extract from
            settings (Optional[dict]): Settings for extraction process
            run_with_orchestration (Optional[bool]): Whether to run with
                orchestration

        Returns:
            WrappedGenericMessageResponse
        """
        data: dict[str, Any] = {}
        if settings:
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)

        response_dict = await self.client._make_request(
            "POST",
            f"documents/{str(id)}/extract",
            params=data,
            version="v3",
        )
        return WrappedGenericMessageResponse(**response_dict)

    async def list_entities(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        include_embeddings: Optional[bool] = False,
    ) -> WrappedEntitiesResponse:
        """List entities extracted from a document.

        Args:
            id (str | UUID): ID of document to get entities from
            offset (Optional[int]): Number of items to skip
            limit (Optional[int]): Max number of items to return
            include_embeddings (Optional[bool]): Whether to include
                embeddings

        Returns:
            WrappedEntitiesResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_embeddings": include_embeddings,
        }
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/entities",
            params=params,
            version="v3",
        )
        return WrappedEntitiesResponse(**response_dict)

    async def list_relationships(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        entity_names: Optional[list[str]] = None,
        relationship_types: Optional[list[str]] = None,
    ) -> WrappedRelationshipsResponse:
        """List relationships extracted from a document.

        Args:
            id (str | UUID): ID of document to get relationships from
            offset (Optional[int]): Number of items to skip
            limit (Optional[int]): Max number of items to return
            entity_names (Optional[list[str]]): Filter by entity names
            relationship_types (Optional[list[str]]): Filter by relationship
                types

        Returns:
            WrappedRelationshipsResponse
        """
        params: dict[str, Any] = {
            "offset": offset,
            "limit": limit,
        }
        if entity_names:
            params["entity_names"] = entity_names
        if relationship_types:
            params["relationship_types"] = relationship_types

        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/relationships",
            params=params,
            version="v3",
        )
        return WrappedRelationshipsResponse(**response_dict)

    async def list(
        self,
        ids: Optional[list[str | UUID]] = None,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedDocumentsResponse:
        """List documents with pagination.

        Args:
            ids (Optional[list[str | UUID]]): Optional list of document IDs
                to filter by
            offset (int, optional): Specifies the number of objects to skip.
                Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects
                to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedDocumentsResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
        }
        if ids:
            params["ids"] = [str(doc_id) for doc_id in ids]  # type: ignore

        response_dict = await self.client._make_request(
            "GET",
            "documents",
            params=params,
            version="v3",
        )
        return WrappedDocumentsResponse(**response_dict)

    async def search(
        self,
        query: str,
        search_mode: Optional[str | SearchMode] = "custom",
        search_settings: Optional[dict | SearchSettings] = None,
    ) -> WrappedDocumentSearchResponse:
        """Conduct a vector and/or graph search over documents.

        Args:
            query (str): The query to search for.
            search_mode (Optional[str | SearchMode]): Search mode
                ("custom" by default).
            search_settings (Optional[dict | SearchSettings]): Vector search
                settings.

        Returns:
            WrappedDocumentSearchResponse
        """
        if search_settings and not isinstance(search_settings, dict):
            search_settings = search_settings.model_dump()

        data: dict[str, Any] = {
            "query": query,
            "search_settings": search_settings,
        }
        if search_mode:
            data["search_mode"] = search_mode

        response_dict = await self.client._make_request(
            "POST",
            "documents/search",
            json=data,
            version="v3",
        )
        return WrappedDocumentSearchResponse(**response_dict)

    async def deduplicate(
        self,
        id: str | UUID,
        settings: Optional[dict] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Deduplicate entities and relationships from a document.

        Args:
            id (str | UUID): ID of document to deduplicate
            settings (Optional[dict]): Settings for the deduplication process
            run_with_orchestration (Optional[bool]): Whether to run with
                orchestration

        Returns:
            WrappedGenericMessageResponse
        """
        data: dict[str, Any] = {}
        if settings:
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)

        response_dict = await self.client._make_request(
            "POST",
            f"documents/{str(id)}/deduplicate",
            params=data,
            version="v3",
        )
        return WrappedGenericMessageResponse(**response_dict)