diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media')
11 files changed, 1201 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py new file mode 100644 index 00000000..c268b673 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py @@ -0,0 +1,26 @@ +# type: ignore +from .audio_parser import AudioParser +from .bmp_parser import BMPParser +from .doc_parser import DOCParser +from .docx_parser import DOCXParser +from .img_parser import ImageParser +from .odt_parser import ODTParser +from .pdf_parser import BasicPDFParser, PDFParserUnstructured, VLMPDFParser +from .ppt_parser import PPTParser +from .pptx_parser import PPTXParser +from .rtf_parser import RTFParser + +__all__ = [ + "AudioParser", + "BMPParser", + "DOCParser", + "DOCXParser", + "ImageParser", + "ODTParser", + "VLMPDFParser", + "BasicPDFParser", + "PDFParserUnstructured", + "PPTParser", + "PPTXParser", + "RTFParser", +] diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py new file mode 100644 index 00000000..7d5f9f1d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py @@ -0,0 +1,74 @@ +# type: ignore +import logging +import os +import tempfile +from typing import AsyncGenerator + +from litellm import atranscription + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() + + +class AudioParser(AsyncParser[bytes]): + """A parser for audio data using Whisper transcription.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.atranscription = atranscription + + async def ingest( # type: ignore + self, data: bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest audio data and yield a transcription using Whisper via + LiteLLM. + + Args: + data: Raw audio bytes + *args, **kwargs: Additional arguments passed to the transcription call + + Yields: + Chunks of transcribed text + """ + try: + # Create a temporary file to store the audio data + with tempfile.NamedTemporaryFile( + suffix=".wav", delete=False + ) as temp_file: + temp_file.write(data) + temp_file_path = temp_file.name + + # Call Whisper transcription + response = await self.atranscription( + model=self.config.audio_transcription_model + or self.config.app.audio_lm, + file=open(temp_file_path, "rb"), + **kwargs, + ) + + # The response should contain the transcribed text directly + yield response.text + + except Exception as e: + logger.error(f"Error processing audio with Whisper: {str(e)}") + raise + + finally: + # Clean up the temporary file + try: + os.unlink(temp_file_path) + except Exception as e: + logger.warning(f"Failed to delete temporary file: {str(e)}") diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py new file mode 100644 index 00000000..78646da7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py @@ -0,0 +1,78 @@ +# type: ignore +from typing import AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class BMPParser(AsyncParser[str | bytes]): + """A parser for BMP image data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + import struct + + self.struct = struct + + async def extract_bmp_metadata(self, data: bytes) -> dict: + """Extract metadata from BMP file header.""" + try: + # BMP header format + header_format = "<2sIHHI" + header_size = self.struct.calcsize(header_format) + + # Unpack header data + ( + signature, + file_size, + reserved, + reserved2, + data_offset, + ) = self.struct.unpack(header_format, data[:header_size]) + + # DIB header + dib_format = "<IiiHHIIiiII" + dib_size = self.struct.calcsize(dib_format) + dib_data = self.struct.unpack(dib_format, data[14 : 14 + dib_size]) + + width = dib_data[1] + height = abs(dib_data[2]) # Height can be negative + bits_per_pixel = dib_data[4] + compression = dib_data[5] + + return { + "width": width, + "height": height, + "bits_per_pixel": bits_per_pixel, + "file_size": file_size, + "compression": compression, + } + except Exception as e: + return {"error": f"Failed to parse BMP header: {str(e)}"} + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest BMP data and yield metadata description.""" + if isinstance(data, str): + # Convert base64 string to bytes if needed + import base64 + + data = base64.b64decode(data) + + metadata = await self.extract_bmp_metadata(data) + + # Generate description of the BMP file + yield f"BMP image with dimensions {metadata.get('width', 'unknown')}x{metadata.get('height', 'unknown')} pixels, {metadata.get('bits_per_pixel', 'unknown')} bits per pixel, file size: {metadata.get('file_size', 'unknown')} bytes" diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py new file mode 100644 index 00000000..5b49e2cc --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py @@ -0,0 +1,108 @@ +# type: ignore +import re +from io import BytesIO +from typing import AsyncGenerator + +import olefile + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class DOCParser(AsyncParser[str | bytes]): + """A parser for DOC (legacy Microsoft Word) data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.olefile = olefile + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest DOC data and yield text from the document.""" + if isinstance(data, str): + raise ValueError("DOC data must be in bytes format.") + + # Create BytesIO object from the data + file_obj = BytesIO(data) + + try: + # Open the DOC file using olefile + ole = self.olefile.OleFileIO(file_obj) + + # Check if it's a Word document + if not ole.exists("WordDocument"): + raise ValueError("Not a valid Word document") + + # Read the WordDocument stream + word_stream = ole.openstream("WordDocument").read() + + # Read the text from the 0Table or 1Table stream (contains the text) + if ole.exists("1Table"): + table_stream = ole.openstream("1Table").read() + elif ole.exists("0Table"): + table_stream = ole.openstream("0Table").read() + else: + table_stream = b"" + + # Extract text content + text = self._extract_text(word_stream, table_stream) + + # Clean and split the text + paragraphs = self._clean_text(text) + + # Yield non-empty paragraphs + for paragraph in paragraphs: + if paragraph.strip(): + yield paragraph.strip() + + except Exception as e: + raise ValueError(f"Error processing DOC file: {str(e)}") from e + finally: + ole.close() + file_obj.close() + + def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str: + """Extract text from Word document streams.""" + try: + text = word_stream.replace(b"\x00", b"").decode( + "utf-8", errors="ignore" + ) + + # If table_stream exists, try to extract additional text + if table_stream: + table_text = table_stream.replace(b"\x00", b"").decode( + "utf-8", errors="ignore" + ) + text += table_text + + return text + except Exception as e: + raise ValueError(f"Error extracting text: {str(e)}") from e + + def _clean_text(self, text: str) -> list[str]: + """Clean and split the extracted text into paragraphs.""" + # Remove binary artifacts and control characters + text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]", "", text) + + # Remove multiple spaces and newlines + text = re.sub(r"\s+", " ", text) + + # Split into paragraphs on double newlines or other common separators + paragraphs = re.split(r"\n\n|\r\n\r\n|\f", text) + + # Remove empty or whitespace-only paragraphs + paragraphs = [p.strip() for p in paragraphs if p.strip()] + + return paragraphs diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py new file mode 100644 index 00000000..988f8341 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py @@ -0,0 +1,38 @@ +# type: ignore +from io import BytesIO +from typing import AsyncGenerator + +from docx import Document + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class DOCXParser(AsyncParser[str | bytes]): + """A parser for DOCX data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.Document = Document + + async def ingest( + self, data: str | bytes, *args, **kwargs + ) -> AsyncGenerator[str, None]: # type: ignore + """Ingest DOCX data and yield text from each paragraph.""" + if isinstance(data, str): + raise ValueError("DOCX data must be in bytes format.") + + doc = self.Document(BytesIO(data)) + for paragraph in doc.paragraphs: + yield paragraph.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py new file mode 100644 index 00000000..bcb37eab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py @@ -0,0 +1,281 @@ +# type: ignore +import base64 +import logging +from io import BytesIO +from typing import AsyncGenerator, Optional + +import filetype +import pillow_heif +from PIL import Image + +from core.base.abstractions import GenerationConfig +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() + + +class ImageParser(AsyncParser[str | bytes]): + # Mapping of file extensions to MIME types + MIME_TYPE_MAPPING = { + "bmp": "image/bmp", + "gif": "image/gif", + "heic": "image/heic", + "jpeg": "image/jpeg", + "jpg": "image/jpeg", + "png": "image/png", + "tiff": "image/tiff", + "tif": "image/tiff", + "webp": "image/webp", + } + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + self.Image = Image + self.pillow_heif = pillow_heif + self.pillow_heif.register_heif_opener() + + def _is_heic(self, data: bytes) -> bool: + """Detect HEIC format using magic numbers and patterns.""" + heic_patterns = [ + b"ftyp", + b"heic", + b"heix", + b"hevc", + b"HEIC", + b"mif1", + b"msf1", + b"hevc", + b"hevx", + ] + + try: + header = data[:32] # Get first 32 bytes + return any(pattern in header for pattern in heic_patterns) + except Exception as e: + logger.error(f"Error checking for HEIC format: {str(e)}") + return False + + async def _convert_heic_to_jpeg(self, data: bytes) -> bytes: + """Convert HEIC image to JPEG format.""" + try: + # Create BytesIO object for input + input_buffer = BytesIO(data) + + # Load HEIC image using pillow_heif + heif_file = self.pillow_heif.read_heif(input_buffer) + + # Get the primary image - API changed, need to get first image + heif_image = heif_file[0] # Get first image in the container + + # Convert to PIL Image directly from the HEIF image + pil_image = heif_image.to_pillow() + + # Convert to RGB if needed + if pil_image.mode != "RGB": + pil_image = pil_image.convert("RGB") + + # Save as JPEG + output_buffer = BytesIO() + pil_image.save(output_buffer, format="JPEG", quality=95) + return output_buffer.getvalue() + + except Exception as e: + logger.error(f"Error converting HEIC to JPEG: {str(e)}") + raise + + def _is_jpeg(self, data: bytes) -> bool: + """Detect JPEG format using magic numbers.""" + return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8 + + def _is_png(self, data: bytes) -> bool: + """Detect PNG format using magic numbers.""" + png_signature = b"\x89PNG\r\n\x1a\n" + return data.startswith(png_signature) + + def _is_bmp(self, data: bytes) -> bool: + """Detect BMP format using magic numbers.""" + return data.startswith(b"BM") + + def _is_tiff(self, data: bytes) -> bool: + """Detect TIFF format using magic numbers.""" + return ( + data.startswith(b"II*\x00") # Little-endian + or data.startswith(b"MM\x00*") + ) # Big-endian + + def _get_image_media_type( + self, data: bytes, filename: Optional[str] = None + ) -> str: + """ + Determine the correct media type based on image data and/or filename. + + Args: + data: The binary image data + filename: Optional filename which may contain extension information + + Returns: + str: The MIME type for the image + """ + try: + # First, try format-specific detection functions + if self._is_heic(data): + return "image/heic" + if self._is_jpeg(data): + return "image/jpeg" + if self._is_png(data): + return "image/png" + if self._is_bmp(data): + return "image/bmp" + if self._is_tiff(data): + return "image/tiff" + + # Try using filetype as a fallback + img_type = filetype.guess(data) + if img_type: + # Map the detected type to a MIME type + return self.MIME_TYPE_MAPPING.get( + img_type, f"image/{img_type}" + ) + + # If we have a filename, try to get the type from the extension + if filename: + extension = filename.split(".")[-1].lower() + if extension in self.MIME_TYPE_MAPPING: + return self.MIME_TYPE_MAPPING[extension] + + # If all else fails, default to octet-stream (generic binary) + logger.warning( + "Could not determine image type, using application/octet-stream" + ) + return "application/octet-stream" + + except Exception as e: + logger.error(f"Error determining image media type: {str(e)}") + return "application/octet-stream" # Default to generic binary as fallback + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if not self.vision_prompt_text: + self.vision_prompt_text = ( + await self.database_provider.prompts_handler.get_cached_prompt( + prompt_name=self.config.vision_img_prompt_name + ) + ) + try: + filename = kwargs.get("filename", None) + # Whether to convert HEIC to JPEG (default: True for backward compatibility) + convert_heic = kwargs.get("convert_heic", True) + + if isinstance(data, bytes): + try: + # First detect the original media type + original_media_type = self._get_image_media_type( + data, filename + ) + logger.debug( + f"Detected original image type: {original_media_type}" + ) + + # Determine if we need to convert HEIC + is_heic_format = self._is_heic(data) + + # Handle HEIC images + if is_heic_format and convert_heic: + logger.debug( + "Detected HEIC format, converting to JPEG" + ) + data = await self._convert_heic_to_jpeg(data) + media_type = "image/jpeg" + else: + # Keep original format and media type + media_type = original_media_type + + # Encode the data to base64 + image_data = base64.b64encode(data).decode("utf-8") + + except Exception as e: + logger.error(f"Error processing image data: {str(e)}") + raise + else: + # If data is already a string (base64), we assume it has a reliable content type + # from the source that encoded it + image_data = data + + # Try to determine the media type from the context if available + media_type = kwargs.get( + "media_type", "application/octet-stream" + ) + + # Get the model from kwargs or config + model = kwargs.get("vlm", None) or self.config.app.vlm + + generation_config = GenerationConfig( + model=model, + stream=False, + ) + + logger.debug(f"Using model: {model}, media_type: {media_type}") + + if "anthropic" in model: + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": image_data, + }, + }, + ], + } + ] + else: + # For OpenAI-style APIs, use their format + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:{media_type};base64,{image_data}" + }, + }, + ], + } + ] + + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + if not content: + raise ValueError("No content in response") + yield content + else: + raise ValueError("No response content") + + except Exception as e: + logger.error(f"Error processing image with vision model: {str(e)}") + raise diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py new file mode 100644 index 00000000..cb146464 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py @@ -0,0 +1,60 @@ +# type: ignore +import xml.etree.ElementTree as ET +import zipfile +from typing import AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class ODTParser(AsyncParser[str | bytes]): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.zipfile = zipfile + self.ET = ET + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if isinstance(data, str): + raise ValueError("ODT data must be in bytes format.") + + from io import BytesIO + + file_obj = BytesIO(data) + + try: + with self.zipfile.ZipFile(file_obj) as odt: + # ODT files are zip archives containing content.xml + content = odt.read("content.xml") + root = self.ET.fromstring(content) + + # ODT XML namespace + ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"} + + # Extract paragraphs and headers + for p in root.findall(".//text:p", ns): + text = "".join(p.itertext()) + if text.strip(): + yield text.strip() + + for h in root.findall(".//text:h", ns): + text = "".join(h.itertext()) + if text.strip(): + yield text.strip() + + except Exception as e: + raise ValueError(f"Error processing ODT file: {str(e)}") from e + finally: + file_obj.close() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py new file mode 100644 index 00000000..b33ccb63 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py @@ -0,0 +1,363 @@ +# type: ignore +import asyncio +import base64 +import json +import logging +import string +import time +import unicodedata +from io import BytesIO +from typing import AsyncGenerator + +from pdf2image import convert_from_bytes, convert_from_path +from pdf2image.exceptions import PDFInfoNotInstalledError +from PIL import Image +from pypdf import PdfReader + +from core.base.abstractions import GenerationConfig +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) +from shared.abstractions import PDFParsingError, PopplerNotFoundError + +logger = logging.getLogger() + + +class VLMPDFParser(AsyncParser[str | bytes]): + """A parser for PDF documents using vision models for page processing.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + + async def convert_pdf_to_images( + self, data: str | bytes + ) -> list[Image.Image]: + """Convert PDF pages to images asynchronously using in-memory + conversion.""" + logger.info("Starting PDF conversion to images.") + start_time = time.perf_counter() + options = { + "dpi": 300, # You can make this configurable via self.config if needed + "fmt": "jpeg", + "thread_count": 4, + "paths_only": False, # Return PIL Image objects instead of writing to disk + } + try: + if isinstance(data, bytes): + images = await asyncio.to_thread( + convert_from_bytes, data, **options + ) + else: + images = await asyncio.to_thread( + convert_from_path, data, **options + ) + elapsed = time.perf_counter() - start_time + logger.info( + f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}" + ) + return images + except PDFInfoNotInstalledError as e: + logger.error( + "PDFInfoNotInstalledError encountered during PDF conversion." + ) + raise PopplerNotFoundError() from e + except Exception as err: + logger.error( + f"Error converting PDF to images: {err} type: {type(err)}" + ) + raise PDFParsingError( + f"Failed to process PDF: {str(err)}", err + ) from err + + async def process_page( + self, image: Image.Image, page_num: int + ) -> dict[str, str]: + """Process a single PDF page using the vision model.""" + page_start = time.perf_counter() + try: + # Convert PIL image to JPEG bytes in-memory + buf = BytesIO() + image.save(buf, format="JPEG") + buf.seek(0) + image_data = buf.read() + image_base64 = base64.b64encode(image_data).decode("utf-8") + + model = self.config.app.vlm + + # Configure generation parameters + generation_config = GenerationConfig( + model=self.config.app.vlm, + stream=False, + ) + + is_anthropic = model and "anthropic/" in model + + # FIXME: This is a hacky fix to handle the different formats + # that was causing an outage. This logic really needs to be refactored + # and cleaned up such that it handles providers more robustly. + + # Prepare message with image content + if is_anthropic: + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/jpeg", + "data": image_base64, + }, + }, + ], + } + ] + else: + # Use OpenAI format + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_base64}" + }, + }, + ], + } + ] + + logger.debug(f"Sending page {page_num} to vision model.") + req_start = time.perf_counter() + if is_anthropic: + response = await self.llm_provider.aget_completion( + messages=messages, + generation_config=generation_config, + tools=[ + { + "name": "parse_pdf_page", + "description": "Parse text content from a PDF page", + "input_schema": { + "type": "object", + "properties": { + "page_content": { + "type": "string", + "description": "Extracted text from the PDF page", + }, + }, + "required": ["page_content"], + }, + } + ], + tool_choice={"type": "tool", "name": "parse_pdf_page"}, + ) + + if ( + response.choices + and response.choices[0].message + and response.choices[0].message.tool_calls + ): + tool_call = response.choices[0].message.tool_calls[0] + args = json.loads(tool_call.function.arguments) + content = args.get("page_content", "") + page_elapsed = time.perf_counter() - page_start + logger.debug( + f"Processed page {page_num} in {page_elapsed:.2f} seconds." + ) + + return {"page": str(page_num), "content": content} + else: + logger.warning( + f"No valid tool call in response for page {page_num}, document might be missing text." + ) + else: + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + req_elapsed = time.perf_counter() - req_start + logger.debug( + f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds." + ) + + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + page_elapsed = time.perf_counter() - page_start + logger.debug( + f"Processed page {page_num} in {page_elapsed:.2f} seconds." + ) + return {"page": str(page_num), "content": content} + else: + msg = f"No response content for page {page_num}" + logger.error(msg) + raise ValueError(msg) + except Exception as e: + logger.error( + f"Error processing page {page_num} with vision model: {str(e)}" + ) + raise + + async def ingest( + self, data: str | bytes, maintain_order: bool = True, **kwargs + ) -> AsyncGenerator[dict[str, str | int], None]: + """Ingest PDF data and yield the text description for each page using + the vision model. + + (This version yields a string per page rather than a dictionary.) + """ + ingest_start = time.perf_counter() + logger.info("Starting PDF ingestion using VLMPDFParser.") + if not self.vision_prompt_text: + self.vision_prompt_text = ( + await self.database_provider.prompts_handler.get_cached_prompt( + prompt_name=self.config.vision_pdf_prompt_name + ) + ) + logger.info("Retrieved vision prompt text from database.") + + try: + # Convert PDF to images (in-memory) + images = await self.convert_pdf_to_images(data) + + # Create asynchronous tasks for processing each page + tasks = { + asyncio.create_task( + self.process_page(image, page_num) + ): page_num + for page_num, image in enumerate(images, 1) + } + + if maintain_order: + pending = set(tasks.keys()) + results = {} + next_page = 1 + while pending: + done, pending = await asyncio.wait( + pending, return_when=asyncio.FIRST_COMPLETED + ) + for task in done: + result = await task + page_num = int(result["page"]) + results[page_num] = result + while next_page in results: + yield { + "content": results[next_page]["content"] or "", + "page_number": next_page, + } + results.pop(next_page) + next_page += 1 + else: + # Yield results as tasks complete + for coro in asyncio.as_completed(tasks.keys()): + result = await coro + yield { + "content": result["content"], + "page_number": int(result["page"]), + } + total_elapsed = time.perf_counter() - ingest_start + logger.info( + f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser." + ) + except Exception as e: + logger.error(f"Error processing PDF: {str(e)}") + raise + + +class BasicPDFParser(AsyncParser[str | bytes]): + """A parser for PDF data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.PdfReader = PdfReader + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest PDF data and yield text from each page.""" + if isinstance(data, str): + raise ValueError("PDF data must be in bytes format.") + pdf = self.PdfReader(BytesIO(data)) + for page in pdf.pages: + page_text = page.extract_text() + if page_text is not None: + page_text = "".join( + filter( + lambda x: ( + unicodedata.category(x) + in [ + "Ll", + "Lu", + "Lt", + "Lm", + "Lo", + "Nl", + "No", + ] # Keep letters and numbers + or "\u4e00" <= x <= "\u9fff" # Chinese characters + or "\u0600" <= x <= "\u06ff" # Arabic characters + or "\u0400" <= x <= "\u04ff" # Cyrillic letters + or "\u0370" <= x <= "\u03ff" # Greek letters + or "\u0e00" <= x <= "\u0e7f" # Thai + or "\u3040" <= x <= "\u309f" # Japanese Hiragana + or "\u30a0" <= x <= "\u30ff" # Katakana + or x in string.printable + ), + page_text, + ) + ) # Keep characters in common languages ; # Filter out non-printable characters + yield page_text + + +class PDFParserUnstructured(AsyncParser[str | bytes]): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + try: + from unstructured.partition.pdf import partition_pdf + + self.partition_pdf = partition_pdf + + except ImportError as e: + logger.error("PDFParserUnstructured ImportError : ", e) + + async def ingest( + self, + data: str | bytes, + partition_strategy: str = "hi_res", + chunking_strategy="by_title", + ) -> AsyncGenerator[str, None]: + # partition the pdf + elements = self.partition_pdf( + file=BytesIO(data), + partition_strategy=partition_strategy, + chunking_strategy=chunking_strategy, + ) + for element in elements: + yield element.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py new file mode 100644 index 00000000..c8bbaa55 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py @@ -0,0 +1,88 @@ +# type: ignore +import struct +from io import BytesIO +from typing import AsyncGenerator + +import olefile + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class PPTParser(AsyncParser[str | bytes]): + """A parser for legacy PPT (PowerPoint 97-2003) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.olefile = olefile + + def _extract_text_from_record(self, data: bytes) -> str: + """Extract text from a PPT text record.""" + try: + # Skip record header + text_data = data[8:] + # Convert from UTF-16-LE + return text_data.decode("utf-16-le", errors="ignore").strip() + except Exception: + return "" + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest PPT data and yield text from each slide.""" + if isinstance(data, str): + raise ValueError("PPT data must be in bytes format.") + + try: + ole = self.olefile.OleFileIO(BytesIO(data)) + + # PPT stores text in PowerPoint Document stream + if not ole.exists("PowerPoint Document"): + raise ValueError("Not a valid PowerPoint file") + + # Read PowerPoint Document stream + ppt_stream = ole.openstream("PowerPoint Document") + content = ppt_stream.read() + + # Text records start with 0x0FA0 or 0x0FD0 + text_markers = [b"\xa0\x0f", b"\xd0\x0f"] + + current_position = 0 + while current_position < len(content): + # Look for text markers + for marker in text_markers: + marker_pos = content.find(marker, current_position) + if marker_pos != -1: + # Get record size from header (4 bytes after marker) + size_bytes = content[marker_pos + 2 : marker_pos + 6] + record_size = struct.unpack("<I", size_bytes)[0] + + # Extract record data + record_data = content[ + marker_pos : marker_pos + record_size + 8 + ] + text = self._extract_text_from_record(record_data) + + if text.strip(): + yield text.strip() + + current_position = marker_pos + record_size + 8 + break + else: + current_position += 1 + + except Exception as e: + raise ValueError(f"Error processing PPT file: {str(e)}") from e + finally: + ole.close() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py new file mode 100644 index 00000000..8685c8fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py @@ -0,0 +1,40 @@ +# type: ignore +from io import BytesIO +from typing import AsyncGenerator + +from pptx import Presentation + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class PPTXParser(AsyncParser[str | bytes]): + """A parser for PPT data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.Presentation = Presentation + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: # type: ignore + """Ingest PPT data and yield text from each slide.""" + if isinstance(data, str): + raise ValueError("PPT data must be in bytes format.") + + prs = self.Presentation(BytesIO(data)) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + yield shape.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py new file mode 100644 index 00000000..6be12076 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py @@ -0,0 +1,45 @@ +# type: ignore +from typing import AsyncGenerator + +from striprtf.striprtf import rtf_to_text + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class RTFParser(AsyncParser[str | bytes]): + """Parser for Rich Text Format (.rtf) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.striprtf = rtf_to_text + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if isinstance(data, bytes): + data = data.decode("utf-8", errors="ignore") + + try: + # Convert RTF to plain text + plain_text = self.striprtf(data) + + # Split into paragraphs and yield non-empty ones + paragraphs = plain_text.split("\n\n") + for paragraph in paragraphs: + if paragraph.strip(): + yield paragraph.strip() + + except Exception as e: + raise ValueError(f"Error processing RTF file: {str(e)}") from e |