aboutsummaryrefslogtreecommitdiff
import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from uuid import UUID

from pydantic import Field

from ..abstractions.llm import GenerationConfig
from .base import R2RSerializable


class Entity(R2RSerializable):
    """An entity extracted from a document."""

    name: str
    description: Optional[str] = None
    category: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None

    id: Optional[UUID] = None
    parent_id: Optional[UUID] = None  # graph_id | document_id
    description_embedding: Optional[list[float] | str] = None
    chunk_ids: Optional[list[UUID]] = []

    def __str__(self):
        return f"{self.name}:{self.category}"

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if isinstance(self.metadata, str):
            try:
                self.metadata = json.loads(self.metadata)
            except json.JSONDecodeError:
                self.metadata = self.metadata


class Relationship(R2RSerializable):
    """A relationship between two entities.

    This is a generic relationship, and can be used to represent any type of
    relationship between any two entities.
    """

    id: Optional[UUID] = None
    subject: str
    predicate: str
    object: str
    description: Optional[str] = None
    subject_id: Optional[UUID] = None
    object_id: Optional[UUID] = None
    weight: float | None = 1.0
    chunk_ids: Optional[list[UUID]] = []
    parent_id: Optional[UUID] = None
    description_embedding: Optional[list[float] | str] = None
    metadata: Optional[dict[str, Any] | str] = None

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        if isinstance(self.metadata, str):
            try:
                self.metadata = json.loads(self.metadata)
            except json.JSONDecodeError:
                self.metadata = self.metadata


@dataclass
class Community(R2RSerializable):
    name: str = ""
    summary: str = ""
    level: Optional[int] = None
    findings: list[str] = []
    id: Optional[int | UUID] = None
    community_id: Optional[UUID] = None
    collection_id: Optional[UUID] = None
    rating: Optional[float] = None
    rating_explanation: Optional[str] = None
    description_embedding: Optional[list[float]] = None
    attributes: dict[str, Any] | None = None
    created_at: datetime = Field(
        default_factory=datetime.utcnow,
    )
    updated_at: datetime = Field(
        default_factory=datetime.utcnow,
    )

    def __init__(self, **kwargs):
        if isinstance(kwargs.get("attributes", None), str):
            kwargs["attributes"] = json.loads(kwargs["attributes"])

        if isinstance(kwargs.get("embedding", None), str):
            kwargs["embedding"] = json.loads(kwargs["embedding"])

        super().__init__(**kwargs)

    @classmethod
    def from_dict(cls, data: dict[str, Any] | str) -> "Community":
        parsed_data: dict[str, Any] = (
            json.loads(data) if isinstance(data, str) else data
        )
        if isinstance(parsed_data.get("embedding", None), str):
            parsed_data["embedding"] = json.loads(parsed_data["embedding"])
        return cls(**parsed_data)


class GraphExtraction(R2RSerializable):
    """A protocol for a knowledge graph extraction."""

    entities: list[Entity]
    relationships: list[Relationship]


class Graph(R2RSerializable):
    id: UUID | None = Field()
    name: str
    description: Optional[str] = None
    created_at: datetime = Field(
        default_factory=datetime.utcnow,
    )
    updated_at: datetime = Field(
        default_factory=datetime.utcnow,
    )
    status: str = "pending"

    class Config:
        populate_by_name = True
        from_attributes = True

    @classmethod
    def from_dict(cls, data: dict[str, Any] | str) -> "Graph":
        """Create a Graph instance from a dictionary."""
        # Convert string to dict if needed
        parsed_data: dict[str, Any] = (
            json.loads(data) if isinstance(data, str) else data
        )
        return cls(**parsed_data)

    def __init__(self, **kwargs):
        super().__init__(**kwargs)


class StoreType(str, Enum):
    GRAPHS = "graphs"
    DOCUMENTS = "documents"


class GraphCreationSettings(R2RSerializable):
    """Settings for knowledge graph creation."""

    graph_extraction_prompt: str = Field(
        default="graph_extraction",
        description="The prompt to use for knowledge graph extraction.",
    )

    graph_entity_description_prompt: str = Field(
        default="graph_entity_description",
        description="The prompt to use for entity description generation.",
    )

    entity_types: list[str] = Field(
        default=[],
        description="The types of entities to extract.",
    )

    relation_types: list[str] = Field(
        default=[],
        description="The types of relations to extract.",
    )

    chunk_merge_count: int = Field(
        default=2,
        description="""The number of extractions to merge into a single graph
        extraction.""",
    )

    max_knowledge_relationships: int = Field(
        default=100,
        description="""The maximum number of knowledge relationships to extract
        from each chunk.""",
    )

    max_description_input_length: int = Field(
        default=65536,
        description="""The maximum length of the description for a node in the
        graph.""",
    )

    generation_config: Optional[GenerationConfig] = Field(
        default=None,
        description="Configuration for text generation during graph enrichment.",
    )

    automatic_deduplication: bool = Field(
        default=False,
        description="Whether to automatically deduplicate entities.",
    )


class GraphEnrichmentSettings(R2RSerializable):
    """Settings for knowledge graph enrichment."""

    force_graph_search_results_enrichment: bool = Field(
        default=False,
        description="""Force run the enrichment step even if graph creation is
        still in progress for some documents.""",
    )

    graph_communities_prompt: str = Field(
        default="graph_communities",
        description="The prompt to use for knowledge graph enrichment.",
    )

    max_summary_input_length: int = Field(
        default=65536,
        description="The maximum length of the summary for a community.",
    )

    generation_config: Optional[GenerationConfig] = Field(
        default=None,
        description="Configuration for text generation during graph enrichment.",
    )

    leiden_params: dict = Field(
        default_factory=dict,
        description="Parameters for the Leiden algorithm.",
    )


class GraphCommunitySettings(R2RSerializable):
    """Settings for knowledge graph community enrichment."""

    force_graph_search_results_enrichment: bool = Field(
        default=False,
        description="""Force run the enrichment step even if graph creation is
        still in progress for some documents.""",
    )

    graph_communities: str = Field(
        default="graph_communities",
        description="The prompt to use for knowledge graph enrichment.",
    )

    max_summary_input_length: int = Field(
        default=65536,
        description="The maximum length of the summary for a community.",
    )

    generation_config: Optional[GenerationConfig] = Field(
        default=None,
        description="Configuration for text generation during graph enrichment.",
    )

    leiden_params: dict = Field(
        default_factory=dict,
        description="Parameters for the Leiden algorithm.",
    )