aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r')
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py355
1 files changed, 355 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
new file mode 100644
index 00000000..7d452245
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
@@ -0,0 +1,355 @@
+# type: ignore
+import logging
+import time
+from typing import Any, AsyncGenerator, Optional
+
+from core import parsers
+from core.base import (
+ AsyncParser,
+ ChunkingStrategy,
+ Document,
+ DocumentChunk,
+ DocumentType,
+ IngestionConfig,
+ IngestionProvider,
+ R2RDocumentProcessingError,
+ RecursiveCharacterTextSplitter,
+ TextSplitter,
+)
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+ LiteLLMCompletionProvider,
+ OpenAICompletionProvider,
+ R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class R2RIngestionConfig(IngestionConfig):
+ chunk_size: int = 1024
+ chunk_overlap: int = 512
+ chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE
+ extra_fields: dict[str, Any] = {}
+ separator: Optional[str] = None
+
+
+class R2RIngestionProvider(IngestionProvider):
+ DEFAULT_PARSERS = {
+ DocumentType.BMP: parsers.BMPParser,
+ DocumentType.CSV: parsers.CSVParser,
+ DocumentType.DOC: parsers.DOCParser,
+ DocumentType.DOCX: parsers.DOCXParser,
+ DocumentType.EML: parsers.EMLParser,
+ DocumentType.EPUB: parsers.EPUBParser,
+ DocumentType.HTML: parsers.HTMLParser,
+ DocumentType.HTM: parsers.HTMLParser,
+ DocumentType.ODT: parsers.ODTParser,
+ DocumentType.JSON: parsers.JSONParser,
+ DocumentType.MSG: parsers.MSGParser,
+ DocumentType.ORG: parsers.ORGParser,
+ DocumentType.MD: parsers.MDParser,
+ DocumentType.PDF: parsers.BasicPDFParser,
+ DocumentType.PPT: parsers.PPTParser,
+ DocumentType.PPTX: parsers.PPTXParser,
+ DocumentType.TXT: parsers.TextParser,
+ DocumentType.XLSX: parsers.XLSXParser,
+ DocumentType.GIF: parsers.ImageParser,
+ DocumentType.JPEG: parsers.ImageParser,
+ DocumentType.JPG: parsers.ImageParser,
+ DocumentType.TSV: parsers.TSVParser,
+ DocumentType.PNG: parsers.ImageParser,
+ DocumentType.HEIC: parsers.ImageParser,
+ DocumentType.SVG: parsers.ImageParser,
+ DocumentType.MP3: parsers.AudioParser,
+ DocumentType.P7S: parsers.P7SParser,
+ DocumentType.RST: parsers.RSTParser,
+ DocumentType.RTF: parsers.RTFParser,
+ DocumentType.TIFF: parsers.ImageParser,
+ DocumentType.XLS: parsers.XLSParser,
+ }
+
+ EXTRA_PARSERS = {
+ DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},
+ DocumentType.PDF: {
+ "unstructured": parsers.PDFParserUnstructured,
+ "zerox": parsers.VLMPDFParser,
+ },
+ DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},
+ }
+
+ IMAGE_TYPES = {
+ DocumentType.GIF,
+ DocumentType.HEIC,
+ DocumentType.JPG,
+ DocumentType.JPEG,
+ DocumentType.PNG,
+ DocumentType.SVG,
+ }
+
+ def __init__(
+ self,
+ config: R2RIngestionConfig,
+ database_provider: PostgresDatabaseProvider,
+ llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ),
+ ):
+ super().__init__(config, database_provider, llm_provider)
+ self.config: R2RIngestionConfig = config
+ self.database_provider: PostgresDatabaseProvider = database_provider
+ self.llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ) = llm_provider
+ self.parsers: dict[DocumentType, AsyncParser] = {}
+ self.text_splitter = self._build_text_splitter()
+ self._initialize_parsers()
+
+ logger.info(
+ f"R2RIngestionProvider initialized with config: {self.config}"
+ )
+
+ def _initialize_parsers(self):
+ for doc_type, parser in self.DEFAULT_PARSERS.items():
+ # will choose the first parser in the list
+ if doc_type not in self.config.excluded_parsers:
+ self.parsers[doc_type] = parser(
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ for doc_type, doc_parser_name in self.config.extra_parsers.items():
+ self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+ R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name](
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ )
+
+ def _build_text_splitter(
+ self, ingestion_config_override: Optional[dict] = None
+ ) -> TextSplitter:
+ logger.info(
+ f"Initializing text splitter with method: {self.config.chunking_strategy}"
+ )
+
+ if not ingestion_config_override:
+ ingestion_config_override = {}
+
+ chunking_strategy = (
+ ingestion_config_override.get("chunking_strategy")
+ or self.config.chunking_strategy
+ )
+
+ chunk_size = (
+ ingestion_config_override.get("chunk_size")
+ or self.config.chunk_size
+ )
+ chunk_overlap = (
+ ingestion_config_override.get("chunk_overlap")
+ or self.config.chunk_overlap
+ )
+
+ if chunking_strategy == ChunkingStrategy.RECURSIVE:
+ return RecursiveCharacterTextSplitter(
+ chunk_size=chunk_size,
+ chunk_overlap=chunk_overlap,
+ )
+ elif chunking_strategy == ChunkingStrategy.CHARACTER:
+ from core.base.utils.splitter.text import CharacterTextSplitter
+
+ separator = (
+ ingestion_config_override.get("separator")
+ or self.config.separator
+ or CharacterTextSplitter.DEFAULT_SEPARATOR
+ )
+
+ return CharacterTextSplitter(
+ chunk_size=chunk_size,
+ chunk_overlap=chunk_overlap,
+ separator=separator,
+ keep_separator=False,
+ strip_whitespace=True,
+ )
+ elif chunking_strategy == ChunkingStrategy.BASIC:
+ raise NotImplementedError(
+ "Basic chunking method not implemented. Please use Recursive."
+ )
+ elif chunking_strategy == ChunkingStrategy.BY_TITLE:
+ raise NotImplementedError("By title method not implemented")
+ else:
+ raise ValueError(f"Unsupported method type: {chunking_strategy}")
+
+ def validate_config(self) -> bool:
+ return self.config.chunk_size > 0 and self.config.chunk_overlap >= 0
+
+ def chunk(
+ self,
+ parsed_document: str | DocumentChunk,
+ ingestion_config_override: dict,
+ ) -> AsyncGenerator[Any, None]:
+ text_spliiter = self.text_splitter
+ if ingestion_config_override:
+ text_spliiter = self._build_text_splitter(
+ ingestion_config_override
+ )
+ if isinstance(parsed_document, DocumentChunk):
+ parsed_document = parsed_document.data
+
+ if isinstance(parsed_document, str):
+ chunks = text_spliiter.create_documents([parsed_document])
+ else:
+ # Assuming parsed_document is already a list of text chunks
+ chunks = parsed_document
+
+ for chunk in chunks:
+ yield (
+ chunk.page_content if hasattr(chunk, "page_content") else chunk
+ )
+
+ async def parse(
+ self,
+ file_content: bytes,
+ document: Document,
+ ingestion_config_override: dict,
+ ) -> AsyncGenerator[DocumentChunk, None]:
+ if document.document_type not in self.parsers:
+ raise R2RDocumentProcessingError(
+ document_id=document.id,
+ error_message=f"Parser for {document.document_type} not found in `R2RIngestionProvider`.",
+ )
+ else:
+ t0 = time.time()
+ contents = []
+ parser_overrides = ingestion_config_override.get(
+ "parser_overrides", {}
+ )
+ if document.document_type.value in parser_overrides:
+ logger.info(
+ f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+ )
+ # TODO - Cleanup this approach to be less hardcoded
+ if (
+ document.document_type != DocumentType.PDF
+ or parser_overrides[DocumentType.PDF.value] != "zerox"
+ ):
+ raise ValueError(
+ "Only Zerox PDF parser override is available."
+ )
+
+ # Collect content from VLMPDFParser
+ async for chunk in self.parsers[
+ f"zerox_{DocumentType.PDF.value}"
+ ].ingest(file_content, **ingestion_config_override):
+ if isinstance(chunk, dict) and chunk.get("content"):
+ contents.append(chunk)
+ elif (
+ chunk
+ ): # Handle string output for backward compatibility
+ contents.append({"content": chunk})
+
+ if (
+ contents
+ and document.document_type == DocumentType.PDF
+ and parser_overrides.get(DocumentType.PDF.value) == "zerox"
+ ):
+ text_splitter = self._build_text_splitter(
+ ingestion_config_override
+ )
+
+ iteration = 0
+
+ sorted_contents = [
+ item
+ for item in sorted(
+ contents, key=lambda x: x.get("page_number", 0)
+ )
+ if isinstance(item.get("content"), str)
+ ]
+
+ for content_item in sorted_contents:
+ page_num = content_item.get("page_number", 0)
+ page_content = content_item["content"]
+
+ page_chunks = text_splitter.create_documents(
+ [page_content]
+ )
+
+ # Create document chunks for each split piece
+ for chunk in page_chunks:
+ metadata = {
+ **document.metadata,
+ "chunk_order": iteration,
+ "page_number": page_num,
+ }
+
+ extraction = DocumentChunk(
+ id=generate_extraction_id(
+ document.id, iteration
+ ),
+ document_id=document.id,
+ owner_id=document.owner_id,
+ collection_ids=document.collection_ids,
+ data=chunk.page_content,
+ metadata=metadata,
+ )
+ iteration += 1
+ yield extraction
+
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+ f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+ f"into {iteration} extractions in t={time.time() - t0:.2f} seconds using page-by-page splitting."
+ )
+ return
+
+ else:
+ # Standard parsing for non-override cases
+ async for text in self.parsers[document.document_type].ingest(
+ file_content, **ingestion_config_override
+ ):
+ if text is not None:
+ contents.append({"content": text})
+
+ if not contents:
+ logging.warning(
+ "No valid text content was extracted during parsing"
+ )
+ return
+
+ iteration = 0
+ for content_item in contents:
+ chunk_text = content_item["content"]
+ chunks = self.chunk(chunk_text, ingestion_config_override)
+
+ for chunk in chunks:
+ metadata = {**document.metadata, "chunk_order": iteration}
+ if "page_number" in content_item:
+ metadata["page_number"] = content_item["page_number"]
+
+ extraction = DocumentChunk(
+ id=generate_extraction_id(document.id, iteration),
+ document_id=document.id,
+ owner_id=document.owner_id,
+ collection_ids=document.collection_ids,
+ data=chunk,
+ metadata=metadata,
+ )
+ iteration += 1
+ yield extraction
+
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+ f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+ f"into {iteration} extractions in t={time.time() - t0:.2f} seconds."
+ )
+
+ def get_parser_for_document_type(self, doc_type: DocumentType) -> Any:
+ return self.parsers.get(doc_type)