about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py')
-rw-r--r--.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py363
1 files changed, 363 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
new file mode 100644
index 00000000..b33ccb63
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
@@ -0,0 +1,363 @@
+# type: ignore
+import asyncio
+import base64
+import json
+import logging
+import string
+import time
+import unicodedata
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pdf2image import convert_from_bytes, convert_from_path
+from pdf2image.exceptions import PDFInfoNotInstalledError
+from PIL import Image
+from pypdf import PdfReader
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+from shared.abstractions import PDFParsingError, PopplerNotFoundError
+
+logger = logging.getLogger()
+
+
+class VLMPDFParser(AsyncParser[str | bytes]):
+ """A parser for PDF documents using vision models for page processing."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.vision_prompt_text = None
+
+ async def convert_pdf_to_images(
+ self, data: str | bytes
+ ) -> list[Image.Image]:
+ """Convert PDF pages to images asynchronously using in-memory
+ conversion."""
+ logger.info("Starting PDF conversion to images.")
+ start_time = time.perf_counter()
+ options = {
+ "dpi": 300, # You can make this configurable via self.config if needed
+ "fmt": "jpeg",
+ "thread_count": 4,
+ "paths_only": False, # Return PIL Image objects instead of writing to disk
+ }
+ try:
+ if isinstance(data, bytes):
+ images = await asyncio.to_thread(
+ convert_from_bytes, data, **options
+ )
+ else:
+ images = await asyncio.to_thread(
+ convert_from_path, data, **options
+ )
+ elapsed = time.perf_counter() - start_time
+ logger.info(
+ f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}"
+ )
+ return images
+ except PDFInfoNotInstalledError as e:
+ logger.error(
+ "PDFInfoNotInstalledError encountered during PDF conversion."
+ )
+ raise PopplerNotFoundError() from e
+ except Exception as err:
+ logger.error(
+ f"Error converting PDF to images: {err} type: {type(err)}"
+ )
+ raise PDFParsingError(
+ f"Failed to process PDF: {str(err)}", err
+ ) from err
+
+ async def process_page(
+ self, image: Image.Image, page_num: int
+ ) -> dict[str, str]:
+ """Process a single PDF page using the vision model."""
+ page_start = time.perf_counter()
+ try:
+ # Convert PIL image to JPEG bytes in-memory
+ buf = BytesIO()
+ image.save(buf, format="JPEG")
+ buf.seek(0)
+ image_data = buf.read()
+ image_base64 = base64.b64encode(image_data).decode("utf-8")
+
+ model = self.config.app.vlm
+
+ # Configure generation parameters
+ generation_config = GenerationConfig(
+ model=self.config.app.vlm,
+ stream=False,
+ )
+
+ is_anthropic = model and "anthropic/" in model
+
+ # FIXME: This is a hacky fix to handle the different formats
+ # that was causing an outage. This logic really needs to be refactored
+ # and cleaned up such that it handles providers more robustly.
+
+ # Prepare message with image content
+ if is_anthropic:
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": "image/jpeg",
+ "data": image_base64,
+ },
+ },
+ ],
+ }
+ ]
+ else:
+ # Use OpenAI format
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{image_base64}"
+ },
+ },
+ ],
+ }
+ ]
+
+ logger.debug(f"Sending page {page_num} to vision model.")
+ req_start = time.perf_counter()
+ if is_anthropic:
+ response = await self.llm_provider.aget_completion(
+ messages=messages,
+ generation_config=generation_config,
+ tools=[
+ {
+ "name": "parse_pdf_page",
+ "description": "Parse text content from a PDF page",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "page_content": {
+ "type": "string",
+ "description": "Extracted text from the PDF page",
+ },
+ },
+ "required": ["page_content"],
+ },
+ }
+ ],
+ tool_choice={"type": "tool", "name": "parse_pdf_page"},
+ )
+
+ if (
+ response.choices
+ and response.choices[0].message
+ and response.choices[0].message.tool_calls
+ ):
+ tool_call = response.choices[0].message.tool_calls[0]
+ args = json.loads(tool_call.function.arguments)
+ content = args.get("page_content", "")
+ page_elapsed = time.perf_counter() - page_start
+ logger.debug(
+ f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+ )
+
+ return {"page": str(page_num), "content": content}
+ else:
+ logger.warning(
+ f"No valid tool call in response for page {page_num}, document might be missing text."
+ )
+ else:
+ response = await self.llm_provider.aget_completion(
+ messages=messages, generation_config=generation_config
+ )
+ req_elapsed = time.perf_counter() - req_start
+ logger.debug(
+ f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds."
+ )
+
+ if response.choices and response.choices[0].message:
+ content = response.choices[0].message.content
+ page_elapsed = time.perf_counter() - page_start
+ logger.debug(
+ f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+ )
+ return {"page": str(page_num), "content": content}
+ else:
+ msg = f"No response content for page {page_num}"
+ logger.error(msg)
+ raise ValueError(msg)
+ except Exception as e:
+ logger.error(
+ f"Error processing page {page_num} with vision model: {str(e)}"
+ )
+ raise
+
+ async def ingest(
+ self, data: str | bytes, maintain_order: bool = True, **kwargs
+ ) -> AsyncGenerator[dict[str, str | int], None]:
+ """Ingest PDF data and yield the text description for each page using
+ the vision model.
+
+ (This version yields a string per page rather than a dictionary.)
+ """
+ ingest_start = time.perf_counter()
+ logger.info("Starting PDF ingestion using VLMPDFParser.")
+ if not self.vision_prompt_text:
+ self.vision_prompt_text = (
+ await self.database_provider.prompts_handler.get_cached_prompt(
+ prompt_name=self.config.vision_pdf_prompt_name
+ )
+ )
+ logger.info("Retrieved vision prompt text from database.")
+
+ try:
+ # Convert PDF to images (in-memory)
+ images = await self.convert_pdf_to_images(data)
+
+ # Create asynchronous tasks for processing each page
+ tasks = {
+ asyncio.create_task(
+ self.process_page(image, page_num)
+ ): page_num
+ for page_num, image in enumerate(images, 1)
+ }
+
+ if maintain_order:
+ pending = set(tasks.keys())
+ results = {}
+ next_page = 1
+ while pending:
+ done, pending = await asyncio.wait(
+ pending, return_when=asyncio.FIRST_COMPLETED
+ )
+ for task in done:
+ result = await task
+ page_num = int(result["page"])
+ results[page_num] = result
+ while next_page in results:
+ yield {
+ "content": results[next_page]["content"] or "",
+ "page_number": next_page,
+ }
+ results.pop(next_page)
+ next_page += 1
+ else:
+ # Yield results as tasks complete
+ for coro in asyncio.as_completed(tasks.keys()):
+ result = await coro
+ yield {
+ "content": result["content"],
+ "page_number": int(result["page"]),
+ }
+ total_elapsed = time.perf_counter() - ingest_start
+ logger.info(
+ f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser."
+ )
+ except Exception as e:
+ logger.error(f"Error processing PDF: {str(e)}")
+ raise
+
+
class BasicPDFParser(AsyncParser[str | bytes]):
    """A parser for PDF data."""

    # Unicode general categories the filter keeps: letters of every kind
    # plus letter-like and "other" numbers.
    _KEPT_CATEGORIES = frozenset({"Ll", "Lu", "Lt", "Lm", "Lo", "Nl", "No"})

    # Inclusive codepoint ranges for common scripts that are kept
    # regardless of category.
    _KEPT_RANGES = (
        ("\u4e00", "\u9fff"),  # Chinese characters
        ("\u0600", "\u06ff"),  # Arabic characters
        ("\u0400", "\u04ff"),  # Cyrillic letters
        ("\u0370", "\u03ff"),  # Greek letters
        ("\u0e00", "\u0e7f"),  # Thai
        ("\u3040", "\u309f"),  # Japanese Hiragana
        ("\u30a0", "\u30ff"),  # Katakana
    )

    def __init__(
        self,
        config: IngestionConfig,
        database_provider: DatabaseProvider,
        llm_provider: CompletionProvider,
    ):
        self.database_provider = database_provider
        self.llm_provider = llm_provider
        self.config = config
        self.PdfReader = PdfReader

    @staticmethod
    def _keep_char(ch: str) -> bool:
        """Return True when *ch* survives the character filter
        (letters/numbers in common scripts, or ASCII-printable)."""
        if unicodedata.category(ch) in BasicPDFParser._KEPT_CATEGORIES:
            return True
        if any(lo <= ch <= hi for lo, hi in BasicPDFParser._KEPT_RANGES):
            return True
        return ch in string.printable

    async def ingest(
        self, data: str | bytes, **kwargs
    ) -> AsyncGenerator[str, None]:
        """Ingest PDF data and yield text from each page."""
        if isinstance(data, str):
            raise ValueError("PDF data must be in bytes format.")
        reader = self.PdfReader(BytesIO(data))
        for page in reader.pages:
            page_text = page.extract_text()
            if page_text is not None:
                # Strip non-printable/unsupported characters in one pass.
                page_text = "".join(
                    ch for ch in page_text if self._keep_char(ch)
                )
            yield page_text
+
+
class PDFParserUnstructured(AsyncParser[str | bytes]):
    """A parser for PDF data backed by the optional ``unstructured`` package."""

    def __init__(
        self,
        config: IngestionConfig,
        database_provider: DatabaseProvider,
        llm_provider: CompletionProvider,
    ):
        self.database_provider = database_provider
        self.llm_provider = llm_provider
        self.config = config
        try:
            from unstructured.partition.pdf import partition_pdf

            self.partition_pdf = partition_pdf

        except ImportError as e:
            # Bug fix: the previous call passed ``e`` as a %-style argument
            # with no placeholder in the format string, which raised a
            # logging formatting error instead of recording the failure.
            logger.error("PDFParserUnstructured ImportError: %s", e)

    async def ingest(
        self,
        data: str | bytes,
        partition_strategy: str = "hi_res",
        chunking_strategy="by_title",
    ) -> AsyncGenerator[str, None]:
        """Partition the PDF with ``unstructured`` and yield each element's
        text.

        Raises:
            ImportError: If the ``unstructured`` package was not importable
                at construction time (previously this surfaced as a bare
                AttributeError).
        """
        if not hasattr(self, "partition_pdf"):
            raise ImportError(
                "The `unstructured` package is required to use PDFParserUnstructured."
            )
        # NOTE(review): data is wrapped in BytesIO, so a str argument would
        # fail here — presumably callers always pass bytes; confirm.
        elements = self.partition_pdf(
            file=BytesIO(data),
            partition_strategy=partition_strategy,
            chunking_strategy=chunking_strategy,
        )
        for element in elements:
            yield element.text