Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media')
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/__init__.py      |  26
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py  |  74
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py    |  78
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py    | 108
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py   |  38
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py    | 281
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py    |  60
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py    | 363
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py    |  88
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py   |  40
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py    |  45
11 files changed, 1201 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py
new file mode 100644
index 00000000..c268b673
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py
@@ -0,0 +1,26 @@
+# type: ignore
+from .audio_parser import AudioParser
+from .bmp_parser import BMPParser
+from .doc_parser import DOCParser
+from .docx_parser import DOCXParser
+from .img_parser import ImageParser
+from .odt_parser import ODTParser
+from .pdf_parser import BasicPDFParser, PDFParserUnstructured, VLMPDFParser
+from .ppt_parser import PPTParser
+from .pptx_parser import PPTXParser
+from .rtf_parser import RTFParser
+
+__all__ = [
+ "AudioParser",
+ "BMPParser",
+ "DOCParser",
+ "DOCXParser",
+ "ImageParser",
+ "ODTParser",
+ "VLMPDFParser",
+ "BasicPDFParser",
+ "PDFParserUnstructured",
+ "PPTParser",
+ "PPTXParser",
+ "RTFParser",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py
new file mode 100644
index 00000000..7d5f9f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py
@@ -0,0 +1,74 @@
+# type: ignore
+import logging
+import os
+import tempfile
+from typing import AsyncGenerator
+
+from litellm import atranscription
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+logger = logging.getLogger()
+
+
+class AudioParser(AsyncParser[bytes]):
+ """A parser for audio data using Whisper transcription."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.atranscription = atranscription
+
+ async def ingest( # type: ignore
+ self, data: bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ """Ingest audio data and yield a transcription using Whisper via
+ LiteLLM.
+
+        Args:
+            data: Raw audio bytes
+            **kwargs: Additional arguments passed to the transcription call
+
+        Yields:
+            The transcribed text
+ """
+        temp_file_path = None
+        try:
+            # Create a temporary file to store the audio data
+            with tempfile.NamedTemporaryFile(
+                suffix=".wav", delete=False
+            ) as temp_file:
+                temp_file.write(data)
+                temp_file_path = temp_file.name
+
+            # Call Whisper transcription, closing the file handle when done
+            with open(temp_file_path, "rb") as audio_file:
+                response = await self.atranscription(
+                    model=self.config.audio_transcription_model
+                    or self.config.app.audio_lm,
+                    file=audio_file,
+                    **kwargs,
+                )
+
+ # The response should contain the transcribed text directly
+ yield response.text
+
+ except Exception as e:
+ logger.error(f"Error processing audio with Whisper: {str(e)}")
+ raise
+
+        finally:
+            # Clean up the temporary file, if one was created
+            if temp_file_path:
+                try:
+                    os.unlink(temp_file_path)
+                except Exception as e:
+                    logger.warning(
+                        f"Failed to delete temporary file: {str(e)}"
+                    )
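For reference, the LiteLLM transcription call wrapped above can be exercised standalone. A minimal sketch, assuming an OpenAI-compatible Whisper deployment reachable as "whisper-1" and credentials in the environment (both assumptions, not part of this module):

    import asyncio
    import os
    import tempfile

    from litellm import atranscription


    async def transcribe(audio_bytes: bytes) -> str:
        # The transcription API expects a file object, so stage the bytes on disk
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp.write(audio_bytes)
            path = tmp.name
        try:
            with open(path, "rb") as audio_file:
                response = await atranscription(model="whisper-1", file=audio_file)
            return response.text
        finally:
            os.unlink(path)

    # Example: print(asyncio.run(transcribe(open("sample.wav", "rb").read())))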
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py
new file mode 100644
index 00000000..78646da7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py
@@ -0,0 +1,78 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class BMPParser(AsyncParser[str | bytes]):
+ """A parser for BMP image data."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+
+ import struct
+
+ self.struct = struct
+
+ async def extract_bmp_metadata(self, data: bytes) -> dict:
+ """Extract metadata from BMP file header."""
+ try:
+ # BMP header format
+ header_format = "<2sIHHI"
+ header_size = self.struct.calcsize(header_format)
+
+ # Unpack header data
+ (
+ signature,
+ file_size,
+ reserved,
+ reserved2,
+ data_offset,
+ ) = self.struct.unpack(header_format, data[:header_size])
+
+ # DIB header
+ dib_format = "<IiiHHIIiiII"
+ dib_size = self.struct.calcsize(dib_format)
+ dib_data = self.struct.unpack(dib_format, data[14 : 14 + dib_size])
+
+ width = dib_data[1]
+ height = abs(dib_data[2]) # Height can be negative
+ bits_per_pixel = dib_data[4]
+ compression = dib_data[5]
+
+ return {
+ "width": width,
+ "height": height,
+ "bits_per_pixel": bits_per_pixel,
+ "file_size": file_size,
+ "compression": compression,
+ }
+ except Exception as e:
+ return {"error": f"Failed to parse BMP header: {str(e)}"}
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ """Ingest BMP data and yield metadata description."""
+ if isinstance(data, str):
+ # Convert base64 string to bytes if needed
+ import base64
+
+ data = base64.b64decode(data)
+
+ metadata = await self.extract_bmp_metadata(data)
+
+        # Generate a one-line description of the BMP file
+        yield (
+            f"BMP image with dimensions "
+            f"{metadata.get('width', 'unknown')}x{metadata.get('height', 'unknown')} pixels, "
+            f"{metadata.get('bits_per_pixel', 'unknown')} bits per pixel, "
+            f"file size: {metadata.get('file_size', 'unknown')} bytes"
+        )
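The fixed offsets above (14-byte file header, 40-byte BITMAPINFOHEADER at offset 14) can be checked against a synthetic file. A standard-library-only sketch that packs a minimal 1x1 24-bit BMP and parses it back with the same format strings:

    import struct

    # BITMAPFILEHEADER (14 bytes) + BITMAPINFOHEADER (40 bytes) + one padded pixel row
    file_header = struct.pack("<2sIHHI", b"BM", 58, 0, 0, 54)
    info_header = struct.pack("<IiiHHIIiiII", 40, 1, 1, 1, 24, 0, 4, 2835, 2835, 0, 0)
    data = file_header + info_header + b"\x00\x00\x00\x00"

    signature, file_size, _, _, offset = struct.unpack("<2sIHHI", data[:14])
    dib = struct.unpack("<IiiHHIIiiII", data[14:54])
    assert signature == b"BM" and offset == 54
    print(dib[1], abs(dib[2]), dib[4])  # width, height, bits per pixel -> 1 1 24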
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py
new file mode 100644
index 00000000..5b49e2cc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py
@@ -0,0 +1,108 @@
+# type: ignore
+import re
+from io import BytesIO
+from typing import AsyncGenerator
+
+import olefile
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class DOCParser(AsyncParser[str | bytes]):
+ """A parser for DOC (legacy Microsoft Word) data."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.olefile = olefile
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ """Ingest DOC data and yield text from the document."""
+ if isinstance(data, str):
+ raise ValueError("DOC data must be in bytes format.")
+
+ # Create BytesIO object from the data
+        file_obj = BytesIO(data)
+        ole = None
+
+        try:
+            # Open the DOC file using olefile
+            ole = self.olefile.OleFileIO(file_obj)
+
+ # Check if it's a Word document
+ if not ole.exists("WordDocument"):
+ raise ValueError("Not a valid Word document")
+
+ # Read the WordDocument stream
+ word_stream = ole.openstream("WordDocument").read()
+
+            # The 1Table (or 0Table) stream may hold additional text data
+ if ole.exists("1Table"):
+ table_stream = ole.openstream("1Table").read()
+ elif ole.exists("0Table"):
+ table_stream = ole.openstream("0Table").read()
+ else:
+ table_stream = b""
+
+ # Extract text content
+ text = self._extract_text(word_stream, table_stream)
+
+ # Clean and split the text
+ paragraphs = self._clean_text(text)
+
+ # Yield non-empty paragraphs
+ for paragraph in paragraphs:
+ if paragraph.strip():
+ yield paragraph.strip()
+
+ except Exception as e:
+ raise ValueError(f"Error processing DOC file: {str(e)}") from e
+        finally:
+            if ole is not None:
+                ole.close()
+            file_obj.close()
+
+ def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str:
+ """Extract text from Word document streams."""
+ try:
+ text = word_stream.replace(b"\x00", b"").decode(
+ "utf-8", errors="ignore"
+ )
+
+ # If table_stream exists, try to extract additional text
+ if table_stream:
+ table_text = table_stream.replace(b"\x00", b"").decode(
+ "utf-8", errors="ignore"
+ )
+ text += table_text
+
+ return text
+ except Exception as e:
+ raise ValueError(f"Error extracting text: {str(e)}") from e
+
+    def _clean_text(self, text: str) -> list[str]:
+        """Clean and split the extracted text into paragraphs."""
+        # Remove binary artifacts and control characters
+        text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]", "", text)
+
+        # Split into paragraphs on double newlines or other common separators
+        # (before collapsing whitespace, which would destroy the separators)
+        paragraphs = re.split(r"\n\n|\r\n\r\n|\f", text)
+
+        # Collapse runs of whitespace within each paragraph
+        paragraphs = [re.sub(r"\s+", " ", p).strip() for p in paragraphs]
+
+        # Remove empty paragraphs
+        return [p for p in paragraphs if p]
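Before relying on the stream names used above, olefile can list what a given .doc actually contains. A short sketch, assuming a local sample.doc (hypothetical file name):

    import olefile

    ole = olefile.OleFileIO("sample.doc")  # hypothetical local file
    try:
        print(ole.listdir())  # e.g. [['WordDocument'], ['1Table'], ...]
        if ole.exists("WordDocument"):
            stream = ole.openstream("WordDocument").read()
            print(f"WordDocument stream: {len(stream)} bytes")
    finally:
        ole.close()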
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py
new file mode 100644
index 00000000..988f8341
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py
@@ -0,0 +1,38 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+from docx import Document
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class DOCXParser(AsyncParser[str | bytes]):
+ """A parser for DOCX data."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.Document = Document
+
+ async def ingest(
+ self, data: str | bytes, *args, **kwargs
+ ) -> AsyncGenerator[str, None]: # type: ignore
+ """Ingest DOCX data and yield text from each paragraph."""
+ if isinstance(data, str):
+ raise ValueError("DOCX data must be in bytes format.")
+
+ doc = self.Document(BytesIO(data))
+ for paragraph in doc.paragraphs:
+ yield paragraph.text
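Note that doc.paragraphs covers only body paragraphs; text inside tables lives under doc.tables. A sketch of a variant that also walks tables (iter_docx_text is a hypothetical helper, not part of this module):

    from io import BytesIO

    from docx import Document


    def iter_docx_text(data: bytes):
        doc = Document(BytesIO(data))
        for paragraph in doc.paragraphs:
            if paragraph.text.strip():
                yield paragraph.text
        # Table cell text is not reachable through doc.paragraphs
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    if cell.text.strip():
                        yield cell.text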
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
new file mode 100644
index 00000000..bcb37eab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
@@ -0,0 +1,281 @@
+# type: ignore
+import base64
+import logging
+from io import BytesIO
+from typing import AsyncGenerator, Optional
+
+import filetype
+import pillow_heif
+from PIL import Image
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+logger = logging.getLogger()
+
+
+class ImageParser(AsyncParser[str | bytes]):
+ # Mapping of file extensions to MIME types
+ MIME_TYPE_MAPPING = {
+ "bmp": "image/bmp",
+ "gif": "image/gif",
+ "heic": "image/heic",
+ "jpeg": "image/jpeg",
+ "jpg": "image/jpeg",
+ "png": "image/png",
+ "tiff": "image/tiff",
+ "tif": "image/tiff",
+ "webp": "image/webp",
+ }
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.vision_prompt_text = None
+ self.Image = Image
+ self.pillow_heif = pillow_heif
+ self.pillow_heif.register_heif_opener()
+
+    def _is_heic(self, data: bytes) -> bool:
+        """Detect HEIC format by checking the ISO-BMFF ftyp box brand.
+
+        Checking the brand at a fixed offset avoids false positives on
+        other ftyp-based containers such as MP4.
+        """
+        heic_brands = (
+            b"heic",
+            b"heix",
+            b"hevc",
+            b"hevx",
+            b"mif1",
+            b"msf1",
+        )
+
+        try:
+            # Layout: 4-byte box size, b"ftyp" at bytes 4:8, brand at 8:12
+            if len(data) < 12 or data[4:8] != b"ftyp":
+                return False
+            return data[8:12] in heic_brands
+        except Exception as e:
+            logger.error(f"Error checking for HEIC format: {str(e)}")
+            return False
+
+ async def _convert_heic_to_jpeg(self, data: bytes) -> bytes:
+ """Convert HEIC image to JPEG format."""
+ try:
+ # Create BytesIO object for input
+ input_buffer = BytesIO(data)
+
+ # Load HEIC image using pillow_heif
+ heif_file = self.pillow_heif.read_heif(input_buffer)
+
+            # pillow_heif returns a container; take the first (primary) image
+            heif_image = heif_file[0]
+
+ # Convert to PIL Image directly from the HEIF image
+ pil_image = heif_image.to_pillow()
+
+ # Convert to RGB if needed
+ if pil_image.mode != "RGB":
+ pil_image = pil_image.convert("RGB")
+
+ # Save as JPEG
+ output_buffer = BytesIO()
+ pil_image.save(output_buffer, format="JPEG", quality=95)
+ return output_buffer.getvalue()
+
+ except Exception as e:
+ logger.error(f"Error converting HEIC to JPEG: {str(e)}")
+ raise
+
+ def _is_jpeg(self, data: bytes) -> bool:
+ """Detect JPEG format using magic numbers."""
+ return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8
+
+ def _is_png(self, data: bytes) -> bool:
+ """Detect PNG format using magic numbers."""
+ png_signature = b"\x89PNG\r\n\x1a\n"
+ return data.startswith(png_signature)
+
+ def _is_bmp(self, data: bytes) -> bool:
+ """Detect BMP format using magic numbers."""
+ return data.startswith(b"BM")
+
+    def _is_tiff(self, data: bytes) -> bool:
+        """Detect TIFF format using magic numbers."""
+        # b"II*\x00" is little-endian, b"MM\x00*" is big-endian
+        return data.startswith((b"II*\x00", b"MM\x00*"))
+
+ def _get_image_media_type(
+ self, data: bytes, filename: Optional[str] = None
+ ) -> str:
+ """
+ Determine the correct media type based on image data and/or filename.
+
+ Args:
+ data: The binary image data
+ filename: Optional filename which may contain extension information
+
+ Returns:
+ str: The MIME type for the image
+ """
+ try:
+ # First, try format-specific detection functions
+ if self._is_heic(data):
+ return "image/heic"
+ if self._is_jpeg(data):
+ return "image/jpeg"
+ if self._is_png(data):
+ return "image/png"
+ if self._is_bmp(data):
+ return "image/bmp"
+ if self._is_tiff(data):
+ return "image/tiff"
+
+            # Try using filetype as a fallback
+            img_type = filetype.guess(data)
+            if img_type:
+                # filetype returns a Type object; map its extension, falling
+                # back to the MIME type it reports
+                return self.MIME_TYPE_MAPPING.get(
+                    img_type.extension, img_type.mime
+                )
+
+ # If we have a filename, try to get the type from the extension
+ if filename:
+ extension = filename.split(".")[-1].lower()
+ if extension in self.MIME_TYPE_MAPPING:
+ return self.MIME_TYPE_MAPPING[extension]
+
+ # If all else fails, default to octet-stream (generic binary)
+ logger.warning(
+ "Could not determine image type, using application/octet-stream"
+ )
+ return "application/octet-stream"
+
+ except Exception as e:
+ logger.error(f"Error determining image media type: {str(e)}")
+ return "application/octet-stream" # Default to generic binary as fallback
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ if not self.vision_prompt_text:
+ self.vision_prompt_text = (
+ await self.database_provider.prompts_handler.get_cached_prompt(
+ prompt_name=self.config.vision_img_prompt_name
+ )
+ )
+ try:
+ filename = kwargs.get("filename", None)
+ # Whether to convert HEIC to JPEG (default: True for backward compatibility)
+ convert_heic = kwargs.get("convert_heic", True)
+
+ if isinstance(data, bytes):
+ try:
+ # First detect the original media type
+ original_media_type = self._get_image_media_type(
+ data, filename
+ )
+ logger.debug(
+ f"Detected original image type: {original_media_type}"
+ )
+
+ # Determine if we need to convert HEIC
+ is_heic_format = self._is_heic(data)
+
+ # Handle HEIC images
+ if is_heic_format and convert_heic:
+ logger.debug(
+ "Detected HEIC format, converting to JPEG"
+ )
+ data = await self._convert_heic_to_jpeg(data)
+ media_type = "image/jpeg"
+ else:
+ # Keep original format and media type
+ media_type = original_media_type
+
+ # Encode the data to base64
+ image_data = base64.b64encode(data).decode("utf-8")
+
+ except Exception as e:
+ logger.error(f"Error processing image data: {str(e)}")
+ raise
+ else:
+ # If data is already a string (base64), we assume it has a reliable content type
+ # from the source that encoded it
+ image_data = data
+
+ # Try to determine the media type from the context if available
+ media_type = kwargs.get(
+ "media_type", "application/octet-stream"
+ )
+
+ # Get the model from kwargs or config
+ model = kwargs.get("vlm", None) or self.config.app.vlm
+
+ generation_config = GenerationConfig(
+ model=model,
+ stream=False,
+ )
+
+ logger.debug(f"Using model: {model}, media_type: {media_type}")
+
+ if "anthropic" in model:
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": media_type,
+ "data": image_data,
+ },
+ },
+ ],
+ }
+ ]
+ else:
+ # For OpenAI-style APIs, use their format
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:{media_type};base64,{image_data}"
+ },
+ },
+ ],
+ }
+ ]
+
+ response = await self.llm_provider.aget_completion(
+ messages=messages, generation_config=generation_config
+ )
+
+ if response.choices and response.choices[0].message:
+ content = response.choices[0].message.content
+ if not content:
+ raise ValueError("No content in response")
+ yield content
+ else:
+ raise ValueError("No response content")
+
+ except Exception as e:
+ logger.error(f"Error processing image with vision model: {str(e)}")
+ raise
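The detection order in _get_image_media_type can be mirrored without the provider scaffolding. A standard-library-only sketch (sniff_media_type is a hypothetical standalone helper) using the same magic numbers:

    def sniff_media_type(data: bytes) -> str:
        """Mirror of ImageParser's magic-number detection order."""
        heic_brands = (b"heic", b"heix", b"hevc", b"hevx", b"mif1", b"msf1")
        if len(data) >= 12 and data[4:8] == b"ftyp" and data[8:12] in heic_brands:
            return "image/heic"
        if data[:2] == b"\xff\xd8":
            return "image/jpeg"
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            return "image/png"
        if data.startswith(b"BM"):
            return "image/bmp"
        if data.startswith((b"II*\x00", b"MM\x00*")):
            return "image/tiff"
        return "application/octet-stream"


    assert sniff_media_type(b"\x00\x00\x00\x18ftypheic" + b"\x00" * 12) == "image/heic"
    assert sniff_media_type(b"\x89PNG\r\n\x1a\n") == "image/png"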
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py
new file mode 100644
index 00000000..cb146464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py
@@ -0,0 +1,60 @@
+# type: ignore
+import xml.etree.ElementTree as ET
+import zipfile
+from io import BytesIO
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class ODTParser(AsyncParser[str | bytes]):
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.zipfile = zipfile
+ self.ET = ET
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ if isinstance(data, str):
+ raise ValueError("ODT data must be in bytes format.")
+
+        file_obj = BytesIO(data)
+
+ try:
+ with self.zipfile.ZipFile(file_obj) as odt:
+ # ODT files are zip archives containing content.xml
+ content = odt.read("content.xml")
+ root = self.ET.fromstring(content)
+
+ # ODT XML namespace
+ ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"}
+
+ # Extract paragraphs and headers
+ for p in root.findall(".//text:p", ns):
+ text = "".join(p.itertext())
+ if text.strip():
+ yield text.strip()
+
+ for h in root.findall(".//text:h", ns):
+ text = "".join(h.itertext())
+ if text.strip():
+ yield text.strip()
+
+ except Exception as e:
+ raise ValueError(f"Error processing ODT file: {str(e)}") from e
+ finally:
+ file_obj.close()
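An ODT is a zip archive whose text lives in content.xml under the text: namespace. A standard-library sketch that builds a minimal archive in memory and extracts paragraphs the same way:

    import xml.etree.ElementTree as ET
    import zipfile
    from io import BytesIO

    content = (
        '<office:document-content'
        ' xmlns:office="urn:oasis:names:tc:opendocument:xmlns:office:1.0"'
        ' xmlns:text="urn:oasis:names:tc:opendocument:xmlns:text:1.0">'
        "<office:body><office:text>"
        "<text:h>Title</text:h>"
        "<text:p>Hello <text:span>world</text:span></text:p>"
        "</office:text></office:body></office:document-content>"
    )
    buf = BytesIO()
    with zipfile.ZipFile(buf, "w") as odt:
        odt.writestr("content.xml", content)

    root = ET.fromstring(zipfile.ZipFile(buf).read("content.xml"))
    ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"}
    print(["".join(p.itertext()) for p in root.findall(".//text:p", ns)])
    # ['Hello world'] -- itertext() flattens nested spans, as in the parser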
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
new file mode 100644
index 00000000..b33ccb63
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
@@ -0,0 +1,363 @@
+# type: ignore
+import asyncio
+import base64
+import json
+import logging
+import string
+import time
+import unicodedata
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pdf2image import convert_from_bytes, convert_from_path
+from pdf2image.exceptions import PDFInfoNotInstalledError
+from PIL import Image
+from pypdf import PdfReader
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+from shared.abstractions import PDFParsingError, PopplerNotFoundError
+
+logger = logging.getLogger()
+
+
+class VLMPDFParser(AsyncParser[str | bytes]):
+ """A parser for PDF documents using vision models for page processing."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.vision_prompt_text = None
+
+ async def convert_pdf_to_images(
+ self, data: str | bytes
+ ) -> list[Image.Image]:
+ """Convert PDF pages to images asynchronously using in-memory
+ conversion."""
+ logger.info("Starting PDF conversion to images.")
+ start_time = time.perf_counter()
+ options = {
+ "dpi": 300, # You can make this configurable via self.config if needed
+ "fmt": "jpeg",
+ "thread_count": 4,
+ "paths_only": False, # Return PIL Image objects instead of writing to disk
+ }
+ try:
+ if isinstance(data, bytes):
+ images = await asyncio.to_thread(
+ convert_from_bytes, data, **options
+ )
+ else:
+ images = await asyncio.to_thread(
+ convert_from_path, data, **options
+ )
+ elapsed = time.perf_counter() - start_time
+ logger.info(
+ f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}"
+ )
+ return images
+ except PDFInfoNotInstalledError as e:
+ logger.error(
+ "PDFInfoNotInstalledError encountered during PDF conversion."
+ )
+ raise PopplerNotFoundError() from e
+ except Exception as err:
+ logger.error(
+ f"Error converting PDF to images: {err} type: {type(err)}"
+ )
+ raise PDFParsingError(
+ f"Failed to process PDF: {str(err)}", err
+ ) from err
+
+ async def process_page(
+ self, image: Image.Image, page_num: int
+ ) -> dict[str, str]:
+ """Process a single PDF page using the vision model."""
+ page_start = time.perf_counter()
+ try:
+ # Convert PIL image to JPEG bytes in-memory
+ buf = BytesIO()
+ image.save(buf, format="JPEG")
+ buf.seek(0)
+ image_data = buf.read()
+ image_base64 = base64.b64encode(image_data).decode("utf-8")
+
+ model = self.config.app.vlm
+
+ # Configure generation parameters
+ generation_config = GenerationConfig(
+ model=self.config.app.vlm,
+ stream=False,
+ )
+
+ is_anthropic = model and "anthropic/" in model
+
+ # FIXME: This is a hacky fix to handle the different formats
+ # that was causing an outage. This logic really needs to be refactored
+ # and cleaned up such that it handles providers more robustly.
+
+ # Prepare message with image content
+ if is_anthropic:
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": "image/jpeg",
+ "data": image_base64,
+ },
+ },
+ ],
+ }
+ ]
+ else:
+ # Use OpenAI format
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:image/jpeg;base64,{image_base64}"
+ },
+ },
+ ],
+ }
+ ]
+
+ logger.debug(f"Sending page {page_num} to vision model.")
+ req_start = time.perf_counter()
+ if is_anthropic:
+ response = await self.llm_provider.aget_completion(
+ messages=messages,
+ generation_config=generation_config,
+ tools=[
+ {
+ "name": "parse_pdf_page",
+ "description": "Parse text content from a PDF page",
+ "input_schema": {
+ "type": "object",
+ "properties": {
+ "page_content": {
+ "type": "string",
+ "description": "Extracted text from the PDF page",
+ },
+ },
+ "required": ["page_content"],
+ },
+ }
+ ],
+ tool_choice={"type": "tool", "name": "parse_pdf_page"},
+ )
+
+ if (
+ response.choices
+ and response.choices[0].message
+ and response.choices[0].message.tool_calls
+ ):
+ tool_call = response.choices[0].message.tool_calls[0]
+ args = json.loads(tool_call.function.arguments)
+ content = args.get("page_content", "")
+ page_elapsed = time.perf_counter() - page_start
+ logger.debug(
+ f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+ )
+
+ return {"page": str(page_num), "content": content}
+                else:
+                    logger.warning(
+                        f"No valid tool call in response for page {page_num}, document might be missing text."
+                    )
+                    # Return empty content rather than falling through with None
+                    return {"page": str(page_num), "content": ""}
+ else:
+ response = await self.llm_provider.aget_completion(
+ messages=messages, generation_config=generation_config
+ )
+ req_elapsed = time.perf_counter() - req_start
+ logger.debug(
+ f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds."
+ )
+
+ if response.choices and response.choices[0].message:
+ content = response.choices[0].message.content
+ page_elapsed = time.perf_counter() - page_start
+ logger.debug(
+ f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+ )
+ return {"page": str(page_num), "content": content}
+ else:
+ msg = f"No response content for page {page_num}"
+ logger.error(msg)
+ raise ValueError(msg)
+ except Exception as e:
+ logger.error(
+ f"Error processing page {page_num} with vision model: {str(e)}"
+ )
+ raise
+
+ async def ingest(
+ self, data: str | bytes, maintain_order: bool = True, **kwargs
+ ) -> AsyncGenerator[dict[str, str | int], None]:
+        """Ingest PDF data and yield the text description for each page using
+        the vision model.
+
+        Yields a dictionary per page with "content" and "page_number" keys.
+        """
+ ingest_start = time.perf_counter()
+ logger.info("Starting PDF ingestion using VLMPDFParser.")
+ if not self.vision_prompt_text:
+ self.vision_prompt_text = (
+ await self.database_provider.prompts_handler.get_cached_prompt(
+ prompt_name=self.config.vision_pdf_prompt_name
+ )
+ )
+ logger.info("Retrieved vision prompt text from database.")
+
+ try:
+ # Convert PDF to images (in-memory)
+ images = await self.convert_pdf_to_images(data)
+
+ # Create asynchronous tasks for processing each page
+ tasks = {
+ asyncio.create_task(
+ self.process_page(image, page_num)
+ ): page_num
+ for page_num, image in enumerate(images, 1)
+ }
+
+ if maintain_order:
+ pending = set(tasks.keys())
+ results = {}
+ next_page = 1
+ while pending:
+ done, pending = await asyncio.wait(
+ pending, return_when=asyncio.FIRST_COMPLETED
+ )
+ for task in done:
+ result = await task
+ page_num = int(result["page"])
+ results[page_num] = result
+ while next_page in results:
+ yield {
+ "content": results[next_page]["content"] or "",
+ "page_number": next_page,
+ }
+ results.pop(next_page)
+ next_page += 1
+ else:
+ # Yield results as tasks complete
+ for coro in asyncio.as_completed(tasks.keys()):
+ result = await coro
+ yield {
+ "content": result["content"],
+ "page_number": int(result["page"]),
+ }
+ total_elapsed = time.perf_counter() - ingest_start
+ logger.info(
+ f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser."
+ )
+ except Exception as e:
+ logger.error(f"Error processing PDF: {str(e)}")
+ raise
+
+
+class BasicPDFParser(AsyncParser[str | bytes]):
+ """A parser for PDF data."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.PdfReader = PdfReader
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ """Ingest PDF data and yield text from each page."""
+ if isinstance(data, str):
+ raise ValueError("PDF data must be in bytes format.")
+ pdf = self.PdfReader(BytesIO(data))
+ for page in pdf.pages:
+ page_text = page.extract_text()
+ if page_text is not None:
+ page_text = "".join(
+ filter(
+ lambda x: (
+ unicodedata.category(x)
+ in [
+ "Ll",
+ "Lu",
+ "Lt",
+ "Lm",
+ "Lo",
+ "Nl",
+ "No",
+ ] # Keep letters and numbers
+ or "\u4e00" <= x <= "\u9fff" # Chinese characters
+ or "\u0600" <= x <= "\u06ff" # Arabic characters
+ or "\u0400" <= x <= "\u04ff" # Cyrillic letters
+ or "\u0370" <= x <= "\u03ff" # Greek letters
+ or "\u0e00" <= x <= "\u0e7f" # Thai
+ or "\u3040" <= x <= "\u309f" # Japanese Hiragana
+ or "\u30a0" <= x <= "\u30ff" # Katakana
+ or x in string.printable
+ ),
+ page_text,
+ )
+                )  # Keep characters from common scripts; drop other non-printables
+ yield page_text
+
+
+class PDFParserUnstructured(AsyncParser[str | bytes]):
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ try:
+ from unstructured.partition.pdf import partition_pdf
+
+ self.partition_pdf = partition_pdf
+
+        except ImportError as e:
+            logger.error(f"PDFParserUnstructured ImportError: {e}")
+
+ async def ingest(
+ self,
+ data: str | bytes,
+ partition_strategy: str = "hi_res",
+ chunking_strategy="by_title",
+ ) -> AsyncGenerator[str, None]:
+        # Partition the PDF; unstructured expects the kwarg "strategy"
+        elements = self.partition_pdf(
+            file=BytesIO(data),
+            strategy=partition_strategy,
+            chunking_strategy=chunking_strategy,
+        )
+ for element in elements:
+ yield element.text
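The pypdf path wrapped by BasicPDFParser can be run standalone. A minimal sketch, without the character filtering above, assuming a local sample.pdf:

    from io import BytesIO

    from pypdf import PdfReader


    def extract_pages(pdf_bytes: bytes):
        # Same core call BasicPDFParser wraps, one string per page
        reader = PdfReader(BytesIO(pdf_bytes))
        for number, page in enumerate(reader.pages, start=1):
            yield number, page.extract_text() or ""


    # for number, text in extract_pages(open("sample.pdf", "rb").read()):
    #     print(number, len(text))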
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py
new file mode 100644
index 00000000..c8bbaa55
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py
@@ -0,0 +1,88 @@
+# type: ignore
+import struct
+from io import BytesIO
+from typing import AsyncGenerator
+
+import olefile
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class PPTParser(AsyncParser[str | bytes]):
+ """A parser for legacy PPT (PowerPoint 97-2003) files."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.olefile = olefile
+
+ def _extract_text_from_record(self, data: bytes) -> str:
+ """Extract text from a PPT text record."""
+ try:
+ # Skip record header
+ text_data = data[8:]
+ # Convert from UTF-16-LE
+ return text_data.decode("utf-16-le", errors="ignore").strip()
+ except Exception:
+ return ""
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ """Ingest PPT data and yield text from each slide."""
+ if isinstance(data, str):
+ raise ValueError("PPT data must be in bytes format.")
+
+        ole = None
+        try:
+            ole = self.olefile.OleFileIO(BytesIO(data))
+
+ # PPT stores text in PowerPoint Document stream
+ if not ole.exists("PowerPoint Document"):
+ raise ValueError("Not a valid PowerPoint file")
+
+ # Read PowerPoint Document stream
+ ppt_stream = ole.openstream("PowerPoint Document")
+ content = ppt_stream.read()
+
+ # Text records start with 0x0FA0 or 0x0FD0
+ text_markers = [b"\xa0\x0f", b"\xd0\x0f"]
+
+ current_position = 0
+ while current_position < len(content):
+ # Look for text markers
+ for marker in text_markers:
+ marker_pos = content.find(marker, current_position)
+ if marker_pos != -1:
+                    # Record length: a 4-byte field 2 bytes past the marker
+ size_bytes = content[marker_pos + 2 : marker_pos + 6]
+ record_size = struct.unpack("<I", size_bytes)[0]
+
+ # Extract record data
+ record_data = content[
+ marker_pos : marker_pos + record_size + 8
+ ]
+ text = self._extract_text_from_record(record_data)
+
+ if text.strip():
+ yield text.strip()
+
+ current_position = marker_pos + record_size + 8
+ break
+ else:
+ current_position += 1
+
+ except Exception as e:
+ raise ValueError(f"Error processing PPT file: {str(e)}") from e
+        finally:
+            if ole is not None:
+                ole.close()
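The scan above relies on the 8-byte PPT record header: 2 bytes of version/instance bits, a 2-byte little-endian record type (hence the b"\xa0\x0f" marker for 0x0FA0), then a 4-byte length. A sketch that round-trips one synthetic UTF-16 text record:

    import struct

    # Header: version/instance, record type, record length; then UTF-16-LE payload
    payload = "hello".encode("utf-16-le")
    record = struct.pack("<HHI", 0x000F, 0x0FA0, len(payload)) + payload

    ver_inst, rec_type, rec_len = struct.unpack("<HHI", record[:8])
    assert rec_type == 0x0FA0 and rec_len == 10
    print(record[8 : 8 + rec_len].decode("utf-16-le"))  # -> hello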
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py
new file mode 100644
index 00000000..8685c8fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py
@@ -0,0 +1,40 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pptx import Presentation
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class PPTXParser(AsyncParser[str | bytes]):
+    """A parser for PPTX (PowerPoint 2007+) data."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.Presentation = Presentation
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]: # type: ignore
+        """Ingest PPTX data and yield text from each slide."""
+        if isinstance(data, str):
+            raise ValueError("PPTX data must be in bytes format.")
+
+ prs = self.Presentation(BytesIO(data))
+ for slide in prs.slides:
+ for shape in slide.shapes:
+ if hasattr(shape, "text"):
+ yield shape.text
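python-pptx can round-trip this in memory; shape.has_text_frame is the precise form of the hasattr check above. A small sketch building a one-slide deck and reading it back:

    from pptx import Presentation

    prs = Presentation()
    slide = prs.slides.add_slide(prs.slide_layouts[5])  # "Title Only" layout
    slide.shapes.title.text = "Hello PPTX"

    for shape in prs.slides[0].shapes:
        if shape.has_text_frame:  # the precise form of hasattr(shape, "text")
            print(shape.text_frame.text)  # -> Hello PPTX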
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py
new file mode 100644
index 00000000..6be12076
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py
@@ -0,0 +1,45 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from striprtf.striprtf import rtf_to_text
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+
+class RTFParser(AsyncParser[str | bytes]):
+ """Parser for Rich Text Format (.rtf) files."""
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.striprtf = rtf_to_text
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ if isinstance(data, bytes):
+ data = data.decode("utf-8", errors="ignore")
+
+ try:
+ # Convert RTF to plain text
+ plain_text = self.striprtf(data)
+
+ # Split into paragraphs and yield non-empty ones
+ paragraphs = plain_text.split("\n\n")
+ for paragraph in paragraphs:
+ if paragraph.strip():
+ yield paragraph.strip()
+
+ except Exception as e:
+ raise ValueError(f"Error processing RTF file: {str(e)}") from e
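striprtf maps \par to a newline, which is what makes the double-newline paragraph split above work. A minimal sketch:

    from striprtf.striprtf import rtf_to_text

    rtf = r"{\rtf1\ansi First paragraph.\par\par Second paragraph.}"
    plain = rtf_to_text(rtf)
    for paragraph in plain.split("\n\n"):
        if paragraph.strip():
            print(paragraph.strip())
    # -> First paragraph.
    # -> Second paragraph.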