diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sdk/asnyc_methods/documents.py | 706 |
1 files changed, 706 insertions, 0 deletions
import json
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Any, Optional
from uuid import UUID

import aiofiles

from shared.api.models import (
    WrappedBooleanResponse,
    WrappedChunksResponse,
    WrappedCollectionsResponse,
    WrappedDocumentResponse,
    WrappedDocumentSearchResponse,
    WrappedDocumentsResponse,
    WrappedEntitiesResponse,
    WrappedGenericMessageResponse,
    WrappedIngestionResponse,
    WrappedRelationshipsResponse,
)

from ..models import IngestionMode, SearchMode, SearchSettings


class DocumentsSDK:
    """SDK for interacting with documents in the v3 API."""

    def __init__(self, client):
        # `client` is the async R2R-style client providing `_make_request`,
        # `session`, `base_url` and `_get_auth_header`.
        self.client = client

    async def create(
        self,
        file_path: Optional[str] = None,
        raw_text: Optional[str] = None,
        chunks: Optional[list[str]] = None,
        id: Optional[str | UUID] = None,
        ingestion_mode: Optional[str] = None,
        collection_ids: Optional[list[str | UUID]] = None,
        metadata: Optional[dict] = None,
        ingestion_config: Optional[dict | IngestionMode] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedIngestionResponse:
        """Create a new document from a file, raw text, or pre-split chunks.

        Exactly one of `file_path`, `raw_text` or `chunks` must be provided.

        Args:
            file_path (Optional[str]): Local path of the file to upload
            raw_text (Optional[str]): Raw text content to ingest
            chunks (Optional[list[str]]): Pre-split text chunks to ingest
            id (Optional[str | UUID]): Optional ID to assign to the document
            ingestion_mode (Optional[str]): Optional named ingestion mode
            collection_ids (Optional[list[str | UUID]]): Collection IDs to associate with the document. If none are provided, the document will be assigned to the user's default collection.
            metadata (Optional[dict]): Optional metadata to assign to the document
            ingestion_config (Optional[dict | IngestionMode]): Optional ingestion configuration to use
            run_with_orchestration (Optional[bool]): Whether to run with orchestration

        Returns:
            WrappedIngestionResponse

        Raises:
            ValueError: If zero, or more than one, content source is given.
        """
        provided = [x for x in (file_path, raw_text, chunks) if x]
        if not provided:
            raise ValueError(
                "Either `file_path`, `raw_text` or `chunks` must be provided"
            )
        if len(provided) > 1:
            raise ValueError(
                "Only one of `file_path`, `raw_text` or `chunks` may be provided"
            )

        data: dict[str, Any] = {}

        if id:
            data["id"] = str(id)
        if metadata:
            data["metadata"] = json.dumps(metadata)
        if ingestion_config:
            if isinstance(ingestion_config, IngestionMode):
                ingestion_config = {"mode": ingestion_config.value}
            # Normalize to a plain dict and guarantee an "app" sub-config
            # exists. BUGFIX: the previous logic unconditionally replaced
            # any caller-supplied "app" value with an empty dict; setdefault
            # preserves it while still ensuring the key is present.
            ingestion_config = dict(ingestion_config)
            ingestion_config.setdefault("app", {})
            data["ingestion_config"] = json.dumps(ingestion_config)
        if collection_ids:
            data["collection_ids"] = json.dumps(
                [str(collection_id) for collection_id in collection_ids]
            )
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)
        if ingestion_mode is not None:
            data["ingestion_mode"] = ingestion_mode

        if file_path:
            # The context manager keeps the file open for the whole request
            # and guarantees it is closed afterwards, even on error.
            with open(file_path, "rb") as file_instance:
                files = [
                    (
                        "file",
                        (file_path, file_instance, "application/octet-stream"),
                    )
                ]
                response_dict = await self.client._make_request(
                    "POST",
                    "documents",
                    data=data,
                    files=files,
                    version="v3",
                )
        else:
            if raw_text:
                data["raw_text"] = raw_text
            else:
                data["chunks"] = json.dumps(chunks)
            response_dict = await self.client._make_request(
                "POST",
                "documents",
                data=data,
                version="v3",
            )

        return WrappedIngestionResponse(**response_dict)

    async def append_metadata(
        self,
        id: str | UUID,
        metadata: list[dict],
    ) -> WrappedDocumentResponse:
        """Append metadata to a document.

        Args:
            id (str | UUID): ID of document to append metadata to
            metadata (list[dict]): Metadata to append

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = await self.client._make_request(
            "PATCH",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )

        return WrappedDocumentResponse(**response_dict)

    async def replace_metadata(
        self,
        id: str | UUID,
        metadata: list[dict],
    ) -> WrappedDocumentResponse:
        """Replace metadata for a document.

        Args:
            id (str | UUID): ID of document to replace metadata for
            metadata (list[dict]): The metadata that will replace the existing metadata

        Returns:
            WrappedDocumentResponse
        """
        data = json.dumps(metadata)
        response_dict = await self.client._make_request(
            "PUT",
            f"documents/{str(id)}/metadata",
            data=data,
            version="v3",
        )

        return WrappedDocumentResponse(**response_dict)

    async def retrieve(
        self,
        id: str | UUID,
    ) -> WrappedDocumentResponse:
        """Get a specific document by ID.

        Args:
            id (str | UUID): ID of document to retrieve

        Returns:
            WrappedDocumentResponse
        """
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}",
            version="v3",
        )

        return WrappedDocumentResponse(**response_dict)

    async def download(
        self,
        id: str | UUID,
    ) -> BytesIO:
        """Download a document's raw file content.

        Args:
            id (str | UUID): ID of document to download

        Returns:
            BytesIO: In-memory buffer with the file bytes.

        Raises:
            ValueError: If the client did not return a BytesIO payload.
        """
        response = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/download",
            version="v3",
        )
        if not isinstance(response, BytesIO):
            raise ValueError("Expected BytesIO response")
        return response

    async def download_zip(
        self,
        document_ids: Optional[list[str | UUID]] = None,
        start_date: Optional[datetime] = None,
        end_date: Optional[datetime] = None,
        output_path: Optional[str | Path] = None,
    ) -> BytesIO | None:
        """Download multiple documents as a zip file.

        Args:
            document_ids (Optional[list[str | UUID]]): Restrict the zip to these documents
            start_date (Optional[datetime]): Only include documents from this date onward
            end_date (Optional[datetime]): Only include documents up to this date
            output_path (Optional[str | Path]): If given, write the zip to this
                path and return None instead of the in-memory buffer

        Returns:
            BytesIO | None: The zip contents, or None when written to `output_path`.

        Raises:
            ValueError: If the client did not return a BytesIO payload.
        """
        params: dict[str, Any] = {}
        if document_ids:
            params["document_ids"] = [str(doc_id) for doc_id in document_ids]
        if start_date:
            params["start_date"] = start_date.isoformat()
        if end_date:
            params["end_date"] = end_date.isoformat()

        response = await self.client._make_request(
            "GET",
            "documents/download_zip",
            params=params,
            version="v3",
        )

        if not isinstance(response, BytesIO):
            raise ValueError("Expected BytesIO response")

        if output_path:
            output_path = (
                Path(output_path)
                if isinstance(output_path, str)
                else output_path
            )
            async with aiofiles.open(output_path, "wb") as f:
                await f.write(response.getvalue())
            return None

        return response

    async def _export_csv(
        self,
        endpoint: str,
        output_path: str | Path,
        columns: Optional[list[str]],
        filters: Optional[dict],
        include_header: bool,
    ) -> None:
        """Stream a CSV export endpoint's response directly to disk.

        Shared implementation for `export`, `export_entities` and
        `export_relationships` (previously three identical copies).

        Args:
            endpoint (str): Path under `/v3/` to POST the export request to
            output_path (str | Path): Local path where the CSV file should be saved
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when selecting rows
            include_header (bool): Whether to include column headers in the CSV

        Raises:
            ValueError: If the server responds with a non-200 status.
        """
        # Convert path to string if it's a Path object
        output_path = (
            str(output_path) if isinstance(output_path, Path) else output_path
        )

        data: dict[str, Any] = {"include_header": include_header}
        if columns:
            data["columns"] = columns
        if filters:
            data["filters"] = filters

        # Stream response directly to file
        async with aiofiles.open(output_path, "wb") as f:
            async with self.client.session.post(
                f"{self.client.base_url}/v3/{endpoint}",
                json=data,
                headers={
                    "Accept": "text/csv",
                    **self.client._get_auth_header(),
                },
            ) as response:
                if response.status != 200:
                    raise ValueError(
                        f"Export failed with status {response.status}",
                        response,
                    )

                # iter_chunks() yields (bytes, end_of_http_chunk) tuples;
                # only the bytes part is written.
                async for chunk in response.content.iter_chunks():
                    if chunk:
                        await f.write(chunk[0])

    async def export(
        self,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export documents to a CSV file, streaming the results directly to
        disk.

        Args:
            output_path (str | Path): Local path where the CSV file should be saved
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when selecting documents
            include_header (bool): Whether to include column headers in the CSV (default: True)

        Returns:
            None
        """
        await self._export_csv(
            "documents/export",
            output_path,
            columns,
            filters,
            include_header,
        )

    async def export_entities(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export a document's entities to a CSV file, streaming the results
        directly to disk.

        Args:
            id (str | UUID): ID of the document whose entities are exported
            output_path (str | Path): Local path where the CSV file should be saved
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when selecting entities
            include_header (bool): Whether to include column headers in the CSV (default: True)

        Returns:
            None
        """
        await self._export_csv(
            f"documents/{str(id)}/entities/export",
            output_path,
            columns,
            filters,
            include_header,
        )

    async def export_relationships(
        self,
        id: str | UUID,
        output_path: str | Path,
        columns: Optional[list[str]] = None,
        filters: Optional[dict] = None,
        include_header: bool = True,
    ) -> None:
        """Export document relationships to a CSV file, streaming the results
        directly to disk.

        Args:
            id (str | UUID): ID of the document whose relationships are exported
            output_path (str | Path): Local path where the CSV file should be saved
            columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
            filters (Optional[dict]): Optional filters to apply when selecting relationships
            include_header (bool): Whether to include column headers in the CSV (default: True)

        Returns:
            None
        """
        await self._export_csv(
            f"documents/{str(id)}/relationships/export",
            output_path,
            columns,
            filters,
            include_header,
        )

    async def delete(
        self,
        id: str | UUID,
    ) -> WrappedBooleanResponse:
        """Delete a specific document.

        Args:
            id (str | UUID): ID of document to delete

        Returns:
            WrappedBooleanResponse
        """
        response_dict = await self.client._make_request(
            "DELETE",
            f"documents/{str(id)}",
            version="v3",
        )

        return WrappedBooleanResponse(**response_dict)

    async def list_chunks(
        self,
        id: str | UUID,
        include_vectors: Optional[bool] = False,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedChunksResponse:
        """Get chunks for a specific document.

        Args:
            id (str | UUID): ID of document to retrieve chunks for
            include_vectors (Optional[bool]): Whether to include vector embeddings in the response
            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedChunksResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_vectors": include_vectors,
        }
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/chunks",
            params=params,
            version="v3",
        )

        return WrappedChunksResponse(**response_dict)

    async def list_collections(
        self,
        id: str | UUID,
        include_vectors: Optional[bool] = False,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedCollectionsResponse:
        """List collections for a specific document.

        Args:
            id (str | UUID): ID of document to retrieve collections for
            include_vectors (Optional[bool]): Accepted for signature
                compatibility but not sent to the server — collections have
                no vectors. NOTE(review): candidate for deprecation.
            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedCollectionsResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
        }

        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/collections",
            params=params,
            version="v3",
        )

        return WrappedCollectionsResponse(**response_dict)

    async def delete_by_filter(
        self,
        filters: dict,
    ) -> WrappedBooleanResponse:
        """Delete documents based on filters.

        Args:
            filters (dict): Filters to apply when selecting documents to delete

        Returns:
            WrappedBooleanResponse
        """
        filters_json = json.dumps(filters)
        response_dict = await self.client._make_request(
            "DELETE",
            "documents/by-filter",
            data=filters_json,
            version="v3",
        )

        return WrappedBooleanResponse(**response_dict)

    async def extract(
        self,
        id: str | UUID,
        settings: Optional[dict] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Extract entities and relationships from a document.

        Args:
            id (str | UUID): ID of document to extract from
            settings (Optional[dict]): Settings for extraction process
            run_with_orchestration (Optional[bool]): Whether to run with orchestration

        Returns:
            WrappedGenericMessageResponse
        """
        data: dict[str, Any] = {}
        if settings:
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)

        # NOTE(review): options are sent as query params, not a body — this
        # mirrors the server's endpoint definition; confirm before changing.
        response_dict = await self.client._make_request(
            "POST",
            f"documents/{str(id)}/extract",
            params=data,
            version="v3",
        )
        return WrappedGenericMessageResponse(**response_dict)

    async def list_entities(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        include_embeddings: Optional[bool] = False,
    ) -> WrappedEntitiesResponse:
        """List entities extracted from a document.

        Args:
            id (str | UUID): ID of document to get entities from
            offset (Optional[int]): Number of items to skip
            limit (Optional[int]): Max number of items to return
            include_embeddings (Optional[bool]): Whether to include embeddings

        Returns:
            WrappedEntitiesResponse
        """
        params = {
            "offset": offset,
            "limit": limit,
            "include_embeddings": include_embeddings,
        }
        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/entities",
            params=params,
            version="v3",
        )

        return WrappedEntitiesResponse(**response_dict)

    async def list_relationships(
        self,
        id: str | UUID,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
        entity_names: Optional[list[str]] = None,
        relationship_types: Optional[list[str]] = None,
    ) -> WrappedRelationshipsResponse:
        """List relationships extracted from a document.

        Args:
            id (str | UUID): ID of document to get relationships from
            offset (Optional[int]): Number of items to skip
            limit (Optional[int]): Max number of items to return
            entity_names (Optional[list[str]]): Filter by entity names
            relationship_types (Optional[list[str]]): Filter by relationship types

        Returns:
            WrappedRelationshipsResponse
        """
        params: dict[str, Any] = {
            "offset": offset,
            "limit": limit,
        }
        if entity_names:
            params["entity_names"] = entity_names
        if relationship_types:
            params["relationship_types"] = relationship_types

        response_dict = await self.client._make_request(
            "GET",
            f"documents/{str(id)}/relationships",
            params=params,
            version="v3",
        )

        return WrappedRelationshipsResponse(**response_dict)

    async def list(
        self,
        ids: Optional[list[str | UUID]] = None,
        offset: Optional[int] = 0,
        limit: Optional[int] = 100,
    ) -> WrappedDocumentsResponse:
        """List documents with pagination.

        Args:
            ids (Optional[list[str | UUID]]): Optional list of document IDs to filter by
            offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
            limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.

        Returns:
            WrappedDocumentsResponse
        """
        params: dict[str, Any] = {
            "offset": offset,
            "limit": limit,
        }
        if ids:
            params["ids"] = [str(doc_id) for doc_id in ids]

        response_dict = await self.client._make_request(
            "GET",
            "documents",
            params=params,
            version="v3",
        )

        return WrappedDocumentsResponse(**response_dict)

    async def search(
        self,
        query: str,
        search_mode: Optional[str | SearchMode] = "custom",
        search_settings: Optional[dict | SearchSettings] = None,
    ) -> WrappedDocumentSearchResponse:
        """Conduct a vector and/or graph search over documents.

        Args:
            query (str): The query to search for.
            search_mode (Optional[str | SearchMode]): Search mode; defaults to "custom".
            search_settings (Optional[dict | SearchSettings]): Vector search settings.

        Returns:
            WrappedDocumentSearchResponse
        """
        if search_settings and not isinstance(search_settings, dict):
            search_settings = search_settings.model_dump()
        data: dict[str, Any] = {
            "query": query,
            "search_settings": search_settings,
        }
        if search_mode:
            data["search_mode"] = search_mode

        response_dict = await self.client._make_request(
            "POST",
            "documents/search",
            json=data,
            version="v3",
        )

        return WrappedDocumentSearchResponse(**response_dict)

    async def deduplicate(
        self,
        id: str | UUID,
        settings: Optional[dict] = None,
        run_with_orchestration: Optional[bool] = True,
    ) -> WrappedGenericMessageResponse:
        """Deduplicate entities and relationships from a document.

        Args:
            id (str | UUID): ID of document to deduplicate
            settings (Optional[dict]): Settings for deduplication process
            run_with_orchestration (Optional[bool]): Whether to run with orchestration

        Returns:
            WrappedGenericMessageResponse
        """
        data: dict[str, Any] = {}
        if settings:
            data["settings"] = json.dumps(settings)
        if run_with_orchestration is not None:
            data["run_with_orchestration"] = str(run_with_orchestration)

        # NOTE(review): like `extract`, options go in query params — keep
        # the two endpoints consistent.
        response_dict = await self.client._make_request(
            "POST",
            f"documents/{str(id)}/deduplicate",
            params=data,
            version="v3",
        )

        return WrappedGenericMessageResponse(**response_dict)