from builtins import list as _list
from typing import Any, Optional
from uuid import UUID
from shared.api.models import (
WrappedBooleanResponse,
WrappedCommunitiesResponse,
WrappedCommunityResponse,
WrappedEntitiesResponse,
WrappedEntityResponse,
WrappedGenericMessageResponse,
WrappedGraphResponse,
WrappedGraphsResponse,
WrappedRelationshipResponse,
WrappedRelationshipsResponse,
)
class GraphsSDK:
"""SDK for interacting with knowledge graphs in the v3 API."""
def __init__(self, client):
self.client = client
async def list(
self,
collection_ids: Optional[list[str | UUID]] = None,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
) -> WrappedGraphsResponse:
"""List graphs with pagination and filtering options.
Args:
ids (Optional[list[str | UUID]]): Filter graphs by ids
offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
Returns:
WrappedGraphsResponse
"""
params: dict = {
"offset": offset,
"limit": limit,
}
if collection_ids:
params["collection_ids"] = collection_ids
response_dict = await self.client._make_request(
"GET", "graphs", params=params, version="v3"
)
return WrappedGraphsResponse(**response_dict)
async def retrieve(
self,
collection_id: str | UUID,
) -> WrappedGraphResponse:
"""Get detailed information about a specific graph.
Args:
collection_id (str | UUID): Graph ID to retrieve
Returns:
WrappedGraphResponse
"""
response_dict = await self.client._make_request(
"GET", f"graphs/{str(collection_id)}", version="v3"
)
return WrappedGraphResponse(**response_dict)
async def reset(
self,
collection_id: str | UUID,
) -> WrappedBooleanResponse:
"""Deletes a graph and all its associated data.
This endpoint permanently removes the specified graph along with all
entities and relationships that belong to only this graph.
Entities and relationships extracted from documents are not deleted.
Args:
collection_id (str | UUID): Graph ID to reset
Returns:
WrappedBooleanResponse
"""
response_dict = await self.client._make_request(
"POST", f"graphs/{str(collection_id)}/reset", version="v3"
)
return WrappedBooleanResponse(**response_dict)
async def update(
self,
collection_id: str | UUID,
name: Optional[str] = None,
description: Optional[str] = None,
) -> WrappedGraphResponse:
"""Update graph information.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
name (Optional[str]): Optional new name for the graph
description (Optional[str]): Optional new description for the graph
Returns:
WrappedGraphResponse
"""
data: dict[str, Any] = {}
if name is not None:
data["name"] = name
if description is not None:
data["description"] = description
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}",
json=data,
version="v3",
)
return WrappedGraphResponse(**response_dict)
async def list_entities(
self,
collection_id: str | UUID,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
) -> WrappedEntitiesResponse:
"""List entities in a graph.
Args:
collection_id (str | UUID): Graph ID to list entities from
offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
Returns:
WrappedEntitiesResponse
"""
params: dict = {
"offset": offset,
"limit": limit,
}
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/entities",
params=params,
version="v3",
)
return WrappedEntitiesResponse(**response_dict)
async def get_entity(
self,
collection_id: str | UUID,
entity_id: str | UUID,
) -> WrappedEntityResponse:
"""Get entity information in a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
entity_id (str | UUID): Entity ID to get from the graph
Returns:
WrappedEntityResponse
"""
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/entities/{str(entity_id)}",
version="v3",
)
return WrappedEntityResponse(**response_dict)
async def remove_entity(
self,
collection_id: str | UUID,
entity_id: str | UUID,
) -> WrappedBooleanResponse:
"""Remove an entity from a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
entity_id (str | UUID): Entity ID to remove from the graph
Returns:
WrappedBooleanResponse
"""
return await self.client._make_request(
"DELETE",
f"graphs/{str(collection_id)}/entities/{str(entity_id)}",
version="v3",
)
async def list_relationships(
self,
collection_id: str | UUID,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
) -> WrappedRelationshipsResponse:
"""List relationships in a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
Returns:
WrappedRelationshipsResponse
"""
params: dict = {
"offset": offset,
"limit": limit,
}
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/relationships",
params=params,
version="v3",
)
return WrappedRelationshipsResponse(**response_dict)
async def get_relationship(
self,
collection_id: str | UUID,
relationship_id: str | UUID,
) -> WrappedRelationshipResponse:
"""Get relationship information in a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
relationship_id (str | UUID): Relationship ID to get from the graph
Returns:
WrappedRelationshipResponse
"""
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/relationships/{str(relationship_id)}",
version="v3",
)
return WrappedRelationshipResponse(**response_dict)
async def remove_relationship(
self,
collection_id: str | UUID,
relationship_id: str | UUID,
) -> WrappedBooleanResponse:
"""Remove a relationship from a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
relationship_id (str | UUID): Relationship ID to remove from the graph
Returns:
WrappedBooleanResponse
"""
response_dict = await self.client._make_request(
"DELETE",
f"graphs/{str(collection_id)}/relationships/{str(relationship_id)}",
version="v3",
)
return WrappedBooleanResponse(**response_dict)
async def build(
self,
collection_id: str | UUID,
settings: Optional[dict] = None,
run_with_orchestration: bool = True,
) -> WrappedGenericMessageResponse:
"""Build a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
settings (dict): Settings for the build
run_with_orchestration (bool, optional): Whether to run with orchestration. Defaults to True.
Returns:
WrappedGenericMessageResponse
"""
data: dict[str, Any] = {
"run_with_orchestration": run_with_orchestration,
}
if settings:
data["settings"] = settings
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/communities/build",
json=data,
version="v3",
)
return WrappedGenericMessageResponse(**response_dict)
async def list_communities(
self,
collection_id: str | UUID,
offset: Optional[int] = 0,
limit: Optional[int] = 100,
) -> WrappedCommunitiesResponse:
"""List communities in a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
Returns:
WrappedCommunitiesResponse
"""
params: dict = {
"offset": offset,
"limit": limit,
}
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/communities",
params=params,
version="v3",
)
return WrappedCommunitiesResponse(**response_dict)
async def get_community(
self,
collection_id: str | UUID,
community_id: str | UUID,
) -> WrappedCommunityResponse:
"""Get community information in a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
community_id (str | UUID): Community ID to get from the graph
Returns:
WrappedCommunityResponse
"""
response_dict = await self.client._make_request(
"GET",
f"graphs/{str(collection_id)}/communities/{str(community_id)}",
version="v3",
)
return WrappedCommunityResponse(**response_dict)
async def update_community(
self,
collection_id: str | UUID,
community_id: str | UUID,
name: Optional[str] = None,
summary: Optional[str] = None,
findings: Optional[_list[str]] = None,
rating: Optional[int] = None,
rating_explanation: Optional[str] = None,
level: Optional[int] = None,
attributes: Optional[dict] = None,
) -> WrappedCommunityResponse:
"""Update community information.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
community_id (str | UUID): Community ID to update
name (Optional[str]): Optional new name for the community
summary (Optional[str]): Optional new summary for the community
findings (Optional[list[str]]): Optional new findings for the community
rating (Optional[int]): Optional new rating for the community
rating_explanation (Optional[str]): Optional new rating explanation for the community
level (Optional[int]): Optional new level for the community
attributes (Optional[dict]): Optional new attributes for the community
Returns:
WrappedCommunityResponse
"""
data: dict[str, Any] = {}
if name is not None:
data["name"] = name
if summary is not None:
data["summary"] = summary
if findings is not None:
data["findings"] = findings
if rating is not None:
data["rating"] = str(rating)
if rating_explanation is not None:
data["rating_explanation"] = rating_explanation
if level is not None:
data["level"] = level
if attributes is not None:
data["attributes"] = attributes
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/communities/{str(community_id)}",
json=data,
version="v3",
)
return WrappedCommunityResponse(**response_dict)
async def delete_community(
self,
collection_id: str | UUID,
community_id: str | UUID,
) -> WrappedBooleanResponse:
"""Remove a community from a graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
community_id (str | UUID): Community ID to remove from the graph
Returns:
WrappedBooleanResponse
"""
response_dict = await self.client._make_request(
"DELETE",
f"graphs/{str(collection_id)}/communities/{str(community_id)}",
version="v3",
)
return WrappedBooleanResponse(**response_dict)
async def pull(
self,
collection_id: str | UUID,
) -> WrappedBooleanResponse:
"""Adds documents to a graph by copying their entities and
relationships.
This endpoint:
1. Copies document entities to the graphs_entities table
2. Copies document relationships to the graphs_relationships table
3. Associates the documents with the graph
When a document is added:
- Its entities and relationships are copied to graph-specific tables
- Existing entities/relationships are updated by merging their properties
- The document ID is recorded in the graph's document_ids array
Documents added to a graph will contribute their knowledge to:
- Graph analysis and querying
- Community detection
- Knowledge graph enrichment
Returns:
WrappedBooleanResponse
"""
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/pull",
version="v3",
)
return WrappedBooleanResponse(**response_dict)
async def remove_document(
self,
collection_id: str | UUID,
document_id: str | UUID,
) -> WrappedBooleanResponse:
"""Removes a document from a graph and removes any associated entities.
This endpoint:
1. Removes the document ID from the graph's document_ids array
2. Optionally deletes the document's copied entities and relationships
The user must have access to both the graph and the document being removed.
Returns:
WrappedBooleanResponse
"""
response_dict = await self.client._make_request(
"DELETE",
f"graphs/{str(collection_id)}/documents/{str(document_id)}",
version="v3",
)
return WrappedBooleanResponse(**response_dict)
async def create_entity(
self,
collection_id: str | UUID,
name: str,
description: str,
category: Optional[str] = None,
metadata: Optional[dict] = None,
) -> WrappedEntityResponse:
"""Creates a new entity in the graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
name (str): The name of the entity to create
description (Optional[str]): The description of the entity
category (Optional[str]): The category of the entity
metadata (Optional[dict]): Additional metadata for the entity
Returns:
WrappedEntityResponse
"""
data: dict[str, Any] = {
"name": name,
"description": description,
}
if category is not None:
data["category"] = category
if metadata is not None:
data["metadata"] = metadata
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/entities",
json=data,
version="v3",
)
return WrappedEntityResponse(**response_dict)
async def create_relationship(
self,
collection_id: str | UUID,
subject: str,
subject_id: str | UUID,
predicate: str,
object: str,
object_id: str | UUID,
description: str,
weight: Optional[float] = None,
metadata: Optional[dict] = None,
) -> WrappedRelationshipResponse:
"""Creates a new relationship in the graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
subject (str): The subject of the relationship
subject_id (str | UUID): The ID of the subject entity
predicate (str): The predicate/type of the relationship
object (str): The object of the relationship
object_id (str | UUID): The ID of the object entity
description (Optional[str]): Description of the relationship
weight (Optional[float]): Weight/strength of the relationship
metadata (Optional[dict]): Additional metadata for the relationship
Returns:
WrappedRelationshipResponse
"""
data: dict[str, Any] = {
"subject": subject,
"subject_id": str(subject_id),
"predicate": predicate,
"object": object,
"object_id": str(object_id),
"description": description,
}
if weight is not None:
data["weight"] = weight
if metadata is not None:
data["metadata"] = metadata
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/relationships",
json=data,
version="v3",
)
return WrappedRelationshipResponse(**response_dict)
async def create_community(
self,
collection_id: str | UUID,
name: str,
summary: str,
findings: Optional[_list[str]] = None,
rating: Optional[float] = None,
rating_explanation: Optional[str] = None,
) -> WrappedCommunityResponse:
"""Creates a new community in the graph.
Args:
collection_id (str | UUID): The collection ID corresponding to the graph
name (str): The name of the community
summary (str): A summary description of the community
findings (Optional[list[str]]): List of findings about the community
rating (Optional[float]): Rating between 1 and 10
rating_explanation (Optional[str]): Explanation for the rating
Returns:
WrappedCommunityResponse
"""
data: dict[str, Any] = {
"name": name,
"summary": summary,
}
if findings is not None:
data["findings"] = findings
if rating is not None:
data["rating"] = rating
if rating_explanation is not None:
data["rating_explanation"] = rating_explanation
response_dict = await self.client._make_request(
"POST",
f"graphs/{str(collection_id)}/communities",
json=data,
version="v3",
)
return WrappedCommunityResponse(**response_dict)