author     S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /R2R/r2r/pipes/ingestion/parsing_pipe.py
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download   gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here (HEAD, master)
Diffstat (limited to 'R2R/r2r/pipes/ingestion/parsing_pipe.py')
-rwxr-xr-x  R2R/r2r/pipes/ingestion/parsing_pipe.py  211
1 files changed, 211 insertions, 0 deletions
diff --git a/R2R/r2r/pipes/ingestion/parsing_pipe.py b/R2R/r2r/pipes/ingestion/parsing_pipe.py
new file mode 100755
index 00000000..f3c81ca0
--- /dev/null
+++ b/R2R/r2r/pipes/ingestion/parsing_pipe.py
@@ -0,0 +1,211 @@
+"""
+This module contains the `ParsingPipe` class, which is responsible for parsing incoming documents into plaintext.
+"""
+
+import asyncio
+import logging
+import time
+import uuid
+from typing import AsyncGenerator, Optional, Union
+
+from r2r.base import (
+ AsyncParser,
+ AsyncState,
+ Document,
+ DocumentType,
+ Extraction,
+ ExtractionType,
+ KVLoggingSingleton,
+ PipeType,
+ generate_id_from_label,
+)
+from r2r.base.abstractions.exception import R2RDocumentProcessingError
+from r2r.base.pipes.base_pipe import AsyncPipe
+from r2r.parsers.media.audio_parser import AudioParser
+from r2r.parsers.media.docx_parser import DOCXParser
+from r2r.parsers.media.img_parser import ImageParser
+from r2r.parsers.media.movie_parser import MovieParser
+from r2r.parsers.media.pdf_parser import PDFParser
+from r2r.parsers.media.ppt_parser import PPTParser
+from r2r.parsers.structured.csv_parser import CSVParser
+from r2r.parsers.structured.json_parser import JSONParser
+from r2r.parsers.structured.xlsx_parser import XLSXParser
+from r2r.parsers.text.html_parser import HTMLParser
+from r2r.parsers.text.md_parser import MDParser
+from r2r.parsers.text.text_parser import TextParser
+
+logger = logging.getLogger(__name__)
+
+
+class ParsingPipe(AsyncPipe):
+ """
+ Processes incoming documents into plaintext based on their data type.
+ Supports text (TXT, MD, HTML), structured (CSV, JSON, XLSX), document
+ (PDF, DOCX, PPTX), image (GIF, JPG, JPEG, PNG, SVG), audio (MP3), and
+ video (MP4) formats, as mapped in `AVAILABLE_PARSERS`.
+ """
+
+ class Input(AsyncPipe.Input):
+ message: AsyncGenerator[Document, None]
+
+ AVAILABLE_PARSERS = {
+ DocumentType.CSV: CSVParser,
+ DocumentType.DOCX: DOCXParser,
+ DocumentType.HTML: HTMLParser,
+ DocumentType.JSON: JSONParser,
+ DocumentType.MD: MDParser,
+ DocumentType.PDF: PDFParser,
+ DocumentType.PPTX: PPTParser,
+ DocumentType.TXT: TextParser,
+ DocumentType.XLSX: XLSXParser,
+ DocumentType.GIF: ImageParser,
+ DocumentType.JPEG: ImageParser,
+ DocumentType.JPG: ImageParser,
+ DocumentType.PNG: ImageParser,
+ DocumentType.SVG: ImageParser,
+ DocumentType.MP3: AudioParser,
+ DocumentType.MP4: MovieParser,
+ }
+
+ IMAGE_TYPES = {
+ DocumentType.GIF,
+ DocumentType.JPG,
+ DocumentType.JPEG,
+ DocumentType.PNG,
+ DocumentType.SVG,
+ }
+
+ def __init__(
+ self,
+ excluded_parsers: list[DocumentType],
+ override_parsers: Optional[dict[DocumentType, AsyncParser]] = None,
+ pipe_logger: Optional[KVLoggingSingleton] = None,
+ type: PipeType = PipeType.INGESTOR,
+ config: Optional[AsyncPipe.PipeConfig] = None,
+ *args,
+ **kwargs,
+ ):
+ super().__init__(
+ pipe_logger=pipe_logger,
+ type=type,
+ config=config
+ or AsyncPipe.PipeConfig(name="default_document_parsing_pipe"),
+ *args,
+ **kwargs,
+ )
+
+ self.parsers = {}
+
+ if not override_parsers:
+ override_parsers = {}
+
+ # Apply explicit overrides first; they take precedence over the defaults
+ for doc_type, parser in override_parsers.items():
+ self.parsers[doc_type] = parser
+
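+ # Fall back to the default parsers for any type that is neither overridden nor excluded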
+ for doc_type, parser_info in self.AVAILABLE_PARSERS.items():
+ if (
+ doc_type not in excluded_parsers
+ and doc_type not in self.parsers
+ ):
+ self.parsers[doc_type] = parser_info()
+
+ @property
+ def supported_types(self) -> list[str]:
+ """
+ Lists the data types supported by the pipe.
+ """
+ return [entry_type for entry_type in DocumentType]
+
+ async def _parse(
+ self,
+ document: Document,
+ run_id: uuid.UUID,
+ version: str,
+ ) -> AsyncGenerator[Union[R2RDocumentProcessingError, Extraction], None]:
+ if document.type not in self.parsers:
+ yield R2RDocumentProcessingError(
+ document_id=document.id,
+ error_message=f"Parser for {document.type} not found in `ParsingPipe`.",
+ )
+ return
+ parser = self.parsers[document.type]
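+ # `parser.ingest` is expected to return an async generator of text
+ # chunks, consumed by the `async for text in texts` loop below.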
+ texts = parser.ingest(document.data)
+ extraction_type = ExtractionType.TXT
+ t0 = time.time()
+ if document.type in self.IMAGE_TYPES:
+ extraction_type = ExtractionType.IMG
+ document.metadata["image_type"] = document.type.value
+ # SAVE IMAGE DATA
+ # try:
+ # import base64
+ # sanitized_data = base64.b64encode(document.data).decode('utf-8')
+ # except Exception as e:
+ # sanitized_data = document.data
+
+ # document.metadata["image_data"] = sanitized_data
+ elif document.type == DocumentType.MP4:
+ extraction_type = ExtractionType.MOV
+ document.metadata["audio_type"] = document.type.value
+
+ iteration = 0
+ async for text in texts:
+ extraction_id = generate_id_from_label(
+ f"{document.id}-{iteration}-{version}"
+ )
+ document.metadata["version"] = version
+ extraction = Extraction(
+ id=extraction_id,
+ data=text,
+ metadata=document.metadata,
+ document_id=document.id,
+ type=extraction_type,
+ )
+ yield extraction
+ # TODO - Add settings to enable extraction logging
+ # extraction_dict = extraction.dict()
+ # await self.enqueue_log(
+ # run_id=run_id,
+ # key="extraction",
+ # value=json.dumps(
+ # {
+ # "data": extraction_dict["data"],
+ # "document_id": str(extraction_dict["document_id"]),
+ # "extraction_id": str(extraction_dict["id"]),
+ # }
+ # ),
+ # )
+ iteration += 1
+ logger.debug(
+ f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} into {iteration} extractions in t={time.time() - t0:.2f} seconds."
+ )
+
+ async def _run_logic(
+ self,
+ input: Input,
+ state: AsyncState,
+ run_id: uuid.UUID,
+ versions: Optional[list[str]] = None,
+ *args,
+ **kwargs,
+ ) -> AsyncGenerator[Union[R2RDocumentProcessingError, Extraction], None]:
+ parse_tasks = []
+
+ iteration = 0
+ async for document in input.message:
+ version = versions[iteration] if versions else "v0"
+ iteration += 1
+ parse_tasks.append(
+ self._handle_parse_task(document, version, run_id)
+ )
+
+ # As each parse task completes, yield its buffered extractions
+ for parse_task in asyncio.as_completed(parse_tasks):
+ for extraction in await parse_task:
+ yield extraction
+
+ async def _handle_parse_task(
+ self, document: Document, version: str, run_id: uuid.UUID
+ ) -> list[Union[R2RDocumentProcessingError, Extraction]]:
+ extractions = []
+ async for extraction in self._parse(document, run_id, version):
+ extractions.append(extraction)
+ return extractions
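
A minimal usage sketch for the pipe added above. This is a hedged illustration, not part of the diff: the `Document` constructor arguments, the no-argument `AsyncState()` call, the `ParsingPipe.Input(message=...)` construction, and invoking `_run_logic` directly are assumptions based only on how this file uses those objects, not on the rest of the r2r API.

import asyncio
import uuid

from r2r.base import AsyncState, Document, DocumentType, generate_id_from_label
from r2r.pipes.ingestion.parsing_pipe import ParsingPipe


async def main():
    # No exclusions and no overrides: the defaults from AVAILABLE_PARSERS apply.
    pipe = ParsingPipe(excluded_parsers=[])

    async def documents():
        # Assumed Document fields, mirroring how _parse reads id/type/data/metadata.
        yield Document(
            id=generate_id_from_label("example-doc"),
            type=DocumentType.TXT,
            data=b"hello world",
            metadata={},
        )

    # Each item is either an Extraction or an R2RDocumentProcessingError.
    async for extraction in pipe._run_logic(
        input=ParsingPipe.Input(message=documents()),
        state=AsyncState(),
        run_id=uuid.uuid4(),
    ):
        print(extraction)


asyncio.run(main())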