diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /R2R/r2r/pipes/ingestion/parsing_pipe.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to 'R2R/r2r/pipes/ingestion/parsing_pipe.py')
-rwxr-xr-x | R2R/r2r/pipes/ingestion/parsing_pipe.py | 211 |
1 files changed, 211 insertions, 0 deletions
diff --git a/R2R/r2r/pipes/ingestion/parsing_pipe.py b/R2R/r2r/pipes/ingestion/parsing_pipe.py new file mode 100755 index 00000000..f3c81ca0 --- /dev/null +++ b/R2R/r2r/pipes/ingestion/parsing_pipe.py @@ -0,0 +1,211 @@ +""" +This module contains the `DocumentParsingPipe` class, which is responsible for parsing incoming documents into plaintext. +""" + +import asyncio +import logging +import time +import uuid +from typing import AsyncGenerator, Optional, Union + +from r2r.base import ( + AsyncParser, + AsyncState, + Document, + DocumentType, + Extraction, + ExtractionType, + KVLoggingSingleton, + PipeType, + generate_id_from_label, +) +from r2r.base.abstractions.exception import R2RDocumentProcessingError +from r2r.base.pipes.base_pipe import AsyncPipe +from r2r.parsers.media.audio_parser import AudioParser +from r2r.parsers.media.docx_parser import DOCXParser +from r2r.parsers.media.img_parser import ImageParser +from r2r.parsers.media.movie_parser import MovieParser +from r2r.parsers.media.pdf_parser import PDFParser +from r2r.parsers.media.ppt_parser import PPTParser +from r2r.parsers.structured.csv_parser import CSVParser +from r2r.parsers.structured.json_parser import JSONParser +from r2r.parsers.structured.xlsx_parser import XLSXParser +from r2r.parsers.text.html_parser import HTMLParser +from r2r.parsers.text.md_parser import MDParser +from r2r.parsers.text.text_parser import TextParser + +logger = logging.getLogger(__name__) + + +class ParsingPipe(AsyncPipe): + """ + Processes incoming documents into plaintext based on their data type. + Supports TXT, JSON, HTML, and PDF formats. + """ + + class Input(AsyncPipe.Input): + message: AsyncGenerator[Document, None] + + AVAILABLE_PARSERS = { + DocumentType.CSV: CSVParser, + DocumentType.DOCX: DOCXParser, + DocumentType.HTML: HTMLParser, + DocumentType.JSON: JSONParser, + DocumentType.MD: MDParser, + DocumentType.PDF: PDFParser, + DocumentType.PPTX: PPTParser, + DocumentType.TXT: TextParser, + DocumentType.XLSX: XLSXParser, + DocumentType.GIF: ImageParser, + DocumentType.JPEG: ImageParser, + DocumentType.JPG: ImageParser, + DocumentType.PNG: ImageParser, + DocumentType.SVG: ImageParser, + DocumentType.MP3: AudioParser, + DocumentType.MP4: MovieParser, + } + + IMAGE_TYPES = { + DocumentType.GIF, + DocumentType.JPG, + DocumentType.JPEG, + DocumentType.PNG, + DocumentType.SVG, + } + + def __init__( + self, + excluded_parsers: list[DocumentType], + override_parsers: Optional[dict[DocumentType, AsyncParser]] = None, + pipe_logger: Optional[KVLoggingSingleton] = None, + type: PipeType = PipeType.INGESTOR, + config: Optional[AsyncPipe.PipeConfig] = None, + *args, + **kwargs, + ): + super().__init__( + pipe_logger=pipe_logger, + type=type, + config=config + or AsyncPipe.PipeConfig(name="default_document_parsing_pipe"), + *args, + **kwargs, + ) + + self.parsers = {} + + if not override_parsers: + override_parsers = {} + + # Apply overrides if specified + for doc_type, parser in override_parsers.items(): + self.parsers[doc_type] = parser + + for doc_type, parser_info in self.AVAILABLE_PARSERS.items(): + if ( + doc_type not in excluded_parsers + and doc_type not in self.parsers + ): + self.parsers[doc_type] = parser_info() + + @property + def supported_types(self) -> list[str]: + """ + Lists the data types supported by the pipe. + """ + return [entry_type for entry_type in DocumentType] + + async def _parse( + self, + document: Document, + run_id: uuid.UUID, + version: str, + ) -> AsyncGenerator[Union[R2RDocumentProcessingError, Extraction], None]: + if document.type not in self.parsers: + yield R2RDocumentProcessingError( + document_id=document.id, + error_message=f"Parser for {document.type} not found in `ParsingPipe`.", + ) + return + parser = self.parsers[document.type] + texts = parser.ingest(document.data) + extraction_type = ExtractionType.TXT + t0 = time.time() + if document.type in self.IMAGE_TYPES: + extraction_type = ExtractionType.IMG + document.metadata["image_type"] = document.type.value + # SAVE IMAGE DATA + # try: + # import base64 + # sanitized_data = base64.b64encode(document.data).decode('utf-8') + # except Exception as e: + # sanitized_data = document.data + + # document.metadata["image_data"] = sanitized_data + elif document.type == DocumentType.MP4: + extraction_type = ExtractionType.MOV + document.metadata["audio_type"] = document.type.value + + iteration = 0 + async for text in texts: + extraction_id = generate_id_from_label( + f"{document.id}-{iteration}-{version}" + ) + document.metadata["version"] = version + extraction = Extraction( + id=extraction_id, + data=text, + metadata=document.metadata, + document_id=document.id, + type=extraction_type, + ) + yield extraction + # TODO - Add settings to enable extraction logging + # extraction_dict = extraction.dict() + # await self.enqueue_log( + # run_id=run_id, + # key="extraction", + # value=json.dumps( + # { + # "data": extraction_dict["data"], + # "document_id": str(extraction_dict["document_id"]), + # "extraction_id": str(extraction_dict["id"]), + # } + # ), + # ) + iteration += 1 + logger.debug( + f"Parsed document with id={document.id}, title={document.metadata.get('title', None)}, user_id={document.metadata.get('user_id', None)}, metadata={document.metadata} into {iteration} extractions in t={time.time() - t0:.2f} seconds." + ) + + async def _run_logic( + self, + input: Input, + state: AsyncState, + run_id: uuid.UUID, + versions: Optional[list[str]] = None, + *args, + **kwargs, + ) -> AsyncGenerator[Extraction, None]: + parse_tasks = [] + + iteration = 0 + async for document in input.message: + version = versions[iteration] if versions else "v0" + iteration += 1 + parse_tasks.append( + self._handle_parse_task(document, version, run_id) + ) + + # Await all tasks and yield results concurrently + for parse_task in asyncio.as_completed(parse_tasks): + for extraction in await parse_task: + yield extraction + + async def _handle_parse_task( + self, document: Document, version: str, run_id: uuid.UUID + ) -> AsyncGenerator[Extraction, None]: + extractions = [] + async for extraction in self._parse(document, run_id, version): + extractions.append(extraction) + return extractions |