path: root/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
author      S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer   S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit      4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree        ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
parent      cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download    gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py  363
1 file changed, 363 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
new file mode 100644
index 00000000..b33ccb63
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
@@ -0,0 +1,363 @@
+# type: ignore
+import asyncio
+import base64
+import json
+import logging
+import string
+import time
+import unicodedata
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pdf2image import convert_from_bytes, convert_from_path
+from pdf2image.exceptions import PDFInfoNotInstalledError
+from PIL import Image
+from pypdf import PdfReader
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+from shared.abstractions import PDFParsingError, PopplerNotFoundError
+
+logger = logging.getLogger()
+
+
+class VLMPDFParser(AsyncParser[str | bytes]):
+    """A parser for PDF documents using vision models for page processing."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.vision_prompt_text = None
+
+    async def convert_pdf_to_images(
+        self, data: str | bytes
+    ) -> list[Image.Image]:
+        """Convert PDF pages to images asynchronously using in-memory
+        conversion."""
+        logger.info("Starting PDF conversion to images.")
+        start_time = time.perf_counter()
+        options = {
+            "dpi": 300,  # You can make this configurable via self.config if needed
+            "fmt": "jpeg",
+            "thread_count": 4,
+            "paths_only": False,  # Return PIL Image objects instead of writing to disk
+        }
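+        # pdf2image shells out to poppler (pdftoppm); thread_count parallelizes
+        # page rasterization, and paths_only=False keeps the pages in memory as
+        # PIL Image objects instead of writing temporary files.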
+        try:
+            if isinstance(data, bytes):
+                images = await asyncio.to_thread(
+                    convert_from_bytes, data, **options
+                )
+            else:
+                images = await asyncio.to_thread(
+                    convert_from_path, data, **options
+                )
+            elapsed = time.perf_counter() - start_time
+            logger.info(
+                f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}"
+            )
+            return images
+        except PDFInfoNotInstalledError as e:
+            logger.error(
+                "PDFInfoNotInstalledError encountered during PDF conversion."
+            )
+            raise PopplerNotFoundError() from e
+        except Exception as err:
+            logger.error(
+                f"Error converting PDF to images: {err} type: {type(err)}"
+            )
+            raise PDFParsingError(
+                f"Failed to process PDF: {str(err)}", err
+            ) from err
+
+    async def process_page(
+        self, image: Image.Image, page_num: int
+    ) -> dict[str, str]:
+        """Process a single PDF page using the vision model."""
+        page_start = time.perf_counter()
+        try:
+            # Convert PIL image to JPEG bytes in-memory
+            buf = BytesIO()
+            image.save(buf, format="JPEG")
+            buf.seek(0)
+            image_data = buf.read()
+            image_base64 = base64.b64encode(image_data).decode("utf-8")
+
+            model = self.config.app.vlm
+
+            # Configure generation parameters
+            generation_config = GenerationConfig(
+                model=self.config.app.vlm,
+                stream=False,
+            )
+
+            is_anthropic = model and "anthropic/" in model
+
+            # FIXME: This is a hacky fix to handle the different message formats
+            # that were causing an outage. This logic really needs to be refactored
+            # and cleaned up so that it handles providers more robustly.
+
+            # Prepare message with image content
+            if is_anthropic:
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image",
+                                "source": {
+                                    "type": "base64",
+                                    "media_type": "image/jpeg",
+                                    "data": image_base64,
+                                },
+                            },
+                        ],
+                    }
+                ]
+            else:
+                # Use OpenAI format
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image_url",
+                                "image_url": {
+                                    "url": f"data:image/jpeg;base64,{image_base64}"
+                                },
+                            },
+                        ],
+                    }
+                ]
+
+            logger.debug(f"Sending page {page_num} to vision model.")
+            req_start = time.perf_counter()
+            if is_anthropic:
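+                # Force a call to the parse_pdf_page tool declared below so the
+                # page text comes back as structured JSON arguments rather than
+                # free-form prose.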
+                response = await self.llm_provider.aget_completion(
+                    messages=messages,
+                    generation_config=generation_config,
+                    tools=[
+                        {
+                            "name": "parse_pdf_page",
+                            "description": "Parse text content from a PDF page",
+                            "input_schema": {
+                                "type": "object",
+                                "properties": {
+                                    "page_content": {
+                                        "type": "string",
+                                        "description": "Extracted text from the PDF page",
+                                    },
+                                },
+                                "required": ["page_content"],
+                            },
+                        }
+                    ],
+                    tool_choice={"type": "tool", "name": "parse_pdf_page"},
+                )
+
+                if (
+                    response.choices
+                    and response.choices[0].message
+                    and response.choices[0].message.tool_calls
+                ):
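+                    # Tool calls are surfaced here in OpenAI style; the extracted
+                    # page text lives in the JSON-encoded tool arguments.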
+                    tool_call = response.choices[0].message.tool_calls[0]
+                    args = json.loads(tool_call.function.arguments)
+                    content = args.get("page_content", "")
+                    page_elapsed = time.perf_counter() - page_start
+                    logger.debug(
+                        f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+                    )
+
+                    return {"page": str(page_num), "content": content}
+                else:
+                    logger.warning(
+                        f"No valid tool call in response for page {page_num}, document might be missing text."
+                    )
+                    # Fall back to empty content instead of implicitly returning
+                    # None, which would break callers that index into the result.
+                    return {"page": str(page_num), "content": ""}
+            else:
+                response = await self.llm_provider.aget_completion(
+                    messages=messages, generation_config=generation_config
+                )
+                req_elapsed = time.perf_counter() - req_start
+                logger.debug(
+                    f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds."
+                )
+
+                if response.choices and response.choices[0].message:
+                    content = response.choices[0].message.content
+                    page_elapsed = time.perf_counter() - page_start
+                    logger.debug(
+                        f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+                    )
+                    return {"page": str(page_num), "content": content}
+                else:
+                    msg = f"No response content for page {page_num}"
+                    logger.error(msg)
+                    raise ValueError(msg)
+        except Exception as e:
+            logger.error(
+                f"Error processing page {page_num} with vision model: {str(e)}"
+            )
+            raise
+
+    async def ingest(
+        self, data: str | bytes, maintain_order: bool = True, **kwargs
+    ) -> AsyncGenerator[dict[str, str | int], None]:
+        """Ingest PDF data and yield the text description for each page using
+        the vision model.
+
+        (This version yields a string per page rather than a dictionary.)
+        """
+        ingest_start = time.perf_counter()
+        logger.info("Starting PDF ingestion using VLMPDFParser.")
+        if not self.vision_prompt_text:
+            self.vision_prompt_text = (
+                await self.database_provider.prompts_handler.get_cached_prompt(
+                    prompt_name=self.config.vision_pdf_prompt_name
+                )
+            )
+            logger.info("Retrieved vision prompt text from database.")
+
+        try:
+            # Convert PDF to images (in-memory)
+            images = await self.convert_pdf_to_images(data)
+
+            # Create asynchronous tasks for processing each page
+            tasks = {
+                asyncio.create_task(
+                    self.process_page(image, page_num)
+                ): page_num
+                for page_num, image in enumerate(images, 1)
+            }
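+            # Every page is dispatched concurrently; each task resolves to the
+            # {"page": ..., "content": ...} dict produced by process_page.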
+
+            if maintain_order:
+                pending = set(tasks.keys())
+                results = {}
+                next_page = 1
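+                # Completions can arrive out of order; buffer them in `results`
+                # and emit pages strictly in ascending page-number order.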
+                while pending:
+                    done, pending = await asyncio.wait(
+                        pending, return_when=asyncio.FIRST_COMPLETED
+                    )
+                    for task in done:
+                        result = await task
+                        page_num = int(result["page"])
+                        results[page_num] = result
+                        while next_page in results:
+                            yield {
+                                "content": results[next_page]["content"] or "",
+                                "page_number": next_page,
+                            }
+                            results.pop(next_page)
+                            next_page += 1
+            else:
+                # Yield results as tasks complete
+                for coro in asyncio.as_completed(tasks.keys()):
+                    result = await coro
+                    yield {
+                        "content": result["content"],
+                        "page_number": int(result["page"]),
+                    }
+            total_elapsed = time.perf_counter() - ingest_start
+            logger.info(
+                f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser."
+            )
+        except Exception as e:
+            logger.error(f"Error processing PDF: {str(e)}")
+            raise
+
+
+class BasicPDFParser(AsyncParser[str | bytes]):
+    """A parser for PDF data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.PdfReader = PdfReader
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest PDF data and yield text from each page."""
+        if isinstance(data, str):
+            raise ValueError("PDF data must be in bytes format.")
+        pdf = self.PdfReader(BytesIO(data))
+        for page in pdf.pages:
+            page_text = page.extract_text()
+            if page_text is not None:
+                page_text = "".join(
+                    filter(
+                        lambda x: (
+                            unicodedata.category(x)
+                            in [
+                                "Ll",
+                                "Lu",
+                                "Lt",
+                                "Lm",
+                                "Lo",
+                                "Nl",
+                                "No",
+                            ]  # Keep letters and numbers
+                            or "\u4e00" <= x <= "\u9fff"  # Chinese characters
+                            or "\u0600" <= x <= "\u06ff"  # Arabic characters
+                            or "\u0400" <= x <= "\u04ff"  # Cyrillic letters
+                            or "\u0370" <= x <= "\u03ff"  # Greek letters
+                            or "\u0e00" <= x <= "\u0e7f"  # Thai
+                            or "\u3040" <= x <= "\u309f"  # Japanese Hiragana
+                            or "\u30a0" <= x <= "\u30ff"  # Katakana
+                            or x in string.printable
+                        ),
+                        page_text,
+                    )
+                )  # Keep characters from common scripts plus printable ASCII; drop the rest
+                yield page_text
+
+
+class PDFParserUnstructured(AsyncParser[str | bytes]):
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        try:
+            from unstructured.partition.pdf import partition_pdf
+
+            self.partition_pdf = partition_pdf
+
+        except ImportError as e:
+            logger.error("PDFParserUnstructured ImportError :  ", e)
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        partition_strategy: str = "hi_res",
+        chunking_strategy="by_title",
+    ) -> AsyncGenerator[str, None]:
+        # partition the pdf
+        elements = self.partition_pdf(
+            file=BytesIO(data),
+            partition_strategy=partition_strategy,
+            chunking_strategy=chunking_strategy,
+        )
+        for element in elements:
+            yield element.text
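
A minimal usage sketch for BasicPDFParser as added above, assuming the package is importable from the active environment. "example.pdf" and the None placeholders are illustrative only: the constructor merely stores the config and provider arguments and ingest never touches them, so stubs are enough to exercise the plain pypdf path.

    import asyncio

    from core.parsers.media.pdf_parser import BasicPDFParser


    async def main() -> None:
        # BasicPDFParser.ingest requires raw bytes; it raises ValueError for str input.
        with open("example.pdf", "rb") as f:  # hypothetical input file
            pdf_bytes = f.read()

        parser = BasicPDFParser(
            config=None, database_provider=None, llm_provider=None  # stubs, unused by ingest
        )
        async for page_text in parser.ingest(pdf_bytes):
            print(page_text[:200])


    asyncio.run(main())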