about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/core/providers/ingestion
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/ingestion')
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py13
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py355
-rw-r--r--.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py396
3 files changed, 764 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
new file mode 100644
index 00000000..4a25d30d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
@@ -0,0 +1,13 @@
+# type: ignore
+from .r2r.base import R2RIngestionConfig, R2RIngestionProvider
+from .unstructured.base import (
+    UnstructuredIngestionConfig,
+    UnstructuredIngestionProvider,
+)
+
+__all__ = [
+    "R2RIngestionConfig",
+    "R2RIngestionProvider",
+    "UnstructuredIngestionProvider",
+    "UnstructuredIngestionConfig",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
new file mode 100644
index 00000000..7d452245
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
@@ -0,0 +1,355 @@
+# type: ignore
+import logging
+import time
+from typing import Any, AsyncGenerator, Optional
+
+from core import parsers
+from core.base import (
+    AsyncParser,
+    ChunkingStrategy,
+    Document,
+    DocumentChunk,
+    DocumentType,
+    IngestionConfig,
+    IngestionProvider,
+    R2RDocumentProcessingError,
+    RecursiveCharacterTextSplitter,
+    TextSplitter,
+)
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+    LiteLLMCompletionProvider,
+    OpenAICompletionProvider,
+    R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class R2RIngestionConfig(IngestionConfig):
+    chunk_size: int = 1024
+    chunk_overlap: int = 512
+    chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE
+    extra_fields: dict[str, Any] = {}
+    separator: Optional[str] = None
+
+
+class R2RIngestionProvider(IngestionProvider):
+    DEFAULT_PARSERS = {
+        DocumentType.BMP: parsers.BMPParser,
+        DocumentType.CSV: parsers.CSVParser,
+        DocumentType.DOC: parsers.DOCParser,
+        DocumentType.DOCX: parsers.DOCXParser,
+        DocumentType.EML: parsers.EMLParser,
+        DocumentType.EPUB: parsers.EPUBParser,
+        DocumentType.HTML: parsers.HTMLParser,
+        DocumentType.HTM: parsers.HTMLParser,
+        DocumentType.ODT: parsers.ODTParser,
+        DocumentType.JSON: parsers.JSONParser,
+        DocumentType.MSG: parsers.MSGParser,
+        DocumentType.ORG: parsers.ORGParser,
+        DocumentType.MD: parsers.MDParser,
+        DocumentType.PDF: parsers.BasicPDFParser,
+        DocumentType.PPT: parsers.PPTParser,
+        DocumentType.PPTX: parsers.PPTXParser,
+        DocumentType.TXT: parsers.TextParser,
+        DocumentType.XLSX: parsers.XLSXParser,
+        DocumentType.GIF: parsers.ImageParser,
+        DocumentType.JPEG: parsers.ImageParser,
+        DocumentType.JPG: parsers.ImageParser,
+        DocumentType.TSV: parsers.TSVParser,
+        DocumentType.PNG: parsers.ImageParser,
+        DocumentType.HEIC: parsers.ImageParser,
+        DocumentType.SVG: parsers.ImageParser,
+        DocumentType.MP3: parsers.AudioParser,
+        DocumentType.P7S: parsers.P7SParser,
+        DocumentType.RST: parsers.RSTParser,
+        DocumentType.RTF: parsers.RTFParser,
+        DocumentType.TIFF: parsers.ImageParser,
+        DocumentType.XLS: parsers.XLSParser,
+    }
+
+    EXTRA_PARSERS = {
+        DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},
+        DocumentType.PDF: {
+            "unstructured": parsers.PDFParserUnstructured,
+            "zerox": parsers.VLMPDFParser,
+        },
+        DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},
+    }
+
+    IMAGE_TYPES = {
+        DocumentType.GIF,
+        DocumentType.HEIC,
+        DocumentType.JPG,
+        DocumentType.JPEG,
+        DocumentType.PNG,
+        DocumentType.SVG,
+    }
+
+    def __init__(
+        self,
+        config: R2RIngestionConfig,
+        database_provider: PostgresDatabaseProvider,
+        llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ),
+    ):
+        super().__init__(config, database_provider, llm_provider)
+        self.config: R2RIngestionConfig = config
+        self.database_provider: PostgresDatabaseProvider = database_provider
+        self.llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ) = llm_provider
+        self.parsers: dict[DocumentType, AsyncParser] = {}
+        self.text_splitter = self._build_text_splitter()
+        self._initialize_parsers()
+
+        logger.info(
+            f"R2RIngestionProvider initialized with config: {self.config}"
+        )
+
+    def _initialize_parsers(self):
+        for doc_type, parser in self.DEFAULT_PARSERS.items():
+            # will choose the first parser in the list
+            if doc_type not in self.config.excluded_parsers:
+                self.parsers[doc_type] = parser(
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+        for doc_type, doc_parser_name in self.config.extra_parsers.items():
+            self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+                R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name](
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+            )
+
+    def _build_text_splitter(
+        self, ingestion_config_override: Optional[dict] = None
+    ) -> TextSplitter:
+        logger.info(
+            f"Initializing text splitter with method: {self.config.chunking_strategy}"
+        )
+
+        if not ingestion_config_override:
+            ingestion_config_override = {}
+
+        chunking_strategy = (
+            ingestion_config_override.get("chunking_strategy")
+            or self.config.chunking_strategy
+        )
+
+        chunk_size = (
+            ingestion_config_override.get("chunk_size")
+            or self.config.chunk_size
+        )
+        chunk_overlap = (
+            ingestion_config_override.get("chunk_overlap")
+            or self.config.chunk_overlap
+        )
+
+        if chunking_strategy == ChunkingStrategy.RECURSIVE:
+            return RecursiveCharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+            )
+        elif chunking_strategy == ChunkingStrategy.CHARACTER:
+            from core.base.utils.splitter.text import CharacterTextSplitter
+
+            separator = (
+                ingestion_config_override.get("separator")
+                or self.config.separator
+                or CharacterTextSplitter.DEFAULT_SEPARATOR
+            )
+
+            return CharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                separator=separator,
+                keep_separator=False,
+                strip_whitespace=True,
+            )
+        elif chunking_strategy == ChunkingStrategy.BASIC:
+            raise NotImplementedError(
+                "Basic chunking method not implemented. Please use Recursive."
+            )
+        elif chunking_strategy == ChunkingStrategy.BY_TITLE:
+            raise NotImplementedError("By title method not implemented")
+        else:
+            raise ValueError(f"Unsupported method type: {chunking_strategy}")
+
+    def validate_config(self) -> bool:
+        return self.config.chunk_size > 0 and self.config.chunk_overlap >= 0
+
+    def chunk(
+        self,
+        parsed_document: str | DocumentChunk,
+        ingestion_config_override: dict,
+    ) -> AsyncGenerator[Any, None]:
+        text_spliiter = self.text_splitter
+        if ingestion_config_override:
+            text_spliiter = self._build_text_splitter(
+                ingestion_config_override
+            )
+        if isinstance(parsed_document, DocumentChunk):
+            parsed_document = parsed_document.data
+
+        if isinstance(parsed_document, str):
+            chunks = text_spliiter.create_documents([parsed_document])
+        else:
+            # Assuming parsed_document is already a list of text chunks
+            chunks = parsed_document
+
+        for chunk in chunks:
+            yield (
+                chunk.page_content if hasattr(chunk, "page_content") else chunk
+            )
+
+    async def parse(
+        self,
+        file_content: bytes,
+        document: Document,
+        ingestion_config_override: dict,
+    ) -> AsyncGenerator[DocumentChunk, None]:
+        if document.document_type not in self.parsers:
+            raise R2RDocumentProcessingError(
+                document_id=document.id,
+                error_message=f"Parser for {document.document_type} not found in `R2RIngestionProvider`.",
+            )
+        else:
+            t0 = time.time()
+            contents = []
+            parser_overrides = ingestion_config_override.get(
+                "parser_overrides", {}
+            )
+            if document.document_type.value in parser_overrides:
+                logger.info(
+                    f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+                )
+                # TODO - Cleanup this approach to be less hardcoded
+                if (
+                    document.document_type != DocumentType.PDF
+                    or parser_overrides[DocumentType.PDF.value] != "zerox"
+                ):
+                    raise ValueError(
+                        "Only Zerox PDF parser override is available."
+                    )
+
+                # Collect content from VLMPDFParser
+                async for chunk in self.parsers[
+                    f"zerox_{DocumentType.PDF.value}"
+                ].ingest(file_content, **ingestion_config_override):
+                    if isinstance(chunk, dict) and chunk.get("content"):
+                        contents.append(chunk)
+                    elif (
+                        chunk
+                    ):  # Handle string output for backward compatibility
+                        contents.append({"content": chunk})
+
+                if (
+                    contents
+                    and document.document_type == DocumentType.PDF
+                    and parser_overrides.get(DocumentType.PDF.value) == "zerox"
+                ):
+                    text_splitter = self._build_text_splitter(
+                        ingestion_config_override
+                    )
+
+                    iteration = 0
+
+                    sorted_contents = [
+                        item
+                        for item in sorted(
+                            contents, key=lambda x: x.get("page_number", 0)
+                        )
+                        if isinstance(item.get("content"), str)
+                    ]
+
+                    for content_item in sorted_contents:
+                        page_num = content_item.get("page_number", 0)
+                        page_content = content_item["content"]
+
+                        page_chunks = text_splitter.create_documents(
+                            [page_content]
+                        )
+
+                        # Create document chunks for each split piece
+                        for chunk in page_chunks:
+                            metadata = {
+                                **document.metadata,
+                                "chunk_order": iteration,
+                                "page_number": page_num,
+                            }
+
+                            extraction = DocumentChunk(
+                                id=generate_extraction_id(
+                                    document.id, iteration
+                                ),
+                                document_id=document.id,
+                                owner_id=document.owner_id,
+                                collection_ids=document.collection_ids,
+                                data=chunk.page_content,
+                                metadata=metadata,
+                            )
+                            iteration += 1
+                            yield extraction
+
+                    logger.debug(
+                        f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+                        f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+                        f"into {iteration} extractions in t={time.time() - t0:.2f} seconds using page-by-page splitting."
+                    )
+                    return
+
+            else:
+                # Standard parsing for non-override cases
+                async for text in self.parsers[document.document_type].ingest(
+                    file_content, **ingestion_config_override
+                ):
+                    if text is not None:
+                        contents.append({"content": text})
+
+            if not contents:
+                logging.warning(
+                    "No valid text content was extracted during parsing"
+                )
+                return
+
+            iteration = 0
+            for content_item in contents:
+                chunk_text = content_item["content"]
+                chunks = self.chunk(chunk_text, ingestion_config_override)
+
+                for chunk in chunks:
+                    metadata = {**document.metadata, "chunk_order": iteration}
+                    if "page_number" in content_item:
+                        metadata["page_number"] = content_item["page_number"]
+
+                    extraction = DocumentChunk(
+                        id=generate_extraction_id(document.id, iteration),
+                        document_id=document.id,
+                        owner_id=document.owner_id,
+                        collection_ids=document.collection_ids,
+                        data=chunk,
+                        metadata=metadata,
+                    )
+                    iteration += 1
+                    yield extraction
+
+            logger.debug(
+                f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+                f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+                f"into {iteration} extractions in t={time.time() - t0:.2f} seconds."
+            )
+
+    def get_parser_for_document_type(self, doc_type: DocumentType) -> Any:
+        return self.parsers.get(doc_type)
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
new file mode 100644
index 00000000..29c09bf5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
@@ -0,0 +1,396 @@
+# TODO - cleanup type issues in this file that relate to `bytes`
+import asyncio
+import base64
+import logging
+import os
+import time
+from copy import copy
+from io import BytesIO
+from typing import Any, AsyncGenerator
+
+import httpx
+from unstructured_client import UnstructuredClient
+from unstructured_client.models import operations, shared
+
+from core import parsers
+from core.base import (
+    AsyncParser,
+    ChunkingStrategy,
+    Document,
+    DocumentChunk,
+    DocumentType,
+    RecursiveCharacterTextSplitter,
+)
+from core.base.abstractions import R2RSerializable
+from core.base.providers.ingestion import IngestionConfig, IngestionProvider
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+    LiteLLMCompletionProvider,
+    OpenAICompletionProvider,
+    R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class FallbackElement(R2RSerializable):
+    text: str
+    metadata: dict[str, Any]
+
+
+class UnstructuredIngestionConfig(IngestionConfig):
+    combine_under_n_chars: int = 128
+    max_characters: int = 500
+    new_after_n_chars: int = 1500
+    overlap: int = 64
+
+    coordinates: bool | None = None
+    encoding: str | None = None  # utf-8
+    extract_image_block_types: list[str] | None = None
+    gz_uncompressed_content_type: str | None = None
+    hi_res_model_name: str | None = None
+    include_orig_elements: bool | None = None
+    include_page_breaks: bool | None = None
+
+    languages: list[str] | None = None
+    multipage_sections: bool | None = None
+    ocr_languages: list[str] | None = None
+    # output_format: Optional[str] = "application/json"
+    overlap_all: bool | None = None
+    pdf_infer_table_structure: bool | None = None
+
+    similarity_threshold: float | None = None
+    skip_infer_table_types: list[str] | None = None
+    split_pdf_concurrency_level: int | None = None
+    split_pdf_page: bool | None = None
+    starting_page_number: int | None = None
+    strategy: str | None = None
+    chunking_strategy: str | ChunkingStrategy | None = None  # type: ignore
+    unique_element_ids: bool | None = None
+    xml_keep_tags: bool | None = None
+
+    def to_ingestion_request(self):
+        import json
+
+        x = json.loads(self.json())
+        x.pop("extra_fields", None)
+        x.pop("provider", None)
+        x.pop("excluded_parsers", None)
+
+        x = {k: v for k, v in x.items() if v is not None}
+        return x
+
+
+class UnstructuredIngestionProvider(IngestionProvider):
+    R2R_FALLBACK_PARSERS = {
+        DocumentType.GIF: [parsers.ImageParser],  # type: ignore
+        DocumentType.JPEG: [parsers.ImageParser],  # type: ignore
+        DocumentType.JPG: [parsers.ImageParser],  # type: ignore
+        DocumentType.PNG: [parsers.ImageParser],  # type: ignore
+        DocumentType.SVG: [parsers.ImageParser],  # type: ignore
+        DocumentType.HEIC: [parsers.ImageParser],  # type: ignore
+        DocumentType.MP3: [parsers.AudioParser],  # type: ignore
+        DocumentType.JSON: [parsers.JSONParser],  # type: ignore
+        DocumentType.HTML: [parsers.HTMLParser],  # type: ignore
+        DocumentType.XLS: [parsers.XLSParser],  # type: ignore
+        DocumentType.XLSX: [parsers.XLSXParser],  # type: ignore
+        DocumentType.DOC: [parsers.DOCParser],  # type: ignore
+        DocumentType.PPT: [parsers.PPTParser],  # type: ignore
+    }
+
+    EXTRA_PARSERS = {
+        DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},  # type: ignore
+        DocumentType.PDF: {
+            "unstructured": parsers.PDFParserUnstructured,  # type: ignore
+            "zerox": parsers.VLMPDFParser,  # type: ignore
+        },
+        DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},  # type: ignore
+    }
+
+    IMAGE_TYPES = {
+        DocumentType.GIF,
+        DocumentType.HEIC,
+        DocumentType.JPG,
+        DocumentType.JPEG,
+        DocumentType.PNG,
+        DocumentType.SVG,
+    }
+
+    def __init__(
+        self,
+        config: UnstructuredIngestionConfig,
+        database_provider: PostgresDatabaseProvider,
+        llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ),
+    ):
+        super().__init__(config, database_provider, llm_provider)
+        self.config: UnstructuredIngestionConfig = config
+        self.database_provider: PostgresDatabaseProvider = database_provider
+        self.llm_provider: (
+            LiteLLMCompletionProvider
+            | OpenAICompletionProvider
+            | R2RCompletionProvider
+        ) = llm_provider
+
+        if config.provider == "unstructured_api":
+            try:
+                self.unstructured_api_auth = os.environ["UNSTRUCTURED_API_KEY"]
+            except KeyError as e:
+                raise ValueError(
+                    "UNSTRUCTURED_API_KEY environment variable is not set"
+                ) from e
+
+            self.unstructured_api_url = os.environ.get(
+                "UNSTRUCTURED_API_URL",
+                "https://api.unstructuredapp.io/general/v0/general",
+            )
+
+            self.client = UnstructuredClient(
+                api_key_auth=self.unstructured_api_auth,
+                server_url=self.unstructured_api_url,
+            )
+            self.shared = shared
+            self.operations = operations
+
+        else:
+            try:
+                self.local_unstructured_url = os.environ[
+                    "UNSTRUCTURED_SERVICE_URL"
+                ]
+            except KeyError as e:
+                raise ValueError(
+                    "UNSTRUCTURED_SERVICE_URL environment variable is not set"
+                ) from e
+
+            self.client = httpx.AsyncClient()
+
+        self.parsers: dict[DocumentType, AsyncParser] = {}
+        self._initialize_parsers()
+
+    def _initialize_parsers(self):
+        for doc_type, parsers in self.R2R_FALLBACK_PARSERS.items():
+            for parser in parsers:
+                if (
+                    doc_type not in self.config.excluded_parsers
+                    and doc_type not in self.parsers
+                ):
+                    # will choose the first parser in the list
+                    self.parsers[doc_type] = parser(
+                        config=self.config,
+                        database_provider=self.database_provider,
+                        llm_provider=self.llm_provider,
+                    )
+        # TODO - Reduce code duplication between Unstructured & R2R
+        for doc_type, doc_parser_name in self.config.extra_parsers.items():
+            self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+                UnstructuredIngestionProvider.EXTRA_PARSERS[doc_type][
+                    doc_parser_name
+                ](
+                    config=self.config,
+                    database_provider=self.database_provider,
+                    llm_provider=self.llm_provider,
+                )
+            )
+
+    async def parse_fallback(
+        self,
+        file_content: bytes,
+        ingestion_config: dict,
+        parser_name: str,
+    ) -> AsyncGenerator[FallbackElement, None]:
+        contents = []
+        async for chunk in self.parsers[parser_name].ingest(  # type: ignore
+            file_content, **ingestion_config
+        ):  # type: ignore
+            if isinstance(chunk, dict) and chunk.get("content"):
+                contents.append(chunk)
+            elif chunk:  # Handle string output for backward compatibility
+                contents.append({"content": chunk})
+
+        if not contents:
+            logging.warning(
+                "No valid text content was extracted during parsing"
+            )
+            return
+
+        logging.info(f"Fallback ingestion with config = {ingestion_config}")
+
+        iteration = 0
+        for content_item in contents:
+            text = content_item["content"]
+
+            loop = asyncio.get_event_loop()
+            splitter = RecursiveCharacterTextSplitter(
+                chunk_size=ingestion_config["new_after_n_chars"],
+                chunk_overlap=ingestion_config["overlap"],
+            )
+            chunks = await loop.run_in_executor(
+                None, splitter.create_documents, [text]
+            )
+
+            for text_chunk in chunks:
+                metadata = {"chunk_id": iteration}
+                if "page_number" in content_item:
+                    metadata["page_number"] = content_item["page_number"]
+
+                yield FallbackElement(
+                    text=text_chunk.page_content,
+                    metadata=metadata,
+                )
+                iteration += 1
+                await asyncio.sleep(0)
+
+    async def parse(
+        self,
+        file_content: bytes,
+        document: Document,
+        ingestion_config_override: dict,
+    ) -> AsyncGenerator[DocumentChunk, None]:
+        ingestion_config = copy(
+            {
+                **self.config.to_ingestion_request(),
+                **(ingestion_config_override or {}),
+            }
+        )
+        # cleanup extra fields
+        ingestion_config.pop("provider", None)
+        ingestion_config.pop("excluded_parsers", None)
+
+        t0 = time.time()
+        parser_overrides = ingestion_config_override.get(
+            "parser_overrides", {}
+        )
+        elements = []
+
+        # TODO - Cleanup this approach to be less hardcoded
+        # TODO - Remove code duplication between Unstructured & R2R
+        if document.document_type.value in parser_overrides:
+            logger.info(
+                f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+            )
+            async for element in self.parse_fallback(
+                file_content,
+                ingestion_config=ingestion_config,
+                parser_name=f"zerox_{DocumentType.PDF.value}",
+            ):
+                elements.append(element)
+
+        elif document.document_type in self.R2R_FALLBACK_PARSERS.keys():
+            logger.info(
+                f"Parsing {document.document_type}: {document.id} with fallback parser"
+            )
+            async for element in self.parse_fallback(
+                file_content,
+                ingestion_config=ingestion_config,
+                parser_name=document.document_type,
+            ):
+                elements.append(element)
+        else:
+            logger.info(
+                f"Parsing {document.document_type}: {document.id} with unstructured"
+            )
+            if isinstance(file_content, bytes):
+                file_content = BytesIO(file_content)  # type: ignore
+
+            # TODO - Include check on excluded parsers here.
+            if self.config.provider == "unstructured_api":
+                logger.info(f"Using API to parse document {document.id}")
+                files = self.shared.Files(
+                    content=file_content.read(),  # type: ignore
+                    file_name=document.metadata.get("title", "unknown_file"),
+                )
+
+                ingestion_config.pop("app", None)
+                ingestion_config.pop("extra_parsers", None)
+
+                req = self.operations.PartitionRequest(
+                    self.shared.PartitionParameters(
+                        files=files,
+                        **ingestion_config,
+                    )
+                )
+                elements = self.client.general.partition(req)  # type: ignore
+                elements = list(elements.elements)  # type: ignore
+
+            else:
+                logger.info(
+                    f"Using local unstructured fastapi server to parse document {document.id}"
+                )
+                # Base64 encode the file content
+                encoded_content = base64.b64encode(file_content.read()).decode(  # type: ignore
+                    "utf-8"
+                )
+
+                logger.info(
+                    f"Sending a request to {self.local_unstructured_url}/partition"
+                )
+
+                response = await self.client.post(
+                    f"{self.local_unstructured_url}/partition",
+                    json={
+                        "file_content": encoded_content,  # Use encoded string
+                        "ingestion_config": ingestion_config,
+                        "filename": document.metadata.get("title", None),
+                    },
+                    timeout=3600,  # Adjust timeout as needed
+                )
+
+                if response.status_code != 200:
+                    logger.error(f"Error partitioning file: {response.text}")
+                    raise ValueError(
+                        f"Error partitioning file: {response.text}"
+                    )
+                elements = response.json().get("elements", [])
+
+        iteration = 0  # if there are no chunks
+        for iteration, element in enumerate(elements):
+            if isinstance(element, FallbackElement):
+                text = element.text
+                metadata = copy(document.metadata)
+                metadata.update(element.metadata)
+            else:
+                element_dict = (
+                    element if isinstance(element, dict) else element.to_dict()
+                )
+                text = element_dict.get("text", "")
+                if text == "":
+                    continue
+
+                metadata = copy(document.metadata)
+                for key, value in element_dict.items():
+                    if key == "metadata":
+                        for k, v in value.items():
+                            if k not in metadata and k != "orig_elements":
+                                metadata[f"unstructured_{k}"] = v
+
+            # indicate that the document was chunked using unstructured
+            # nullifies the need for chunking in the pipeline
+            metadata["partitioned_by_unstructured"] = True
+            metadata["chunk_order"] = iteration
+            # creating the text extraction
+            yield DocumentChunk(
+                id=generate_extraction_id(document.id, iteration),
+                document_id=document.id,
+                owner_id=document.owner_id,
+                collection_ids=document.collection_ids,
+                data=text,
+                metadata=metadata,
+            )
+
+        # TODO: explore why this is throwing inadvertedly
+        # if iteration == 0:
+        #     raise ValueError(f"No chunks found for document {document.id}")
+
+        logger.debug(
+            f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+            f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+            f"into {iteration + 1} extractions in t={time.time() - t0:.2f} seconds."
+        )
+
+    def get_parser_for_document_type(self, doc_type: DocumentType) -> str:
+        return "unstructured_local"