author     S. Solomon Darnell                2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell                2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/providers/ingestion
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/ingestion')
3 files changed, 764 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
new file mode 100644
index 00000000..4a25d30d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
@@ -0,0 +1,13 @@
+# type: ignore
+from .r2r.base import R2RIngestionConfig, R2RIngestionProvider
+from .unstructured.base import (
+    UnstructuredIngestionConfig,
+    UnstructuredIngestionProvider,
+)
+
+__all__ = [
+    "R2RIngestionConfig",
+    "R2RIngestionProvider",
+    "UnstructuredIngestionProvider",
+    "UnstructuredIngestionConfig",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
new file mode 100644
index 00000000..7d452245
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
@@ -0,0 +1,355 @@
+# type: ignore
+import logging
+import time
+from typing import Any, AsyncGenerator, Generator, Optional
+
+from core import parsers
+from core.base import (
+    AsyncParser,
+    ChunkingStrategy,
+    Document,
+    DocumentChunk,
+    DocumentType,
+    IngestionConfig,
+    IngestionProvider,
+    R2RDocumentProcessingError,
+    RecursiveCharacterTextSplitter,
+    TextSplitter,
+)
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+    LiteLLMCompletionProvider,
+    OpenAICompletionProvider,
+    R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class R2RIngestionConfig(IngestionConfig):
+    chunk_size: int = 1024
+    chunk_overlap: int = 512
+    chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE
+    extra_fields: dict[str, Any] = {}
+    separator: Optional[str] = None
+
+
+class R2RIngestionProvider(IngestionProvider):
+    DEFAULT_PARSERS = {
+        DocumentType.BMP: parsers.BMPParser,
+        DocumentType.CSV: parsers.CSVParser,
+        DocumentType.DOC: parsers.DOCParser,
+        DocumentType.DOCX: parsers.DOCXParser,
+        DocumentType.EML: parsers.EMLParser,
+        DocumentType.EPUB: parsers.EPUBParser,
+        DocumentType.HTML: parsers.HTMLParser,
+        DocumentType.HTM: parsers.HTMLParser,
+        DocumentType.ODT: parsers.ODTParser,
+        DocumentType.JSON: parsers.JSONParser,
+        DocumentType.MSG: parsers.MSGParser,
+        DocumentType.ORG: parsers.ORGParser,
+        DocumentType.MD: parsers.MDParser,
+        DocumentType.PDF: parsers.BasicPDFParser,
+        DocumentType.PPT: parsers.PPTParser,
+        DocumentType.PPTX: parsers.PPTXParser,
+        DocumentType.TXT: parsers.TextParser,
+        DocumentType.XLSX: parsers.XLSXParser,
+        DocumentType.GIF: parsers.ImageParser,
+        DocumentType.JPEG: parsers.ImageParser,
+        DocumentType.JPG: parsers.ImageParser,
+        DocumentType.TSV: parsers.TSVParser,
+        DocumentType.PNG: parsers.ImageParser,
+        DocumentType.HEIC: parsers.ImageParser,
+        DocumentType.SVG: parsers.ImageParser,
+        DocumentType.MP3: parsers.AudioParser,
+        DocumentType.P7S: parsers.P7SParser,
+        DocumentType.RST: parsers.RSTParser,
+        DocumentType.RTF: parsers.RTFParser,
+        DocumentType.TIFF: parsers.ImageParser,
+        DocumentType.XLS: parsers.XLSParser,
+    }
+
+    EXTRA_PARSERS = {
+        DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},
+        DocumentType.PDF: {
+            "unstructured": parsers.PDFParserUnstructured,
+            "zerox": parsers.VLMPDFParser,
+        },
+        DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},
+    }
+
+    IMAGE_TYPES = {
+        DocumentType.GIF,
+        DocumentType.HEIC,
+        DocumentType.JPG,
+        DocumentType.JPEG,
+        DocumentType.PNG,
+        DocumentType.SVG,
+    }
+
+    def __init__(
+        self,
+        config: R2RIngestionConfig,
+        database_provider: PostgresDatabaseProvider,
+        llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ),
+    ):
+        super().__init__(config, database_provider, llm_provider)
+        self.config: R2RIngestionConfig = config
+        self.database_provider: PostgresDatabaseProvider = database_provider
+        self.llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ) = llm_provider
+        self.parsers: dict[DocumentType, AsyncParser] = {}
+        self.text_splitter = self._build_text_splitter()
+        self._initialize_parsers()
+
+        logger.info(
+            f"R2RIngestionProvider initialized with config: {self.config}"
+        )
+
+    def _initialize_parsers(self):
+        for doc_type, parser in self.DEFAULT_PARSERS.items():
+            # will choose the first parser in the list
+            if doc_type not in self.config.excluded_parsers:
+                self.parsers[doc_type] = parser(
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+        for doc_type, doc_parser_name in self.config.extra_parsers.items():
+            self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+                R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name](
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+            )
+
+    def _build_text_splitter(
+        self, ingestion_config_override: Optional[dict] = None
+    ) -> TextSplitter:
+        logger.info(
+            f"Initializing text splitter with method: {self.config.chunking_strategy}"
+        )
+
+        if not ingestion_config_override:
+            ingestion_config_override = {}
+
+        chunking_strategy = (
+            ingestion_config_override.get("chunking_strategy")
+            or self.config.chunking_strategy
+        )
+
+        chunk_size = (
+            ingestion_config_override.get("chunk_size")
+            or self.config.chunk_size
+        )
+        chunk_overlap = (
+            ingestion_config_override.get("chunk_overlap")
+            or self.config.chunk_overlap
+        )
+
+        if chunking_strategy == ChunkingStrategy.RECURSIVE:
+            return RecursiveCharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+            )
+        elif chunking_strategy == ChunkingStrategy.CHARACTER:
+            from core.base.utils.splitter.text import CharacterTextSplitter
+
+            separator = (
+                ingestion_config_override.get("separator")
+                or self.config.separator
+                or CharacterTextSplitter.DEFAULT_SEPARATOR
+            )
+
+            return CharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                separator=separator,
+                keep_separator=False,
+                strip_whitespace=True,
+            )
+        elif chunking_strategy == ChunkingStrategy.BASIC:
+            raise NotImplementedError(
+                "Basic chunking method not implemented. Please use Recursive."
+            )
+        elif chunking_strategy == ChunkingStrategy.BY_TITLE:
+            raise NotImplementedError("By title method not implemented")
+        else:
+            raise ValueError(f"Unsupported method type: {chunking_strategy}")
+
+    def validate_config(self) -> bool:
+        return self.config.chunk_size > 0 and self.config.chunk_overlap >= 0
+
+    def chunk(
+        self,
+        parsed_document: str | DocumentChunk,
+        ingestion_config_override: dict,
+    ) -> Generator[Any, None, None]:
+        text_splitter = self.text_splitter
+        if ingestion_config_override:
+            text_splitter = self._build_text_splitter(
+                ingestion_config_override
+            )
+        if isinstance(parsed_document, DocumentChunk):
+            parsed_document = parsed_document.data
+
+        if isinstance(parsed_document, str):
+            chunks = text_splitter.create_documents([parsed_document])
+        else:
+            # Assuming parsed_document is already a list of text chunks
+            chunks = parsed_document
+
+        for chunk in chunks:
+            yield (
+                chunk.page_content if hasattr(chunk, "page_content") else chunk
+            )
+
+    async def parse(
+        self,
+        file_content: bytes,
+        document: Document,
+        ingestion_config_override: dict,
+    ) -> AsyncGenerator[DocumentChunk, None]:
+        if document.document_type not in self.parsers:
+            raise R2RDocumentProcessingError(
+                document_id=document.id,
+                error_message=f"Parser for {document.document_type} not found in `R2RIngestionProvider`.",
+            )
+        else:
+            t0 = time.time()
+            contents = []
+            parser_overrides = ingestion_config_override.get(
+                "parser_overrides", {}
+            )
+            if document.document_type.value in parser_overrides:
+                logger.info(
+                    f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+                )
+                # TODO - Cleanup this approach to be less hardcoded
+                if (
+                    document.document_type != DocumentType.PDF
+                    or parser_overrides[DocumentType.PDF.value] != "zerox"
+                ):
+                    raise ValueError(
+                        "Only Zerox PDF parser override is available."
+                    )
+
+                # Collect content from VLMPDFParser
+                async for chunk in self.parsers[
+                    f"zerox_{DocumentType.PDF.value}"
+                ].ingest(file_content, **ingestion_config_override):
+                    if isinstance(chunk, dict) and chunk.get("content"):
+                        contents.append(chunk)
+                    elif (
+                        chunk
+                    ):  # Handle string output for backward compatibility
+                        contents.append({"content": chunk})
+
+            if (
+                contents
+                and document.document_type == DocumentType.PDF
+                and parser_overrides.get(DocumentType.PDF.value) == "zerox"
+            ):
+                text_splitter = self._build_text_splitter(
+                    ingestion_config_override
+                )
+
+                iteration = 0
+
+                sorted_contents = [
+                    item
+                    for item in sorted(
+                        contents, key=lambda x: x.get("page_number", 0)
+                    )
+                    if isinstance(item.get("content"), str)
+                ]
+
+                for content_item in sorted_contents:
+                    page_num = content_item.get("page_number", 0)
+                    page_content = content_item["content"]
+
+                    page_chunks = text_splitter.create_documents(
+                        [page_content]
+                    )
+
+                    # Create document chunks for each split piece
+                    for chunk in page_chunks:
+                        metadata = {
+                            **document.metadata,
+                            "chunk_order": iteration,
+                            "page_number": page_num,
+                        }
+
+                        extraction = DocumentChunk(
+                            id=generate_extraction_id(
+                                document.id, iteration
+                            ),
+                            document_id=document.id,
+                            owner_id=document.owner_id,
+                            collection_ids=document.collection_ids,
+                            data=chunk.page_content,
+                            metadata=metadata,
+                        )
+                        iteration += 1
+                        yield extraction
+
+                logger.debug(
+                    f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+                    f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+                    f"into {iteration} extractions in t={time.time() - t0:.2f} seconds using page-by-page splitting."
+                )
+                return
+
+            else:
+                # Standard parsing for non-override cases
+                async for text in self.parsers[document.document_type].ingest(
+                    file_content, **ingestion_config_override
+                ):
+                    if text is not None:
+                        contents.append({"content": text})
+
+            if not contents:
+                logger.warning(
+                    "No valid text content was extracted during parsing"
+                )
+                return
+
+            iteration = 0
+            for content_item in contents:
+                chunk_text = content_item["content"]
+                chunks = self.chunk(chunk_text, ingestion_config_override)
+
+                for chunk in chunks:
+                    metadata = {**document.metadata, "chunk_order": iteration}
+                    if "page_number" in content_item:
+                        metadata["page_number"] = content_item["page_number"]
+
+                    extraction = DocumentChunk(
+                        id=generate_extraction_id(document.id, iteration),
+                        document_id=document.id,
+                        owner_id=document.owner_id,
+                        collection_ids=document.collection_ids,
+                        data=chunk,
+                        metadata=metadata,
+                    )
+                    iteration += 1
+                    yield extraction
+
+            logger.debug(
+                f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+                f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+                f"into {iteration} extractions in t={time.time() - t0:.2f} seconds."
+            )
+
+    def get_parser_for_document_type(self, doc_type: DocumentType) -> Any:
+        return self.parsers.get(doc_type)
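For orientation (not part of the commit): in `_build_text_splitter` above, each chunking setting is resolved from the per-request override first and falls back to the `R2RIngestionConfig` defaults (chunk_size=1024, chunk_overlap=512, RECURSIVE strategy). Below is a minimal sketch of that resolution, exercising only the splitter; the full provider also needs live database and LLM providers, which are omitted here, and the override values and sample text are made up:

    from core.base import RecursiveCharacterTextSplitter

    override = {"chunk_size": 256}  # hypothetical per-ingestion override

    # Same fallback logic as _build_text_splitter: override first, then default.
    chunk_size = override.get("chunk_size") or 1024
    chunk_overlap = override.get("chunk_overlap") or 512

    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    # create_documents returns objects exposing .page_content, which is what
    # R2RIngestionProvider.chunk yields for string inputs.
    for piece in splitter.create_documents(["some long document text ..."]):
        print(piece.page_content)

Because the fallbacks use `or`, a falsy override (for example chunk_overlap=0) silently falls through to the config default.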
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
new file mode 100644
index 00000000..29c09bf5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
@@ -0,0 +1,396 @@
+# TODO - cleanup type issues in this file that relate to `bytes`
+import asyncio
+import base64
+import logging
+import os
+import time
+from copy import copy
+from io import BytesIO
+from typing import Any, AsyncGenerator
+
+import httpx
+from unstructured_client import UnstructuredClient
+from unstructured_client.models import operations, shared
+
+from core import parsers
+from core.base import (
+    AsyncParser,
+    ChunkingStrategy,
+    Document,
+    DocumentChunk,
+    DocumentType,
+    RecursiveCharacterTextSplitter,
+)
+from core.base.abstractions import R2RSerializable
+from core.base.providers.ingestion import IngestionConfig, IngestionProvider
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+    LiteLLMCompletionProvider,
+    OpenAICompletionProvider,
+    R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class FallbackElement(R2RSerializable):
+    text: str
+    metadata: dict[str, Any]
+
+
+class UnstructuredIngestionConfig(IngestionConfig):
+    combine_under_n_chars: int = 128
+    max_characters: int = 500
+    new_after_n_chars: int = 1500
+    overlap: int = 64
+
+    coordinates: bool | None = None
+    encoding: str | None = None  # utf-8
+    extract_image_block_types: list[str] | None = None
+    gz_uncompressed_content_type: str | None = None
+    hi_res_model_name: str | None = None
+    include_orig_elements: bool | None = None
+    include_page_breaks: bool | None = None
+
+    languages: list[str] | None = None
+    multipage_sections: bool | None = None
+    ocr_languages: list[str] | None = None
+    # output_format: Optional[str] = "application/json"
+    overlap_all: bool | None = None
+    pdf_infer_table_structure: bool | None = None
+
+    similarity_threshold: float | None = None
+    skip_infer_table_types: list[str] | None = None
+    split_pdf_concurrency_level: int | None = None
+    split_pdf_page: bool | None = None
+    starting_page_number: int | None = None
+    strategy: str | None = None
+    chunking_strategy: str | ChunkingStrategy | None = None  # type: ignore
+    unique_element_ids: bool | None = None
+    xml_keep_tags: bool | None = None
+
+    def to_ingestion_request(self):
+        import json
+
+        x = json.loads(self.json())
+        x.pop("extra_fields", None)
+        x.pop("provider", None)
+        x.pop("excluded_parsers", None)
+
+        x = {k: v for k, v in x.items() if v is not None}
+        return x
+
+
+class UnstructuredIngestionProvider(IngestionProvider):
+    R2R_FALLBACK_PARSERS = {
+        DocumentType.GIF: [parsers.ImageParser],  # type: ignore
+        DocumentType.JPEG: [parsers.ImageParser],  # type: ignore
+        DocumentType.JPG: [parsers.ImageParser],  # type: ignore
+        DocumentType.PNG: [parsers.ImageParser],  # type: ignore
+        DocumentType.SVG: [parsers.ImageParser],  # type: ignore
+        DocumentType.HEIC: [parsers.ImageParser],  # type: ignore
+        DocumentType.MP3: [parsers.AudioParser],  # type: ignore
+        DocumentType.JSON: [parsers.JSONParser],  # type: ignore
+        DocumentType.HTML: [parsers.HTMLParser],  # type: ignore
+        DocumentType.XLS: [parsers.XLSParser],  # type: ignore
+        DocumentType.XLSX: [parsers.XLSXParser],  # type: ignore
+        DocumentType.DOC: [parsers.DOCParser],  # type: ignore
+        DocumentType.PPT: [parsers.PPTParser],  # type: ignore
+    }
+
+    EXTRA_PARSERS = {
+        DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},  # type: ignore
+        DocumentType.PDF: {
+            "unstructured": parsers.PDFParserUnstructured,  # type: ignore
+            "zerox": parsers.VLMPDFParser,  # type: ignore
+        },
+        DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},  # type: ignore
+    }
+
+    IMAGE_TYPES = {
+        DocumentType.GIF,
+        DocumentType.HEIC,
+        DocumentType.JPG,
+        DocumentType.JPEG,
+        DocumentType.PNG,
+        DocumentType.SVG,
+    }
+
+    def __init__(
+        self,
+        config: UnstructuredIngestionConfig,
+        database_provider: PostgresDatabaseProvider,
+        llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ),
+    ):
+        super().__init__(config, database_provider, llm_provider)
+        self.config: UnstructuredIngestionConfig = config
+        self.database_provider: PostgresDatabaseProvider = database_provider
+        self.llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ) = llm_provider
+
+        if config.provider == "unstructured_api":
+            try:
+                self.unstructured_api_auth = os.environ["UNSTRUCTURED_API_KEY"]
+            except KeyError as e:
+                raise ValueError(
+                    "UNSTRUCTURED_API_KEY environment variable is not set"
+                ) from e
+
+            self.unstructured_api_url = os.environ.get(
+                "UNSTRUCTURED_API_URL",
+                "https://api.unstructuredapp.io/general/v0/general",
+            )
+
+            self.client = UnstructuredClient(
+                api_key_auth=self.unstructured_api_auth,
+                server_url=self.unstructured_api_url,
+            )
+            self.shared = shared
+            self.operations = operations
+
+        else:
+            try:
+                self.local_unstructured_url = os.environ[
+                    "UNSTRUCTURED_SERVICE_URL"
+                ]
+            except KeyError as e:
+                raise ValueError(
+                    "UNSTRUCTURED_SERVICE_URL environment variable is not set"
+                ) from e
+
+            self.client = httpx.AsyncClient()
+
+        self.parsers: dict[DocumentType, AsyncParser] = {}
+        self._initialize_parsers()
+
+    def _initialize_parsers(self):
+        for doc_type, parser_list in self.R2R_FALLBACK_PARSERS.items():
+            for parser in parser_list:
+                if (
+                    doc_type not in self.config.excluded_parsers
+                    and doc_type not in self.parsers
+                ):
+                    # will choose the first parser in the list
+                    self.parsers[doc_type] = parser(
+                        config=self.config,
+                        database_provider=self.database_provider,
+                        llm_provider=self.llm_provider,
+                    )
+        # TODO - Reduce code duplication between Unstructured & R2R
+        for doc_type, doc_parser_name in self.config.extra_parsers.items():
+            self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+                UnstructuredIngestionProvider.EXTRA_PARSERS[doc_type][
+                    doc_parser_name
+                ](
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+            )
+
+    async def parse_fallback(
+        self,
+        file_content: bytes,
+        ingestion_config: dict,
+        parser_name: str,
+    ) -> AsyncGenerator[FallbackElement, None]:
+        contents = []
+        async for chunk in self.parsers[parser_name].ingest(  # type: ignore
+            file_content, **ingestion_config
+        ):  # type: ignore
+            if isinstance(chunk, dict) and chunk.get("content"):
+                contents.append(chunk)
+            elif chunk:  # Handle string output for backward compatibility
+                contents.append({"content": chunk})
+
+        if not contents:
+            logger.warning(
+                "No valid text content was extracted during parsing"
+            )
+            return
+
+        logger.info(f"Fallback ingestion with config = {ingestion_config}")
+
+        iteration = 0
+        for content_item in contents:
+            text = content_item["content"]
+
+            loop = asyncio.get_event_loop()
+            splitter = RecursiveCharacterTextSplitter(
+                chunk_size=ingestion_config["new_after_n_chars"],
+                chunk_overlap=ingestion_config["overlap"],
+            )
+            chunks = await loop.run_in_executor(
+                None, splitter.create_documents, [text]
+            )
+
+            for text_chunk in chunks:
+                metadata = {"chunk_id": iteration}
+                if "page_number" in content_item:
+                    metadata["page_number"] = content_item["page_number"]
+
+                yield FallbackElement(
+                    text=text_chunk.page_content,
+                    metadata=metadata,
+                )
+                iteration += 1
+                await asyncio.sleep(0)
+
+    async def parse(
+        self,
+        file_content: bytes,
+        document: Document,
+        ingestion_config_override: dict,
+    ) -> AsyncGenerator[DocumentChunk, None]:
+        ingestion_config = copy(
+            {
+                **self.config.to_ingestion_request(),
+                **(ingestion_config_override or {}),
+            }
+        )
+        # cleanup extra fields
+        ingestion_config.pop("provider", None)
+        ingestion_config.pop("excluded_parsers", None)
+
+        t0 = time.time()
+        parser_overrides = ingestion_config_override.get(
+            "parser_overrides", {}
+        )
+        elements = []
+
+        # TODO - Cleanup this approach to be less hardcoded
+        # TODO - Remove code duplication between Unstructured & R2R
+        if document.document_type.value in parser_overrides:
+            logger.info(
+                f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+            )
+            async for element in self.parse_fallback(
+                file_content,
+                ingestion_config=ingestion_config,
+                parser_name=f"zerox_{DocumentType.PDF.value}",
+            ):
+                elements.append(element)
+
+        elif document.document_type in self.R2R_FALLBACK_PARSERS.keys():
+            logger.info(
+                f"Parsing {document.document_type}: {document.id} with fallback parser"
+            )
+            async for element in self.parse_fallback(
+                file_content,
+                ingestion_config=ingestion_config,
+                parser_name=document.document_type,
+            ):
+                elements.append(element)
+        else:
+            logger.info(
+                f"Parsing {document.document_type}: {document.id} with unstructured"
+            )
+            if isinstance(file_content, bytes):
+                file_content = BytesIO(file_content)  # type: ignore
+
+            # TODO - Include check on excluded parsers here.
+            if self.config.provider == "unstructured_api":
+                logger.info(f"Using API to parse document {document.id}")
+                files = self.shared.Files(
+                    content=file_content.read(),  # type: ignore
+                    file_name=document.metadata.get("title", "unknown_file"),
+                )
+
+                ingestion_config.pop("app", None)
+                ingestion_config.pop("extra_parsers", None)
+
+                req = self.operations.PartitionRequest(
+                    self.shared.PartitionParameters(
+                        files=files,
+                        **ingestion_config,
+                    )
+                )
+                elements = self.client.general.partition(req)  # type: ignore
+                elements = list(elements.elements)  # type: ignore
+
+            else:
+                logger.info(
+                    f"Using local unstructured fastapi server to parse document {document.id}"
+                )
+                # Base64 encode the file content
+                encoded_content = base64.b64encode(file_content.read()).decode(  # type: ignore
+                    "utf-8"
+                )
+
+                logger.info(
+                    f"Sending a request to {self.local_unstructured_url}/partition"
+                )
+
+                response = await self.client.post(
+                    f"{self.local_unstructured_url}/partition",
+                    json={
+                        "file_content": encoded_content,  # Use encoded string
+                        "ingestion_config": ingestion_config,
+                        "filename": document.metadata.get("title", None),
+                    },
+                    timeout=3600,  # Adjust timeout as needed
+                )
+
+                if response.status_code != 200:
+                    logger.error(f"Error partitioning file: {response.text}")
+                    raise ValueError(
+                        f"Error partitioning file: {response.text}"
+                    )
+                elements = response.json().get("elements", [])
+
+        iteration = 0  # if there are no chunks
+        for iteration, element in enumerate(elements):
+            if isinstance(element, FallbackElement):
+                text = element.text
+                metadata = copy(document.metadata)
+                metadata.update(element.metadata)
+            else:
+                element_dict = (
+                    element if isinstance(element, dict) else element.to_dict()
+                )
+                text = element_dict.get("text", "")
+                if text == "":
+                    continue
+
+                metadata = copy(document.metadata)
+                for key, value in element_dict.items():
+                    if key == "metadata":
+                        for k, v in value.items():
+                            if k not in metadata and k != "orig_elements":
+                                metadata[f"unstructured_{k}"] = v
+
+            # indicate that the document was chunked using unstructured
+            # nullifies the need for chunking in the pipeline
+            metadata["partitioned_by_unstructured"] = True
+            metadata["chunk_order"] = iteration
+            # creating the text extraction
+            yield DocumentChunk(
+                id=generate_extraction_id(document.id, iteration),
+                document_id=document.id,
+                owner_id=document.owner_id,
+                collection_ids=document.collection_ids,
+                data=text,
+                metadata=metadata,
+            )
+
+        # TODO: explore why this is throwing inadvertently
+        # if iteration == 0:
+        #     raise ValueError(f"No chunks found for document {document.id}")
+
+        logger.debug(
+            f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+            f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+            f"into {iteration + 1} extractions in t={time.time() - t0:.2f} seconds."
+        )
+
+    def get_parser_for_document_type(self, doc_type: DocumentType) -> str:
+        return "unstructured_local"
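A small sketch (also not from the commit) of what `UnstructuredIngestionConfig.to_ingestion_request` produces, assuming the inherited `IngestionConfig` fields all have defaults: the model is serialized to a dict; `extra_fields`, `provider`, and `excluded_parsers` are popped; and every remaining None-valued key is dropped, so only the integer defaults and explicitly set options reach the Unstructured API. The field values below are illustrative:

    from core.providers.ingestion import UnstructuredIngestionConfig

    config = UnstructuredIngestionConfig(
        provider="unstructured_api",
        strategy="hi_res",  # optional fields stay None unless set explicitly
    )
    payload = config.to_ingestion_request()

    # payload keeps the integer defaults (combine_under_n_chars=128,
    # max_characters=500, new_after_n_chars=1500, overlap=64) plus
    # strategy="hi_res"; keys left at None (languages, encoding, ...) and the
    # popped keys (provider, extra_fields, excluded_parsers) are absent.
    assert "provider" not in payload
    assert "languages" not in payload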