Diffstat (limited to '.venv/lib/python3.12/site-packages/core/providers/ingestion')
 .venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py          |  13 +
 .venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py          | 355 +
 .venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py | 396 +
 3 files changed, 764 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
new file mode 100644
index 00000000..4a25d30d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/__init__.py
@@ -0,0 +1,13 @@
+# type: ignore
+from .r2r.base import R2RIngestionConfig, R2RIngestionProvider
+from .unstructured.base import (
+ UnstructuredIngestionConfig,
+ UnstructuredIngestionProvider,
+)
+
+__all__ = [
+ "R2RIngestionConfig",
+ "R2RIngestionProvider",
+ "UnstructuredIngestionProvider",
+ "UnstructuredIngestionConfig",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
new file mode 100644
index 00000000..7d452245
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/r2r/base.py
@@ -0,0 +1,355 @@
+# type: ignore
+import logging
+import time
+from typing import Any, AsyncGenerator, Generator, Optional
+
+from core import parsers
+from core.base import (
+ AsyncParser,
+ ChunkingStrategy,
+ Document,
+ DocumentChunk,
+ DocumentType,
+ IngestionConfig,
+ IngestionProvider,
+ R2RDocumentProcessingError,
+ RecursiveCharacterTextSplitter,
+ TextSplitter,
+)
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+ LiteLLMCompletionProvider,
+ OpenAICompletionProvider,
+ R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class R2RIngestionConfig(IngestionConfig):
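+    # Defaults: 1024-character chunks with a 512-character overlap, split recursively.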
+ chunk_size: int = 1024
+ chunk_overlap: int = 512
+ chunking_strategy: ChunkingStrategy = ChunkingStrategy.RECURSIVE
+ extra_fields: dict[str, Any] = {}
+ separator: Optional[str] = None
+
+
+class R2RIngestionProvider(IngestionProvider):
+ DEFAULT_PARSERS = {
+ DocumentType.BMP: parsers.BMPParser,
+ DocumentType.CSV: parsers.CSVParser,
+ DocumentType.DOC: parsers.DOCParser,
+ DocumentType.DOCX: parsers.DOCXParser,
+ DocumentType.EML: parsers.EMLParser,
+ DocumentType.EPUB: parsers.EPUBParser,
+ DocumentType.HTML: parsers.HTMLParser,
+ DocumentType.HTM: parsers.HTMLParser,
+ DocumentType.ODT: parsers.ODTParser,
+ DocumentType.JSON: parsers.JSONParser,
+ DocumentType.MSG: parsers.MSGParser,
+ DocumentType.ORG: parsers.ORGParser,
+ DocumentType.MD: parsers.MDParser,
+ DocumentType.PDF: parsers.BasicPDFParser,
+ DocumentType.PPT: parsers.PPTParser,
+ DocumentType.PPTX: parsers.PPTXParser,
+ DocumentType.TXT: parsers.TextParser,
+ DocumentType.XLSX: parsers.XLSXParser,
+ DocumentType.GIF: parsers.ImageParser,
+ DocumentType.JPEG: parsers.ImageParser,
+ DocumentType.JPG: parsers.ImageParser,
+ DocumentType.TSV: parsers.TSVParser,
+ DocumentType.PNG: parsers.ImageParser,
+ DocumentType.HEIC: parsers.ImageParser,
+ DocumentType.SVG: parsers.ImageParser,
+ DocumentType.MP3: parsers.AudioParser,
+ DocumentType.P7S: parsers.P7SParser,
+ DocumentType.RST: parsers.RSTParser,
+ DocumentType.RTF: parsers.RTFParser,
+ DocumentType.TIFF: parsers.ImageParser,
+ DocumentType.XLS: parsers.XLSParser,
+ }
+
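+    # Optional parsers selectable per document type via `config.extra_parsers`,
+    # keyed by an override name such as "advanced" or "zerox".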
+ EXTRA_PARSERS = {
+ DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced},
+ DocumentType.PDF: {
+ "unstructured": parsers.PDFParserUnstructured,
+ "zerox": parsers.VLMPDFParser,
+ },
+ DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced},
+ }
+
+ IMAGE_TYPES = {
+ DocumentType.GIF,
+ DocumentType.HEIC,
+ DocumentType.JPG,
+ DocumentType.JPEG,
+ DocumentType.PNG,
+ DocumentType.SVG,
+ }
+
+ def __init__(
+ self,
+ config: R2RIngestionConfig,
+ database_provider: PostgresDatabaseProvider,
+ llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ),
+ ):
+ super().__init__(config, database_provider, llm_provider)
+ self.config: R2RIngestionConfig = config
+ self.database_provider: PostgresDatabaseProvider = database_provider
+ self.llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ) = llm_provider
+ self.parsers: dict[DocumentType, AsyncParser] = {}
+ self.text_splitter = self._build_text_splitter()
+ self._initialize_parsers()
+
+ logger.info(
+ f"R2RIngestionProvider initialized with config: {self.config}"
+ )
+
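+    # Instantiate one parser per supported document type, skipping types listed in
+    # `excluded_parsers`, then register any configured extra parsers under keys of
+    # the form "<override name>_<document type>".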
+ def _initialize_parsers(self):
+ for doc_type, parser in self.DEFAULT_PARSERS.items():
+            # register the default parser for this type unless it is excluded in the config
+ if doc_type not in self.config.excluded_parsers:
+ self.parsers[doc_type] = parser(
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ for doc_type, doc_parser_name in self.config.extra_parsers.items():
+ self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+ R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name](
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ )
+
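+    # Build the text splitter from the provider config, honoring any per-request
+    # overrides for chunking strategy, chunk size, overlap, and separator.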
+ def _build_text_splitter(
+ self, ingestion_config_override: Optional[dict] = None
+ ) -> TextSplitter:
+ logger.info(
+ f"Initializing text splitter with method: {self.config.chunking_strategy}"
+ )
+
+ if not ingestion_config_override:
+ ingestion_config_override = {}
+
+ chunking_strategy = (
+ ingestion_config_override.get("chunking_strategy")
+ or self.config.chunking_strategy
+ )
+
+ chunk_size = (
+ ingestion_config_override.get("chunk_size")
+ or self.config.chunk_size
+ )
+ chunk_overlap = (
+ ingestion_config_override.get("chunk_overlap")
+ or self.config.chunk_overlap
+ )
+
+ if chunking_strategy == ChunkingStrategy.RECURSIVE:
+ return RecursiveCharacterTextSplitter(
+ chunk_size=chunk_size,
+ chunk_overlap=chunk_overlap,
+ )
+ elif chunking_strategy == ChunkingStrategy.CHARACTER:
+ from core.base.utils.splitter.text import CharacterTextSplitter
+
+ separator = (
+ ingestion_config_override.get("separator")
+ or self.config.separator
+ or CharacterTextSplitter.DEFAULT_SEPARATOR
+ )
+
+ return CharacterTextSplitter(
+ chunk_size=chunk_size,
+ chunk_overlap=chunk_overlap,
+ separator=separator,
+ keep_separator=False,
+ strip_whitespace=True,
+ )
+ elif chunking_strategy == ChunkingStrategy.BASIC:
+ raise NotImplementedError(
+ "Basic chunking method not implemented. Please use Recursive."
+ )
+ elif chunking_strategy == ChunkingStrategy.BY_TITLE:
+ raise NotImplementedError("By title method not implemented")
+ else:
+ raise ValueError(f"Unsupported method type: {chunking_strategy}")
+
+ def validate_config(self) -> bool:
+ return self.config.chunk_size > 0 and self.config.chunk_overlap >= 0
+
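+    # Synchronous generator: splits already-parsed text into chunk strings using
+    # the configured (or overridden) text splitter.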
+ def chunk(
+ self,
+ parsed_document: str | DocumentChunk,
+ ingestion_config_override: dict,
+    ) -> Generator[Any, None, None]:
+        text_splitter = self.text_splitter
+        if ingestion_config_override:
+            text_splitter = self._build_text_splitter(
+                ingestion_config_override
+            )
+ if isinstance(parsed_document, DocumentChunk):
+ parsed_document = parsed_document.data
+
+ if isinstance(parsed_document, str):
+            chunks = text_splitter.create_documents([parsed_document])
+ else:
+ # Assuming parsed_document is already a list of text chunks
+ chunks = parsed_document
+
+ for chunk in chunks:
+ yield (
+ chunk.page_content if hasattr(chunk, "page_content") else chunk
+ )
+
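+    # Parse raw file bytes into DocumentChunk objects. The "zerox" PDF override is
+    # handled specially with page-by-page splitting; all other types go through the
+    # registered parser followed by chunk().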
+ async def parse(
+ self,
+ file_content: bytes,
+ document: Document,
+ ingestion_config_override: dict,
+ ) -> AsyncGenerator[DocumentChunk, None]:
+ if document.document_type not in self.parsers:
+ raise R2RDocumentProcessingError(
+ document_id=document.id,
+ error_message=f"Parser for {document.document_type} not found in `R2RIngestionProvider`.",
+ )
+ else:
+ t0 = time.time()
+ contents = []
+ parser_overrides = ingestion_config_override.get(
+ "parser_overrides", {}
+ )
+ if document.document_type.value in parser_overrides:
+ logger.info(
+ f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+ )
+ # TODO - Cleanup this approach to be less hardcoded
+ if (
+ document.document_type != DocumentType.PDF
+ or parser_overrides[DocumentType.PDF.value] != "zerox"
+ ):
+ raise ValueError(
+ "Only Zerox PDF parser override is available."
+ )
+
+ # Collect content from VLMPDFParser
+ async for chunk in self.parsers[
+ f"zerox_{DocumentType.PDF.value}"
+ ].ingest(file_content, **ingestion_config_override):
+ if isinstance(chunk, dict) and chunk.get("content"):
+ contents.append(chunk)
+ elif (
+ chunk
+ ): # Handle string output for backward compatibility
+ contents.append({"content": chunk})
+
+ if (
+ contents
+ and document.document_type == DocumentType.PDF
+ and parser_overrides.get(DocumentType.PDF.value) == "zerox"
+ ):
+ text_splitter = self._build_text_splitter(
+ ingestion_config_override
+ )
+
+ iteration = 0
+
+ sorted_contents = [
+ item
+ for item in sorted(
+ contents, key=lambda x: x.get("page_number", 0)
+ )
+ if isinstance(item.get("content"), str)
+ ]
+
+ for content_item in sorted_contents:
+ page_num = content_item.get("page_number", 0)
+ page_content = content_item["content"]
+
+ page_chunks = text_splitter.create_documents(
+ [page_content]
+ )
+
+ # Create document chunks for each split piece
+ for chunk in page_chunks:
+ metadata = {
+ **document.metadata,
+ "chunk_order": iteration,
+ "page_number": page_num,
+ }
+
+ extraction = DocumentChunk(
+ id=generate_extraction_id(
+ document.id, iteration
+ ),
+ document_id=document.id,
+ owner_id=document.owner_id,
+ collection_ids=document.collection_ids,
+ data=chunk.page_content,
+ metadata=metadata,
+ )
+ iteration += 1
+ yield extraction
+
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+ f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+ f"into {iteration} extractions in t={time.time() - t0:.2f} seconds using page-by-page splitting."
+ )
+ return
+
+ else:
+ # Standard parsing for non-override cases
+ async for text in self.parsers[document.document_type].ingest(
+ file_content, **ingestion_config_override
+ ):
+ if text is not None:
+ contents.append({"content": text})
+
+ if not contents:
+                logger.warning(
+ "No valid text content was extracted during parsing"
+ )
+ return
+
+ iteration = 0
+ for content_item in contents:
+ chunk_text = content_item["content"]
+ chunks = self.chunk(chunk_text, ingestion_config_override)
+
+ for chunk in chunks:
+ metadata = {**document.metadata, "chunk_order": iteration}
+ if "page_number" in content_item:
+ metadata["page_number"] = content_item["page_number"]
+
+ extraction = DocumentChunk(
+ id=generate_extraction_id(document.id, iteration),
+ document_id=document.id,
+ owner_id=document.owner_id,
+ collection_ids=document.collection_ids,
+ data=chunk,
+ metadata=metadata,
+ )
+ iteration += 1
+ yield extraction
+
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+ f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+ f"into {iteration} extractions in t={time.time() - t0:.2f} seconds."
+ )
+
+ def get_parser_for_document_type(self, doc_type: DocumentType) -> Any:
+ return self.parsers.get(doc_type)
diff --git a/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
new file mode 100644
index 00000000..29c09bf5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/providers/ingestion/unstructured/base.py
@@ -0,0 +1,396 @@
+# TODO - cleanup type issues in this file that relate to `bytes`
+import asyncio
+import base64
+import logging
+import os
+import time
+from copy import copy
+from io import BytesIO
+from typing import Any, AsyncGenerator
+
+import httpx
+from unstructured_client import UnstructuredClient
+from unstructured_client.models import operations, shared
+
+from core import parsers
+from core.base import (
+ AsyncParser,
+ ChunkingStrategy,
+ Document,
+ DocumentChunk,
+ DocumentType,
+ RecursiveCharacterTextSplitter,
+)
+from core.base.abstractions import R2RSerializable
+from core.base.providers.ingestion import IngestionConfig, IngestionProvider
+from core.utils import generate_extraction_id
+
+from ...database import PostgresDatabaseProvider
+from ...llm import (
+ LiteLLMCompletionProvider,
+ OpenAICompletionProvider,
+ R2RCompletionProvider,
+)
+
+logger = logging.getLogger()
+
+
+class FallbackElement(R2RSerializable):
+ text: str
+ metadata: dict[str, Any]
+
+
+class UnstructuredIngestionConfig(IngestionConfig):
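+    # The fields below mirror parameters of the Unstructured partition API; unset
+    # (None) values are dropped by to_ingestion_request() before a request is sent.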
+ combine_under_n_chars: int = 128
+ max_characters: int = 500
+ new_after_n_chars: int = 1500
+ overlap: int = 64
+
+ coordinates: bool | None = None
+ encoding: str | None = None # utf-8
+ extract_image_block_types: list[str] | None = None
+ gz_uncompressed_content_type: str | None = None
+ hi_res_model_name: str | None = None
+ include_orig_elements: bool | None = None
+ include_page_breaks: bool | None = None
+
+ languages: list[str] | None = None
+ multipage_sections: bool | None = None
+ ocr_languages: list[str] | None = None
+ # output_format: Optional[str] = "application/json"
+ overlap_all: bool | None = None
+ pdf_infer_table_structure: bool | None = None
+
+ similarity_threshold: float | None = None
+ skip_infer_table_types: list[str] | None = None
+ split_pdf_concurrency_level: int | None = None
+ split_pdf_page: bool | None = None
+ starting_page_number: int | None = None
+ strategy: str | None = None
+ chunking_strategy: str | ChunkingStrategy | None = None # type: ignore
+ unique_element_ids: bool | None = None
+ xml_keep_tags: bool | None = None
+
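+    # Serialize the config into a dict suitable for an Unstructured partition
+    # request, dropping R2R-internal fields and any None values.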
+ def to_ingestion_request(self):
+ import json
+
+ x = json.loads(self.json())
+ x.pop("extra_fields", None)
+ x.pop("provider", None)
+ x.pop("excluded_parsers", None)
+
+ x = {k: v for k, v in x.items() if v is not None}
+ return x
+
+
+class UnstructuredIngestionProvider(IngestionProvider):
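+    # Document types that bypass Unstructured and fall back to R2R's native parsers.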
+ R2R_FALLBACK_PARSERS = {
+ DocumentType.GIF: [parsers.ImageParser], # type: ignore
+ DocumentType.JPEG: [parsers.ImageParser], # type: ignore
+ DocumentType.JPG: [parsers.ImageParser], # type: ignore
+ DocumentType.PNG: [parsers.ImageParser], # type: ignore
+ DocumentType.SVG: [parsers.ImageParser], # type: ignore
+ DocumentType.HEIC: [parsers.ImageParser], # type: ignore
+ DocumentType.MP3: [parsers.AudioParser], # type: ignore
+ DocumentType.JSON: [parsers.JSONParser], # type: ignore
+ DocumentType.HTML: [parsers.HTMLParser], # type: ignore
+ DocumentType.XLS: [parsers.XLSParser], # type: ignore
+ DocumentType.XLSX: [parsers.XLSXParser], # type: ignore
+ DocumentType.DOC: [parsers.DOCParser], # type: ignore
+ DocumentType.PPT: [parsers.PPTParser], # type: ignore
+ }
+
+ EXTRA_PARSERS = {
+ DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced}, # type: ignore
+ DocumentType.PDF: {
+ "unstructured": parsers.PDFParserUnstructured, # type: ignore
+ "zerox": parsers.VLMPDFParser, # type: ignore
+ },
+ DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced}, # type: ignore
+ }
+
+ IMAGE_TYPES = {
+ DocumentType.GIF,
+ DocumentType.HEIC,
+ DocumentType.JPG,
+ DocumentType.JPEG,
+ DocumentType.PNG,
+ DocumentType.SVG,
+ }
+
+ def __init__(
+ self,
+ config: UnstructuredIngestionConfig,
+ database_provider: PostgresDatabaseProvider,
+ llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ),
+ ):
+ super().__init__(config, database_provider, llm_provider)
+ self.config: UnstructuredIngestionConfig = config
+ self.database_provider: PostgresDatabaseProvider = database_provider
+ self.llm_provider: (
+ LiteLLMCompletionProvider
+ | OpenAICompletionProvider
+ | R2RCompletionProvider
+ ) = llm_provider
+
+ if config.provider == "unstructured_api":
+ try:
+ self.unstructured_api_auth = os.environ["UNSTRUCTURED_API_KEY"]
+ except KeyError as e:
+ raise ValueError(
+ "UNSTRUCTURED_API_KEY environment variable is not set"
+ ) from e
+
+ self.unstructured_api_url = os.environ.get(
+ "UNSTRUCTURED_API_URL",
+ "https://api.unstructuredapp.io/general/v0/general",
+ )
+
+ self.client = UnstructuredClient(
+ api_key_auth=self.unstructured_api_auth,
+ server_url=self.unstructured_api_url,
+ )
+ self.shared = shared
+ self.operations = operations
+
+ else:
+ try:
+ self.local_unstructured_url = os.environ[
+ "UNSTRUCTURED_SERVICE_URL"
+ ]
+ except KeyError as e:
+ raise ValueError(
+ "UNSTRUCTURED_SERVICE_URL environment variable is not set"
+ ) from e
+
+ self.client = httpx.AsyncClient()
+
+ self.parsers: dict[DocumentType, AsyncParser] = {}
+ self._initialize_parsers()
+
+ def _initialize_parsers(self):
+        for doc_type, parser_list in self.R2R_FALLBACK_PARSERS.items():
+            for parser in parser_list:
+ if (
+ doc_type not in self.config.excluded_parsers
+ and doc_type not in self.parsers
+ ):
+ # will choose the first parser in the list
+ self.parsers[doc_type] = parser(
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ # TODO - Reduce code duplication between Unstructured & R2R
+ for doc_type, doc_parser_name in self.config.extra_parsers.items():
+ self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = (
+ UnstructuredIngestionProvider.EXTRA_PARSERS[doc_type][
+ doc_parser_name
+ ](
+ config=self.config,
+ database_provider=self.database_provider,
+ llm_provider=self.llm_provider,
+ )
+ )
+
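+    # Fallback path: run an R2R parser, then split the extracted text with a
+    # RecursiveCharacterTextSplitter sized from the Unstructured chunking settings
+    # (new_after_n_chars / overlap), yielding FallbackElement objects.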
+ async def parse_fallback(
+ self,
+ file_content: bytes,
+ ingestion_config: dict,
+ parser_name: str,
+ ) -> AsyncGenerator[FallbackElement, None]:
+ contents = []
+ async for chunk in self.parsers[parser_name].ingest( # type: ignore
+ file_content, **ingestion_config
+ ): # type: ignore
+ if isinstance(chunk, dict) and chunk.get("content"):
+ contents.append(chunk)
+ elif chunk: # Handle string output for backward compatibility
+ contents.append({"content": chunk})
+
+ if not contents:
+            logger.warning(
+ "No valid text content was extracted during parsing"
+ )
+ return
+
+        logger.info(f"Fallback ingestion with config = {ingestion_config}")
+
+ iteration = 0
+ for content_item in contents:
+ text = content_item["content"]
+
+ loop = asyncio.get_event_loop()
+ splitter = RecursiveCharacterTextSplitter(
+ chunk_size=ingestion_config["new_after_n_chars"],
+ chunk_overlap=ingestion_config["overlap"],
+ )
+ chunks = await loop.run_in_executor(
+ None, splitter.create_documents, [text]
+ )
+
+ for text_chunk in chunks:
+ metadata = {"chunk_id": iteration}
+ if "page_number" in content_item:
+ metadata["page_number"] = content_item["page_number"]
+
+ yield FallbackElement(
+ text=text_chunk.page_content,
+ metadata=metadata,
+ )
+ iteration += 1
+ await asyncio.sleep(0)
+
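+    # Parse a document either with a fallback R2R parser (images, audio, spreadsheets,
+    # etc.) or by partitioning it via the Unstructured API / local service, then yield
+    # one DocumentChunk per returned element.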
+ async def parse(
+ self,
+ file_content: bytes,
+ document: Document,
+ ingestion_config_override: dict,
+ ) -> AsyncGenerator[DocumentChunk, None]:
+ ingestion_config = copy(
+ {
+ **self.config.to_ingestion_request(),
+ **(ingestion_config_override or {}),
+ }
+ )
+ # cleanup extra fields
+ ingestion_config.pop("provider", None)
+ ingestion_config.pop("excluded_parsers", None)
+
+ t0 = time.time()
+ parser_overrides = ingestion_config_override.get(
+ "parser_overrides", {}
+ )
+ elements = []
+
+ # TODO - Cleanup this approach to be less hardcoded
+ # TODO - Remove code duplication between Unstructured & R2R
+ if document.document_type.value in parser_overrides:
+ logger.info(
+ f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}"
+ )
+ async for element in self.parse_fallback(
+ file_content,
+ ingestion_config=ingestion_config,
+ parser_name=f"zerox_{DocumentType.PDF.value}",
+ ):
+ elements.append(element)
+
+ elif document.document_type in self.R2R_FALLBACK_PARSERS.keys():
+ logger.info(
+ f"Parsing {document.document_type}: {document.id} with fallback parser"
+ )
+ async for element in self.parse_fallback(
+ file_content,
+ ingestion_config=ingestion_config,
+ parser_name=document.document_type,
+ ):
+ elements.append(element)
+ else:
+ logger.info(
+ f"Parsing {document.document_type}: {document.id} with unstructured"
+ )
+ if isinstance(file_content, bytes):
+ file_content = BytesIO(file_content) # type: ignore
+
+ # TODO - Include check on excluded parsers here.
+ if self.config.provider == "unstructured_api":
+ logger.info(f"Using API to parse document {document.id}")
+ files = self.shared.Files(
+ content=file_content.read(), # type: ignore
+ file_name=document.metadata.get("title", "unknown_file"),
+ )
+
+ ingestion_config.pop("app", None)
+ ingestion_config.pop("extra_parsers", None)
+
+ req = self.operations.PartitionRequest(
+ self.shared.PartitionParameters(
+ files=files,
+ **ingestion_config,
+ )
+ )
+ elements = self.client.general.partition(req) # type: ignore
+ elements = list(elements.elements) # type: ignore
+
+ else:
+ logger.info(
+ f"Using local unstructured fastapi server to parse document {document.id}"
+ )
+ # Base64 encode the file content
+ encoded_content = base64.b64encode(file_content.read()).decode( # type: ignore
+ "utf-8"
+ )
+
+ logger.info(
+ f"Sending a request to {self.local_unstructured_url}/partition"
+ )
+
+ response = await self.client.post(
+ f"{self.local_unstructured_url}/partition",
+ json={
+ "file_content": encoded_content, # Use encoded string
+ "ingestion_config": ingestion_config,
+ "filename": document.metadata.get("title", None),
+ },
+ timeout=3600, # Adjust timeout as needed
+ )
+
+ if response.status_code != 200:
+ logger.error(f"Error partitioning file: {response.text}")
+ raise ValueError(
+ f"Error partitioning file: {response.text}"
+ )
+ elements = response.json().get("elements", [])
+
+        iteration = 0  # ensure iteration is defined even if no elements were produced
+ for iteration, element in enumerate(elements):
+ if isinstance(element, FallbackElement):
+ text = element.text
+ metadata = copy(document.metadata)
+ metadata.update(element.metadata)
+ else:
+ element_dict = (
+ element if isinstance(element, dict) else element.to_dict()
+ )
+ text = element_dict.get("text", "")
+ if text == "":
+ continue
+
+ metadata = copy(document.metadata)
+ for key, value in element_dict.items():
+ if key == "metadata":
+ for k, v in value.items():
+ if k not in metadata and k != "orig_elements":
+ metadata[f"unstructured_{k}"] = v
+
+ # indicate that the document was chunked using unstructured
+ # nullifies the need for chunking in the pipeline
+ metadata["partitioned_by_unstructured"] = True
+ metadata["chunk_order"] = iteration
+ # creating the text extraction
+ yield DocumentChunk(
+ id=generate_extraction_id(document.id, iteration),
+ document_id=document.id,
+ owner_id=document.owner_id,
+ collection_ids=document.collection_ids,
+ data=text,
+ metadata=metadata,
+ )
+
+        # TODO: explore why this raises inadvertently
+ # if iteration == 0:
+ # raise ValueError(f"No chunks found for document {document.id}")
+
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, "
+ f"user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} "
+ f"into {iteration + 1} extractions in t={time.time() - t0:.2f} seconds."
+ )
+
+ def get_parser_for_document_type(self, doc_type: DocumentType) -> str:
+ return "unstructured_local"