Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers')
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/__init__.py  35
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/__init__.py  26
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py  74
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py  78
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py  108
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py  38
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py  281
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py  60
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py  363
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py  88
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py  40
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py  45
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py  28
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py  108
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py  63
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py  121
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py  94
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py  65
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py  72
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py  178
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py  58
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py  109
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py  140
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py  100
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/text/__init__.py  10
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py  32
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py  39
-rw-r--r--  .venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py  30
28 files changed, 2483 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/__init__.py
new file mode 100644
index 00000000..8a7d5bbe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/__init__.py
@@ -0,0 +1,35 @@
+from .media import *
+from .structured import *
+from .text import *
+
+__all__ = [
+    "AudioParser",
+    "BMPParser",
+    "DOCParser",
+    "DOCXParser",
+    "ImageParser",
+    "ODTParser",
+    "VLMPDFParser",
+    "BasicPDFParser",
+    "PDFParserUnstructured",
+    "VLMPDFParser",
+    "PPTParser",
+    "PPTXParser",
+    "RTFParser",
+    "CSVParser",
+    "CSVParserAdvanced",
+    "EMLParser",
+    "EPUBParser",
+    "JSONParser",
+    "MSGParser",
+    "ORGParser",
+    "P7SParser",
+    "RSTParser",
+    "TSVParser",
+    "XLSParser",
+    "XLSXParser",
+    "XLSXParserAdvanced",
+    "MDParser",
+    "HTMLParser",
+    "TextParser",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py
new file mode 100644
index 00000000..c268b673
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py
@@ -0,0 +1,26 @@
+# type: ignore
+from .audio_parser import AudioParser
+from .bmp_parser import BMPParser
+from .doc_parser import DOCParser
+from .docx_parser import DOCXParser
+from .img_parser import ImageParser
+from .odt_parser import ODTParser
+from .pdf_parser import BasicPDFParser, PDFParserUnstructured, VLMPDFParser
+from .ppt_parser import PPTParser
+from .pptx_parser import PPTXParser
+from .rtf_parser import RTFParser
+
+__all__ = [
+    "AudioParser",
+    "BMPParser",
+    "DOCParser",
+    "DOCXParser",
+    "ImageParser",
+    "ODTParser",
+    "VLMPDFParser",
+    "BasicPDFParser",
+    "PDFParserUnstructured",
+    "PPTParser",
+    "PPTXParser",
+    "RTFParser",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py
new file mode 100644
index 00000000..7d5f9f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py
@@ -0,0 +1,74 @@
+# type: ignore
+import logging
+import os
+import tempfile
+from typing import AsyncGenerator
+
+from litellm import atranscription
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger()
+
+
+class AudioParser(AsyncParser[bytes]):
+    """A parser for audio data using Whisper transcription."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.atranscription = atranscription
+
+    async def ingest(  # type: ignore
+        self, data: bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest audio data and yield a transcription using Whisper via
+        LiteLLM.
+
+        Args:
+            data: Raw audio bytes
+            **kwargs: Additional arguments passed to the transcription call
+
+        Yields:
+            Chunks of transcribed text
+        """
+        temp_file_path = None
+        try:
+            # Create a temporary file to store the audio data
+            with tempfile.NamedTemporaryFile(
+                suffix=".wav", delete=False
+            ) as temp_file:
+                temp_file.write(data)
+                temp_file_path = temp_file.name
+
+            # Call Whisper transcription, making sure the file handle is closed
+            with open(temp_file_path, "rb") as audio_file:
+                response = await self.atranscription(
+                    model=self.config.audio_transcription_model
+                    or self.config.app.audio_lm,
+                    file=audio_file,
+                    **kwargs,
+                )
+
+            # The response should contain the transcribed text directly
+            yield response.text
+
+        except Exception as e:
+            logger.error(f"Error processing audio with Whisper: {str(e)}")
+            raise
+
+        finally:
+            # Clean up the temporary file if it was created
+            if temp_file_path is not None:
+                try:
+                    os.unlink(temp_file_path)
+                except Exception as e:
+                    logger.warning(f"Failed to delete temporary file: {str(e)}")
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py
new file mode 100644
index 00000000..78646da7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py
@@ -0,0 +1,78 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class BMPParser(AsyncParser[str | bytes]):
+    """A parser for BMP image data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import struct
+
+        self.struct = struct
+
+    async def extract_bmp_metadata(self, data: bytes) -> dict:
+        """Extract metadata from BMP file header."""
+        try:
+            # BMP header format
+            header_format = "<2sIHHI"
+            header_size = self.struct.calcsize(header_format)
+
+            # Unpack header data
+            (
+                signature,
+                file_size,
+                reserved,
+                reserved2,
+                data_offset,
+            ) = self.struct.unpack(header_format, data[:header_size])
+
+            # DIB header
+            dib_format = "<IiiHHIIiiII"
+            dib_size = self.struct.calcsize(dib_format)
+            dib_data = self.struct.unpack(dib_format, data[14 : 14 + dib_size])
+
+            width = dib_data[1]
+            height = abs(dib_data[2])  # Height can be negative
+            bits_per_pixel = dib_data[4]
+            compression = dib_data[5]
+
+            return {
+                "width": width,
+                "height": height,
+                "bits_per_pixel": bits_per_pixel,
+                "file_size": file_size,
+                "compression": compression,
+            }
+        except Exception as e:
+            return {"error": f"Failed to parse BMP header: {str(e)}"}
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest BMP data and yield metadata description."""
+        if isinstance(data, str):
+            # Convert base64 string to bytes if needed
+            import base64
+
+            data = base64.b64decode(data)
+
+        metadata = await self.extract_bmp_metadata(data)
+
+        # Generate description of the BMP file
+        yield f"BMP image with dimensions {metadata.get('width', 'unknown')}x{metadata.get('height', 'unknown')} pixels, {metadata.get('bits_per_pixel', 'unknown')} bits per pixel, file size: {metadata.get('file_size', 'unknown')} bytes"
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py
new file mode 100644
index 00000000..5b49e2cc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py
@@ -0,0 +1,108 @@
+# type: ignore
+import re
+from io import BytesIO
+from typing import AsyncGenerator
+
+import olefile
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class DOCParser(AsyncParser[str | bytes]):
+    """A parser for DOC (legacy Microsoft Word) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.olefile = olefile
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest DOC data and yield text from the document."""
+        if isinstance(data, str):
+            raise ValueError("DOC data must be in bytes format.")
+
+        # Create BytesIO object from the data
+        file_obj = BytesIO(data)
+        ole = None
+
+        try:
+            # Open the DOC file using olefile
+            ole = self.olefile.OleFileIO(file_obj)
+
+            # Check if it's a Word document
+            if not ole.exists("WordDocument"):
+                raise ValueError("Not a valid Word document")
+
+            # Read the WordDocument stream
+            word_stream = ole.openstream("WordDocument").read()
+
+            # Read the text from the 0Table or 1Table stream (contains the text)
+            if ole.exists("1Table"):
+                table_stream = ole.openstream("1Table").read()
+            elif ole.exists("0Table"):
+                table_stream = ole.openstream("0Table").read()
+            else:
+                table_stream = b""
+
+            # Extract text content
+            text = self._extract_text(word_stream, table_stream)
+
+            # Clean and split the text
+            paragraphs = self._clean_text(text)
+
+            # Yield non-empty paragraphs
+            for paragraph in paragraphs:
+                if paragraph.strip():
+                    yield paragraph.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing DOC file: {str(e)}") from e
+        finally:
+            if ole is not None:
+                ole.close()
+            file_obj.close()
+
+    def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str:
+        """Extract text from Word document streams."""
+        try:
+            text = word_stream.replace(b"\x00", b"").decode(
+                "utf-8", errors="ignore"
+            )
+
+            # If table_stream exists, try to extract additional text
+            if table_stream:
+                table_text = table_stream.replace(b"\x00", b"").decode(
+                    "utf-8", errors="ignore"
+                )
+                text += table_text
+
+            return text
+        except Exception as e:
+            raise ValueError(f"Error extracting text: {str(e)}") from e
+
+    def _clean_text(self, text: str) -> list[str]:
+        """Clean and split the extracted text into paragraphs."""
+        # Remove binary artifacts and control characters
+        text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]", "", text)
+
+        # Split into paragraphs on double newlines or other common separators
+        # (before collapsing whitespace, which would destroy those separators)
+        paragraphs = re.split(r"\n\n|\r\n\r\n|\f", text)
+
+        # Collapse runs of spaces and newlines within each paragraph
+        paragraphs = [re.sub(r"\s+", " ", p) for p in paragraphs]
+
+        # Remove empty or whitespace-only paragraphs
+        paragraphs = [p.strip() for p in paragraphs if p.strip()]
+
+        return paragraphs
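Since this parser depends on specific streams being present, a quick way to inspect an OLE container is to enumerate its streams with olefile's public API. A minimal sketch, independent of this diff:

    from io import BytesIO

    import olefile

    def list_ole_streams(data: bytes) -> list[str]:
        # Enumerate stream paths, e.g. "WordDocument", "1Table", "0Table".
        with olefile.OleFileIO(BytesIO(data)) as ole:
            return [
                "/".join(entry)
                for entry in ole.listdir(streams=True, storages=False)
            ]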
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py
new file mode 100644
index 00000000..988f8341
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py
@@ -0,0 +1,38 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+from docx import Document
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class DOCXParser(AsyncParser[str | bytes]):
+    """A parser for DOCX data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.Document = Document
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:  # type: ignore
+        """Ingest DOCX data and yield text from each paragraph."""
+        if isinstance(data, str):
+            raise ValueError("DOCX data must be in bytes format.")
+
+        doc = self.Document(BytesIO(data))
+        for paragraph in doc.paragraphs:
+            yield paragraph.text
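A small self-contained check of the python-docx round trip this parser relies on (build a document in memory, then read its paragraphs back):

    from io import BytesIO

    from docx import Document

    buf = BytesIO()
    doc = Document()
    doc.add_paragraph("Hello from python-docx")
    doc.save(buf)

    for paragraph in Document(BytesIO(buf.getvalue())).paragraphs:
        print(paragraph.text)  # -> "Hello from python-docx"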
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
new file mode 100644
index 00000000..bcb37eab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
@@ -0,0 +1,281 @@
+# type: ignore
+import base64
+import logging
+from io import BytesIO
+from typing import AsyncGenerator, Optional
+
+import filetype
+import pillow_heif
+from PIL import Image
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger()
+
+
+class ImageParser(AsyncParser[str | bytes]):
+    # Mapping of file extensions to MIME types
+    MIME_TYPE_MAPPING = {
+        "bmp": "image/bmp",
+        "gif": "image/gif",
+        "heic": "image/heic",
+        "jpeg": "image/jpeg",
+        "jpg": "image/jpeg",
+        "png": "image/png",
+        "tiff": "image/tiff",
+        "tif": "image/tiff",
+        "webp": "image/webp",
+    }
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.vision_prompt_text = None
+        self.Image = Image
+        self.pillow_heif = pillow_heif
+        self.pillow_heif.register_heif_opener()
+
+    def _is_heic(self, data: bytes) -> bool:
+        """Detect HEIC format using magic numbers and patterns."""
+        heic_patterns = [
+            b"ftyp",
+            b"heic",
+            b"heix",
+            b"hevc",
+            b"HEIC",
+            b"mif1",
+            b"msf1",
+            b"hevx",
+        ]
+
+        try:
+            header = data[:32]  # Get first 32 bytes
+            return any(pattern in header for pattern in heic_patterns)
+        except Exception as e:
+            logger.error(f"Error checking for HEIC format: {str(e)}")
+            return False
+
+    async def _convert_heic_to_jpeg(self, data: bytes) -> bytes:
+        """Convert HEIC image to JPEG format."""
+        try:
+            # Create BytesIO object for input
+            input_buffer = BytesIO(data)
+
+            # Load HEIC image using pillow_heif
+            heif_file = self.pillow_heif.read_heif(input_buffer)
+
+            # Get the primary image - API changed, need to get first image
+            heif_image = heif_file[0]  # Get first image in the container
+
+            # Convert to PIL Image directly from the HEIF image
+            pil_image = heif_image.to_pillow()
+
+            # Convert to RGB if needed
+            if pil_image.mode != "RGB":
+                pil_image = pil_image.convert("RGB")
+
+            # Save as JPEG
+            output_buffer = BytesIO()
+            pil_image.save(output_buffer, format="JPEG", quality=95)
+            return output_buffer.getvalue()
+
+        except Exception as e:
+            logger.error(f"Error converting HEIC to JPEG: {str(e)}")
+            raise
+
+    def _is_jpeg(self, data: bytes) -> bool:
+        """Detect JPEG format using magic numbers."""
+        return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8
+
+    def _is_png(self, data: bytes) -> bool:
+        """Detect PNG format using magic numbers."""
+        png_signature = b"\x89PNG\r\n\x1a\n"
+        return data.startswith(png_signature)
+
+    def _is_bmp(self, data: bytes) -> bool:
+        """Detect BMP format using magic numbers."""
+        return data.startswith(b"BM")
+
+    def _is_tiff(self, data: bytes) -> bool:
+        """Detect TIFF format using magic numbers."""
+        return (
+            data.startswith(b"II*\x00")  # Little-endian
+            or data.startswith(b"MM\x00*")  # Big-endian
+        )
+
+    def _get_image_media_type(
+        self, data: bytes, filename: Optional[str] = None
+    ) -> str:
+        """
+        Determine the correct media type based on image data and/or filename.
+
+        Args:
+            data: The binary image data
+            filename: Optional filename which may contain extension information
+
+        Returns:
+            str: The MIME type for the image
+        """
+        try:
+            # First, try format-specific detection functions
+            if self._is_heic(data):
+                return "image/heic"
+            if self._is_jpeg(data):
+                return "image/jpeg"
+            if self._is_png(data):
+                return "image/png"
+            if self._is_bmp(data):
+                return "image/bmp"
+            if self._is_tiff(data):
+                return "image/tiff"
+
+            # Try using filetype as a fallback
+            img_type = filetype.guess(data)
+            if img_type:
+                # Map the detected extension to a MIME type, falling back to
+                # the library's own MIME guess
+                return self.MIME_TYPE_MAPPING.get(
+                    img_type.extension, img_type.mime
+                )
+
+            # If we have a filename, try to get the type from the extension
+            if filename:
+                extension = filename.split(".")[-1].lower()
+                if extension in self.MIME_TYPE_MAPPING:
+                    return self.MIME_TYPE_MAPPING[extension]
+
+            # If all else fails, default to octet-stream (generic binary)
+            logger.warning(
+                "Could not determine image type, using application/octet-stream"
+            )
+            return "application/octet-stream"
+
+        except Exception as e:
+            logger.error(f"Error determining image media type: {str(e)}")
+            return "application/octet-stream"  # Default to generic binary as fallback
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if not self.vision_prompt_text:
+            self.vision_prompt_text = (
+                await self.database_provider.prompts_handler.get_cached_prompt(
+                    prompt_name=self.config.vision_img_prompt_name
+                )
+            )
+        try:
+            filename = kwargs.get("filename", None)
+            # Whether to convert HEIC to JPEG (default: True for backward compatibility)
+            convert_heic = kwargs.get("convert_heic", True)
+
+            if isinstance(data, bytes):
+                try:
+                    # First detect the original media type
+                    original_media_type = self._get_image_media_type(
+                        data, filename
+                    )
+                    logger.debug(
+                        f"Detected original image type: {original_media_type}"
+                    )
+
+                    # Determine if we need to convert HEIC
+                    is_heic_format = self._is_heic(data)
+
+                    # Handle HEIC images
+                    if is_heic_format and convert_heic:
+                        logger.debug(
+                            "Detected HEIC format, converting to JPEG"
+                        )
+                        data = await self._convert_heic_to_jpeg(data)
+                        media_type = "image/jpeg"
+                    else:
+                        # Keep original format and media type
+                        media_type = original_media_type
+
+                    # Encode the data to base64
+                    image_data = base64.b64encode(data).decode("utf-8")
+
+                except Exception as e:
+                    logger.error(f"Error processing image data: {str(e)}")
+                    raise
+            else:
+                # If data is already a string (base64), we assume it has a reliable content type
+                # from the source that encoded it
+                image_data = data
+
+                # Try to determine the media type from the context if available
+                media_type = kwargs.get(
+                    "media_type", "application/octet-stream"
+                )
+
+            # Get the model from kwargs or config
+            model = kwargs.get("vlm", None) or self.config.app.vlm
+
+            generation_config = GenerationConfig(
+                model=model,
+                stream=False,
+            )
+
+            logger.debug(f"Using model: {model}, media_type: {media_type}")
+
+            if "anthropic" in model:
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image",
+                                "source": {
+                                    "type": "base64",
+                                    "media_type": media_type,
+                                    "data": image_data,
+                                },
+                            },
+                        ],
+                    }
+                ]
+            else:
+                # For OpenAI-style APIs, use their format
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image_url",
+                                "image_url": {
+                                    "url": f"data:{media_type};base64,{image_data}"
+                                },
+                            },
+                        ],
+                    }
+                ]
+
+            response = await self.llm_provider.aget_completion(
+                messages=messages, generation_config=generation_config
+            )
+
+            if response.choices and response.choices[0].message:
+                content = response.choices[0].message.content
+                if not content:
+                    raise ValueError("No content in response")
+                yield content
+            else:
+                raise ValueError("No response content")
+
+        except Exception as e:
+            logger.error(f"Error processing image with vision model: {str(e)}")
+            raise
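The magic-number helpers can be sanity-checked against synthetic byte prefixes; `config`, `db`, and `llm` below are hypothetical provider stand-ins, since the detectors themselves never touch them:

    samples = {
        "image/jpeg": b"\xff\xd8\xff\xe0" + b"\x00" * 28,
        "image/png": b"\x89PNG\r\n\x1a\n" + b"\x00" * 24,
        "image/bmp": b"BM" + b"\x00" * 30,
        "image/tiff": b"II*\x00" + b"\x00" * 28,
    }
    parser = ImageParser(config=config, database_provider=db, llm_provider=llm)
    for expected, blob in samples.items():
        assert parser._get_image_media_type(blob) == expected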
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py
new file mode 100644
index 00000000..cb146464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py
@@ -0,0 +1,60 @@
+# type: ignore
+import xml.etree.ElementTree as ET
+import zipfile
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class ODTParser(AsyncParser[str | bytes]):
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.zipfile = zipfile
+        self.ET = ET
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if isinstance(data, str):
+            raise ValueError("ODT data must be in bytes format.")
+
+        from io import BytesIO
+
+        file_obj = BytesIO(data)
+
+        try:
+            with self.zipfile.ZipFile(file_obj) as odt:
+                # ODT files are zip archives containing content.xml
+                content = odt.read("content.xml")
+                root = self.ET.fromstring(content)
+
+                # ODT XML namespace
+                ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"}
+
+                # Extract paragraphs and headers
+                for p in root.findall(".//text:p", ns):
+                    text = "".join(p.itertext())
+                    if text.strip():
+                        yield text.strip()
+
+                for h in root.findall(".//text:h", ns):
+                    text = "".join(h.itertext())
+                    if text.strip():
+                        yield text.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing ODT file: {str(e)}") from e
+        finally:
+            file_obj.close()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
new file mode 100644
index 00000000..b33ccb63
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py
@@ -0,0 +1,363 @@
+# type: ignore
+import asyncio
+import base64
+import json
+import logging
+import string
+import time
+import unicodedata
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pdf2image import convert_from_bytes, convert_from_path
+from pdf2image.exceptions import PDFInfoNotInstalledError
+from PIL import Image
+from pypdf import PdfReader
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+from shared.abstractions import PDFParsingError, PopplerNotFoundError
+
+logger = logging.getLogger()
+
+
+class VLMPDFParser(AsyncParser[str | bytes]):
+    """A parser for PDF documents using vision models for page processing."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.vision_prompt_text = None
+
+    async def convert_pdf_to_images(
+        self, data: str | bytes
+    ) -> list[Image.Image]:
+        """Convert PDF pages to images asynchronously using in-memory
+        conversion."""
+        logger.info("Starting PDF conversion to images.")
+        start_time = time.perf_counter()
+        options = {
+            "dpi": 300,  # You can make this configurable via self.config if needed
+            "fmt": "jpeg",
+            "thread_count": 4,
+            "paths_only": False,  # Return PIL Image objects instead of writing to disk
+        }
+        try:
+            if isinstance(data, bytes):
+                images = await asyncio.to_thread(
+                    convert_from_bytes, data, **options
+                )
+            else:
+                images = await asyncio.to_thread(
+                    convert_from_path, data, **options
+                )
+            elapsed = time.perf_counter() - start_time
+            logger.info(
+                f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}"
+            )
+            return images
+        except PDFInfoNotInstalledError as e:
+            logger.error(
+                "PDFInfoNotInstalledError encountered during PDF conversion."
+            )
+            raise PopplerNotFoundError() from e
+        except Exception as err:
+            logger.error(
+                f"Error converting PDF to images: {err} type: {type(err)}"
+            )
+            raise PDFParsingError(
+                f"Failed to process PDF: {str(err)}", err
+            ) from err
+
+    async def process_page(
+        self, image: Image.Image, page_num: int
+    ) -> dict[str, str]:
+        """Process a single PDF page using the vision model."""
+        page_start = time.perf_counter()
+        try:
+            # Convert PIL image to JPEG bytes in-memory
+            buf = BytesIO()
+            image.save(buf, format="JPEG")
+            buf.seek(0)
+            image_data = buf.read()
+            image_base64 = base64.b64encode(image_data).decode("utf-8")
+
+            model = self.config.app.vlm
+
+            # Configure generation parameters
+            generation_config = GenerationConfig(
+                model=self.config.app.vlm,
+                stream=False,
+            )
+
+            is_anthropic = model and "anthropic/" in model
+
+            # FIXME: This is a hacky fix to handle the different formats
+            # that was causing an outage. This logic really needs to be refactored
+            # and cleaned up such that it handles providers more robustly.
+
+            # Prepare message with image content
+            if is_anthropic:
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image",
+                                "source": {
+                                    "type": "base64",
+                                    "media_type": "image/jpeg",
+                                    "data": image_base64,
+                                },
+                            },
+                        ],
+                    }
+                ]
+            else:
+                # Use OpenAI format
+                messages = [
+                    {
+                        "role": "user",
+                        "content": [
+                            {"type": "text", "text": self.vision_prompt_text},
+                            {
+                                "type": "image_url",
+                                "image_url": {
+                                    "url": f"data:image/jpeg;base64,{image_base64}"
+                                },
+                            },
+                        ],
+                    }
+                ]
+
+            logger.debug(f"Sending page {page_num} to vision model.")
+            req_start = time.perf_counter()
+            if is_anthropic:
+                response = await self.llm_provider.aget_completion(
+                    messages=messages,
+                    generation_config=generation_config,
+                    tools=[
+                        {
+                            "name": "parse_pdf_page",
+                            "description": "Parse text content from a PDF page",
+                            "input_schema": {
+                                "type": "object",
+                                "properties": {
+                                    "page_content": {
+                                        "type": "string",
+                                        "description": "Extracted text from the PDF page",
+                                    },
+                                },
+                                "required": ["page_content"],
+                            },
+                        }
+                    ],
+                    tool_choice={"type": "tool", "name": "parse_pdf_page"},
+                )
+
+                if (
+                    response.choices
+                    and response.choices[0].message
+                    and response.choices[0].message.tool_calls
+                ):
+                    tool_call = response.choices[0].message.tool_calls[0]
+                    args = json.loads(tool_call.function.arguments)
+                    content = args.get("page_content", "")
+                    page_elapsed = time.perf_counter() - page_start
+                    logger.debug(
+                        f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+                    )
+
+                    return {"page": str(page_num), "content": content}
+                else:
+                    logger.warning(
+                        f"No valid tool call in response for page {page_num}, document might be missing text."
+                    )
+                    # Yield an empty page rather than returning None, which
+                    # would break the page-ordering logic in ingest()
+                    return {"page": str(page_num), "content": ""}
+            else:
+                response = await self.llm_provider.aget_completion(
+                    messages=messages, generation_config=generation_config
+                )
+                req_elapsed = time.perf_counter() - req_start
+                logger.debug(
+                    f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds."
+                )
+
+                if response.choices and response.choices[0].message:
+                    content = response.choices[0].message.content
+                    page_elapsed = time.perf_counter() - page_start
+                    logger.debug(
+                        f"Processed page {page_num} in {page_elapsed:.2f} seconds."
+                    )
+                    return {"page": str(page_num), "content": content}
+                else:
+                    msg = f"No response content for page {page_num}"
+                    logger.error(msg)
+                    raise ValueError(msg)
+        except Exception as e:
+            logger.error(
+                f"Error processing page {page_num} with vision model: {str(e)}"
+            )
+            raise
+
+    async def ingest(
+        self, data: str | bytes, maintain_order: bool = True, **kwargs
+    ) -> AsyncGenerator[dict[str, str | int], None]:
+        """Ingest PDF data and yield the text description for each page using
+        the vision model.
+
+        (This version yields a string per page rather than a dictionary.)
+        """
+        ingest_start = time.perf_counter()
+        logger.info("Starting PDF ingestion using VLMPDFParser.")
+        if not self.vision_prompt_text:
+            self.vision_prompt_text = (
+                await self.database_provider.prompts_handler.get_cached_prompt(
+                    prompt_name=self.config.vision_pdf_prompt_name
+                )
+            )
+            logger.info("Retrieved vision prompt text from database.")
+
+        try:
+            # Convert PDF to images (in-memory)
+            images = await self.convert_pdf_to_images(data)
+
+            # Create asynchronous tasks for processing each page
+            tasks = {
+                asyncio.create_task(
+                    self.process_page(image, page_num)
+                ): page_num
+                for page_num, image in enumerate(images, 1)
+            }
+
+            if maintain_order:
+                pending = set(tasks.keys())
+                results = {}
+                next_page = 1
+                while pending:
+                    done, pending = await asyncio.wait(
+                        pending, return_when=asyncio.FIRST_COMPLETED
+                    )
+                    for task in done:
+                        result = await task
+                        page_num = int(result["page"])
+                        results[page_num] = result
+                        while next_page in results:
+                            yield {
+                                "content": results[next_page]["content"] or "",
+                                "page_number": next_page,
+                            }
+                            results.pop(next_page)
+                            next_page += 1
+            else:
+                # Yield results as tasks complete
+                for coro in asyncio.as_completed(tasks.keys()):
+                    result = await coro
+                    yield {
+                        "content": result["content"],
+                        "page_number": int(result["page"]),
+                    }
+            total_elapsed = time.perf_counter() - ingest_start
+            logger.info(
+                f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser."
+            )
+        except Exception as e:
+            logger.error(f"Error processing PDF: {str(e)}")
+            raise
+
+
+class BasicPDFParser(AsyncParser[str | bytes]):
+    """A parser for PDF data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.PdfReader = PdfReader
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest PDF data and yield text from each page."""
+        if isinstance(data, str):
+            raise ValueError("PDF data must be in bytes format.")
+        pdf = self.PdfReader(BytesIO(data))
+        for page in pdf.pages:
+            page_text = page.extract_text()
+            if page_text is not None:
+                page_text = "".join(
+                    filter(
+                        lambda x: (
+                            unicodedata.category(x)
+                            in [
+                                "Ll",
+                                "Lu",
+                                "Lt",
+                                "Lm",
+                                "Lo",
+                                "Nl",
+                                "No",
+                            ]  # Keep letters and numbers
+                            or "\u4e00" <= x <= "\u9fff"  # Chinese characters
+                            or "\u0600" <= x <= "\u06ff"  # Arabic characters
+                            or "\u0400" <= x <= "\u04ff"  # Cyrillic letters
+                            or "\u0370" <= x <= "\u03ff"  # Greek letters
+                            or "\u0e00" <= x <= "\u0e7f"  # Thai
+                            or "\u3040" <= x <= "\u309f"  # Japanese Hiragana
+                            or "\u30a0" <= x <= "\u30ff"  # Katakana
+                            or x in string.printable
+                        ),
+                        page_text,
+                    )
+                )  # Keep characters in common languages ; # Filter out non-printable characters
+                yield page_text
+
+
+class PDFParserUnstructured(AsyncParser[str | bytes]):
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        try:
+            from unstructured.partition.pdf import partition_pdf
+
+            self.partition_pdf = partition_pdf
+
+        except ImportError as e:
+            logger.error(f"PDFParserUnstructured ImportError: {e}")
+            raise
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        partition_strategy: str = "hi_res",
+        chunking_strategy: str = "by_title",
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        # partition the pdf
+        elements = self.partition_pdf(
+            file=BytesIO(data),
+            partition_strategy=partition_strategy,
+            chunking_strategy=chunking_strategy,
+        )
+        for element in elements:
+            yield element.text
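The maintain_order branch in VLMPDFParser is a buffer-and-release pattern: pages may finish in any order, but each is held until all earlier pages have been emitted. A distilled, self-contained sketch of the same pattern:

    import asyncio
    import random

    async def work(n: int) -> dict:
        await asyncio.sleep(random.random() / 10)  # finish out of order
        return {"page": n, "content": f"page {n}"}

    async def ordered(total: int):
        pending = {asyncio.create_task(work(n)) for n in range(1, total + 1)}
        results, next_page = {}, 1
        while pending:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                result = task.result()
                results[result["page"]] = result
            # Release every page now contiguous with the last one emitted
            while next_page in results:
                yield results.pop(next_page)
                next_page += 1

    # async for page in ordered(5): ...  # pages arrive as 1, 2, 3, 4, 5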
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py
new file mode 100644
index 00000000..c8bbaa55
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py
@@ -0,0 +1,88 @@
+# type: ignore
+import struct
+from io import BytesIO
+from typing import AsyncGenerator
+
+import olefile
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class PPTParser(AsyncParser[str | bytes]):
+    """A parser for legacy PPT (PowerPoint 97-2003) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.olefile = olefile
+
+    def _extract_text_from_record(self, data: bytes) -> str:
+        """Extract text from a PPT text record."""
+        try:
+            # Skip record header
+            text_data = data[8:]
+            # Convert from UTF-16-LE
+            return text_data.decode("utf-16-le", errors="ignore").strip()
+        except Exception:
+            return ""
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest PPT data and yield text from each slide."""
+        if isinstance(data, str):
+            raise ValueError("PPT data must be in bytes format.")
+
+        ole = None
+        try:
+            ole = self.olefile.OleFileIO(BytesIO(data))
+
+            # PPT stores text in PowerPoint Document stream
+            if not ole.exists("PowerPoint Document"):
+                raise ValueError("Not a valid PowerPoint file")
+
+            # Read PowerPoint Document stream
+            ppt_stream = ole.openstream("PowerPoint Document")
+            content = ppt_stream.read()
+
+            # Text records start with 0x0FA0 or 0x0FD0
+            text_markers = [b"\xa0\x0f", b"\xd0\x0f"]
+
+            current_position = 0
+            while current_position < len(content):
+                # Look for text markers
+                for marker in text_markers:
+                    marker_pos = content.find(marker, current_position)
+                    if marker_pos != -1:
+                        # Get record size from header (4 bytes after marker)
+                        size_bytes = content[marker_pos + 2 : marker_pos + 6]
+                        record_size = struct.unpack("<I", size_bytes)[0]
+
+                        # Extract record data
+                        record_data = content[
+                            marker_pos : marker_pos + record_size + 8
+                        ]
+                        text = self._extract_text_from_record(record_data)
+
+                        if text.strip():
+                            yield text.strip()
+
+                        current_position = marker_pos + record_size + 8
+                        break
+                else:
+                    current_position += 1
+
+        except Exception as e:
+            raise ValueError(f"Error processing PPT file: {str(e)}") from e
+        finally:
+            if ole is not None:
+                ole.close()
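A synthetic record illustrates the layout this scanner assumes: a 2-byte type marker, a 4-byte little-endian length, two padding bytes, then a UTF-16-LE payload. This mirrors the parser's assumptions, not the full PPT binary specification:

    import struct

    payload = "Slide title".encode("utf-16-le")
    record = b"\xa0\x0f" + struct.pack("<I", len(payload)) + b"\x00\x00" + payload

    size = struct.unpack("<I", record[2:6])[0]
    text = record[8 : 8 + size].decode("utf-16-le", errors="ignore")
    assert text == "Slide title"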
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py
new file mode 100644
index 00000000..8685c8fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py
@@ -0,0 +1,40 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+from pptx import Presentation
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class PPTXParser(AsyncParser[str | bytes]):
+    """A parser for PPT data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.Presentation = Presentation
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:  # type: ignore
+        """Ingest PPT data and yield text from each slide."""
+        if isinstance(data, str):
+            raise ValueError("PPT data must be in bytes format.")
+
+        prs = self.Presentation(BytesIO(data))
+        for slide in prs.slides:
+            for shape in slide.shapes:
+                if hasattr(shape, "text"):
+                    yield shape.text
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py
new file mode 100644
index 00000000..6be12076
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py
@@ -0,0 +1,45 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from striprtf.striprtf import rtf_to_text
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class RTFParser(AsyncParser[str | bytes]):
+    """Parser for Rich Text Format (.rtf) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.striprtf = rtf_to_text
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if isinstance(data, bytes):
+            data = data.decode("utf-8", errors="ignore")
+
+        try:
+            # Convert RTF to plain text
+            plain_text = self.striprtf(data)
+
+            # Split into paragraphs and yield non-empty ones
+            paragraphs = plain_text.split("\n\n")
+            for paragraph in paragraphs:
+                if paragraph.strip():
+                    yield paragraph.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing RTF file: {str(e)}") from e
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py
new file mode 100644
index 00000000..a770502e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py
@@ -0,0 +1,28 @@
+# type: ignore
+from .csv_parser import CSVParser, CSVParserAdvanced
+from .eml_parser import EMLParser
+from .epub_parser import EPUBParser
+from .json_parser import JSONParser
+from .msg_parser import MSGParser
+from .org_parser import ORGParser
+from .p7s_parser import P7SParser
+from .rst_parser import RSTParser
+from .tsv_parser import TSVParser
+from .xls_parser import XLSParser
+from .xlsx_parser import XLSXParser, XLSXParserAdvanced
+
+__all__ = [
+    "CSVParser",
+    "CSVParserAdvanced",
+    "EMLParser",
+    "EPUBParser",
+    "JSONParser",
+    "MSGParser",
+    "ORGParser",
+    "P7SParser",
+    "RSTParser",
+    "TSVParser",
+    "XLSParser",
+    "XLSXParser",
+    "XLSXParserAdvanced",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py
new file mode 100644
index 00000000..d80d5d07
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py
@@ -0,0 +1,108 @@
+# type: ignore
+from typing import IO, AsyncGenerator, Optional
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class CSVParser(AsyncParser[str | bytes]):
+    """A parser for CSV data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest CSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        csv_reader = self.csv.reader(self.StringIO(data))
+        for row in csv_reader:
+            yield ", ".join(row)
+
+
+class CSVParserAdvanced(AsyncParser[str | bytes]):
+    """A parser for CSV data."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    def get_delimiter(
+        self, file_path: Optional[str] = None, file: Optional[IO[bytes]] = None
+    ):
+        sniffer = self.csv.Sniffer()
+        num_bytes = 65536
+
+        if file:
+            lines = file.readlines(num_bytes)
+            file.seek(0)
+            # Accept both binary and text file objects
+            data = "\n".join(
+                ln.decode("utf-8") if isinstance(ln, bytes) else ln
+                for ln in lines
+            )
+        elif file_path is not None:
+            with open(file_path) as f:
+                data = "\n".join(f.readlines(num_bytes))
+        else:
+            raise ValueError("Either file or file_path must be provided.")
+
+        return sniffer.sniff(data, delimiters=",;").delimiter
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        num_col_times_num_rows: int = 100,
+        *args,
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        """Ingest CSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        # let the first row be the header
+        delimiter = self.get_delimiter(file=self.StringIO(data))
+
+        csv_reader = self.csv.reader(self.StringIO(data), delimiter=delimiter)
+
+        header = next(csv_reader)
+        num_cols = len(header)  # the reader already splits on the delimiter
+        num_rows = max(1, num_col_times_num_rows // num_cols)
+
+        chunk_rows = []
+        for row in csv_reader:
+            chunk_rows.append(row)
+            if len(chunk_rows) == num_rows:
+                yield (
+                    ", ".join(header)
+                    + "\n"
+                    + "\n".join([", ".join(row) for row in chunk_rows])
+                )
+                chunk_rows = []
+
+        if chunk_rows:
+            yield (
+                ", ".join(header)
+                + "\n"
+                + "\n".join([", ".join(row) for row in chunk_rows])
+            )
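The cell budget in the advanced parser translates into a fixed number of rows per chunk (num_col_times_num_rows // num_cols). A minimal sketch of that sizing logic with the standard csv module:

    import csv
    from io import StringIO

    data = "a,b,c,d\n" + "\n".join(f"{i},{i},{i},{i}" for i in range(60))
    reader = csv.reader(StringIO(data))
    header = next(reader)

    rows_per_chunk = max(1, 100 // len(header))  # 4 columns -> 25 rows

    chunks, chunk = [], []
    for row in reader:
        chunk.append(row)
        if len(chunk) == rows_per_chunk:
            chunks.append(chunk)
            chunk = []
    if chunk:
        chunks.append(chunk)
    assert [len(c) for c in chunks] == [25, 25, 10]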
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py
new file mode 100644
index 00000000..57a5ceab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py
@@ -0,0 +1,63 @@
+# type: ignore
+from email import message_from_bytes, policy
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class EMLParser(AsyncParser[str | bytes]):
+    """Parser for EML (email) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest EML data and yield email content."""
+        if isinstance(data, str):
+            raise ValueError("EML data must be in bytes format.")
+
+        # Parse email with policy for modern email handling
+        email_message = message_from_bytes(data, policy=policy.default)
+
+        # Extract and yield email metadata
+        metadata = []
+        if email_message["Subject"]:
+            metadata.append(f"Subject: {email_message['Subject']}")
+        if email_message["From"]:
+            metadata.append(f"From: {email_message['From']}")
+        if email_message["To"]:
+            metadata.append(f"To: {email_message['To']}")
+        if email_message["Date"]:
+            metadata.append(f"Date: {email_message['Date']}")
+
+        if metadata:
+            yield "\n".join(metadata)
+
+        # Extract and yield email body
+        if email_message.is_multipart():
+            for part in email_message.walk():
+                if part.get_content_type() == "text/plain":
+                    text = part.get_content()
+                    if text.strip():
+                        yield text.strip()
+                elif part.get_content_type() == "text/html":
+                    # Could add HTML parsing here if needed
+                    continue
+        else:
+            body = email_message.get_content()
+            if body.strip():
+                yield body.strip()
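The stdlib email API this parser builds on can be exercised round-trip with a synthetic message:

    from email import message_from_bytes, policy
    from email.message import EmailMessage

    msg = EmailMessage()
    msg["Subject"], msg["From"], msg["To"] = "Hi", "a@example.com", "b@example.com"
    msg.set_content("Plain-text body.")

    parsed = message_from_bytes(bytes(msg), policy=policy.default)
    assert parsed["Subject"] == "Hi"
    assert parsed.get_content().strip() == "Plain-text body."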
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py
new file mode 100644
index 00000000..ff51fb86
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py
@@ -0,0 +1,121 @@
+# type: ignore
+import logging
+from typing import AsyncGenerator
+
+import epub
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class EPUBParser(AsyncParser[str | bytes]):
+    """Parser for EPUB electronic book files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.epub = epub
+
+    def _safe_get_metadata(self, book, field: str) -> str | None:
+        """Safely extract metadata field from epub book."""
+        try:
+            return getattr(book, field, None) or getattr(book.opf, field, None)
+        except Exception as e:
+            logger.debug(f"Error getting {field} metadata: {e}")
+            return None
+
+    def _clean_text(self, content: bytes) -> str:
+        """Clean HTML content and return plain text."""
+        try:
+            import re
+
+            text = content.decode("utf-8", errors="ignore")
+            # Remove HTML tags
+            text = re.sub(r"<[^>]+>", " ", text)
+            # Normalize whitespace
+            text = re.sub(r"\s+", " ", text)
+            # Remove any remaining HTML entities
+            text = re.sub(r"&[^;]+;", " ", text)
+            return text.strip()
+        except Exception as e:
+            logger.warning(f"Error cleaning text: {e}")
+            return ""
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest EPUB data and yield book content."""
+        if isinstance(data, str):
+            raise ValueError("EPUB data must be in bytes format.")
+
+        from io import BytesIO
+
+        file_obj = BytesIO(data)
+
+        try:
+            book = self.epub.open_epub(file_obj)
+
+            # Safely extract metadata
+            metadata = []
+            for field, label in [
+                ("title", "Title"),
+                ("creator", "Author"),
+                ("language", "Language"),
+                ("publisher", "Publisher"),
+                ("date", "Date"),
+            ]:
+                if value := self._safe_get_metadata(book, field):
+                    metadata.append(f"{label}: {value}")
+
+            if metadata:
+                yield "\n".join(metadata)
+
+            # Extract content from items
+            try:
+                manifest = getattr(book.opf, "manifest", {}) or {}
+                for item in manifest.values():
+                    try:
+                        if (
+                            getattr(item, "mime_type", "")
+                            == "application/xhtml+xml"
+                        ):
+                            if content := book.read_item(item):
+                                if cleaned_text := self._clean_text(content):
+                                    yield cleaned_text
+                    except Exception as e:
+                        logger.warning(f"Error processing item: {e}")
+                        continue
+
+            except Exception as e:
+                logger.warning(f"Error accessing manifest: {e}")
+                # Fallback: try to get content directly
+                if hasattr(book, "read_item"):
+                    for item_id in getattr(book, "items", []):
+                        try:
+                            if content := book.read_item(item_id):
+                                if cleaned_text := self._clean_text(content):
+                                    yield cleaned_text
+                        except Exception as e:
+                            logger.warning(f"Error in fallback reading: {e}")
+                            continue
+
+        except Exception as e:
+            logger.error(f"Error processing EPUB file: {str(e)}")
+            raise ValueError(f"Error processing EPUB file: {str(e)}") from e
+        finally:
+            try:
+                file_obj.close()
+            except Exception as e:
+                logger.warning(f"Error closing file: {e}")
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py
new file mode 100644
index 00000000..3948e4de
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py
@@ -0,0 +1,94 @@
+# type: ignore
+import asyncio
+import json
+from typing import AsyncGenerator
+
+from core.base import R2RException
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class JSONParser(AsyncParser[str | bytes]):
+    """A parser for JSON data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest JSON data and yield a formatted text representation.
+
+        :param data: The JSON data to parse.
+        :param kwargs: Additional keyword arguments.
+        """
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        loop = asyncio.get_running_loop()
+
+        try:
+            parsed_json = await loop.run_in_executor(None, json.loads, data)
+            formatted_text = await loop.run_in_executor(
+                None, self._parse_json, parsed_json
+            )
+        except json.JSONDecodeError as e:
+            raise R2RException(
+                message=f"Failed to parse JSON data, likely due to invalid JSON: {str(e)}",
+                status_code=400,
+            ) from e
+
+        chunk_size = kwargs.get("chunk_size")
+        if chunk_size and isinstance(chunk_size, int):
+            # If chunk_size is provided and is an integer, yield the formatted text in chunks
+            for i in range(0, len(formatted_text), chunk_size):
+                yield formatted_text[i : i + chunk_size]
+                await asyncio.sleep(0)
+        else:
+            # If no valid chunk_size is provided, yield the entire formatted text
+            yield formatted_text
+
+    def _parse_json(self, data: dict) -> str:
+        def remove_objects_with_null(obj):
+            """Drop null-valued keys, recursing into dicts and lists."""
+            if isinstance(obj, list):
+                return [remove_objects_with_null(item) for item in obj]
+            if not isinstance(obj, dict):
+                return obj
+            return {
+                key: remove_objects_with_null(value)
+                for key, value in obj.items()
+                if value is not None
+            }
+
+        def format_json_as_text(obj, indent=0):
+            lines = []
+            indent_str = " " * indent
+
+            if isinstance(obj, dict):
+                for key, value in obj.items():
+                    if isinstance(value, (dict, list)):
+                        nested = format_json_as_text(value, indent + 2)
+                        lines.append(f"{indent_str}{key}:\n{nested}")
+                    else:
+                        lines.append(f"{indent_str}{key}: {value}")
+            elif isinstance(obj, list):
+                for item in obj:
+                    nested = format_json_as_text(item, indent + 2)
+                    lines.append(f"{nested}")
+            else:
+                return f"{indent_str}{obj}"
+
+            return "\n".join(lines)
+
+        return format_json_as_text(remove_objects_with_null(data))
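+
+
+# A minimal usage sketch (provider arguments are only stored by
+# __init__, so ``None`` placeholders suffice; the sample document is
+# illustrative):
+if __name__ == "__main__":
+    import asyncio
+
+    async def _demo() -> None:
+        parser = JSONParser(
+            config=None, database_provider=None, llm_provider=None
+        )
+        sample = '{"title": "Report", "meta": {"draft": null, "pages": 3}}'
+        # Null-valued keys are dropped and nesting becomes indentation:
+        #   title: Report
+        #   meta:
+        #     pages: 3
+        async for chunk in parser.ingest(sample):
+            print(chunk)
+
+    asyncio.run(_demo())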
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py
new file mode 100644
index 00000000..4a024ecf
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py
@@ -0,0 +1,65 @@
+# type: ignore
+import os
+import tempfile
+from typing import AsyncGenerator
+
+from msg_parser import MsOxMessage
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class MSGParser(AsyncParser[str | bytes]):
+    """Parser for MSG (Outlook Message) files using msg_parser."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest MSG data and yield email content."""
+        if isinstance(data, str):
+            raise ValueError("MSG data must be in bytes format.")
+
+        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".msg")
+        try:
+            tmp_file.write(data)
+            tmp_file.close()
+
+            msg = MsOxMessage(tmp_file.name)
+
+            metadata = []
+
+            if msg.subject:
+                metadata.append(f"Subject: {msg.subject}")
+            if msg.sender:
+                metadata.append(f"From: {msg.sender}")
+            if msg.to:
+                metadata.append(f"To: {', '.join(msg.to)}")
+            if msg.sent_date:
+                metadata.append(f"Date: {msg.sent_date}")
+            if metadata:
+                yield "\n".join(metadata)
+            if msg.body:
+                yield msg.body.strip()
+
+            for attachment in msg.attachments:
+                if attachment.Filename:
+                    yield f"\nAttachment: {attachment.Filename}"
+
+        except Exception as e:
+            raise ValueError(f"Error processing MSG file: {str(e)}") from e
+        finally:
+            os.remove(tmp_file.name)
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py
new file mode 100644
index 00000000..2ea3f857
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py
@@ -0,0 +1,72 @@
+# type: ignore
+from typing import AsyncGenerator
+
+import orgparse
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class ORGParser(AsyncParser[str | bytes]):
+    """Parser for ORG (Emacs Org-mode) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.orgparse = orgparse
+
+    def _process_node(self, node) -> list[str]:
+        """Process an org-mode node and return its content."""
+        contents = []
+
+        # Add heading with proper level of asterisks
+        if node.level > 0:
+            contents.append(f"{'*' * node.level} {node.heading}")
+
+        # Add body content if exists
+        if node.body:
+            contents.append(node.body.strip())
+
+        return contents
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest ORG data and yield document content."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        # Create the file-like object before the try block so that the
+        # finally clause can always close it, even if parsing fails early.
+        from io import StringIO
+
+        file_obj = StringIO(data)
+
+        try:
+            # Parse the org file
+            root = self.orgparse.load(file_obj)
+
+            # Process root node if it has content
+            if root.body:
+                yield root.body.strip()
+
+            # Process all nodes
+            for node in root[1:]:  # Skip root node in iteration
+                contents = self._process_node(node)
+                for content in contents:
+                    if content.strip():
+                        yield content.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing ORG file: {str(e)}") from e
+        finally:
+            file_obj.close()
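+
+
+# A minimal usage sketch with an illustrative document; the provider
+# arguments are only stored by __init__, so ``None`` is enough here.
+if __name__ == "__main__":
+    import asyncio
+
+    async def _demo() -> None:
+        parser = ORGParser(
+            config=None, database_provider=None, llm_provider=None
+        )
+        sample = (
+            "* Heading\n"
+            "Body text under the heading.\n"
+            "** Subheading\n"
+            "More text.\n"
+        )
+        async for chunk in parser.ingest(sample):
+            print(chunk)
+
+    asyncio.run(_demo())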
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py
new file mode 100644
index 00000000..84983494
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py
@@ -0,0 +1,178 @@
+# type: ignore
+import email
+import logging
+from base64 import b64decode
+from datetime import datetime
+from email.message import Message
+from typing import AsyncGenerator
+
+from cryptography import x509
+from cryptography.hazmat.primitives.serialization import pkcs7
+from cryptography.x509.oid import NameOID
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class P7SParser(AsyncParser[str | bytes]):
+    """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.x509 = x509
+        self.pkcs7 = pkcs7
+        self.NameOID = NameOID
+
+    def _format_datetime(self, dt: datetime) -> str:
+        """Format datetime in a readable way."""
+        return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
+
+    def _get_name_attribute(self, name, oid):
+        """Safely get name attribute."""
+        try:
+            return name.get_attributes_for_oid(oid)[0].value
+        except (IndexError, ValueError):
+            return None
+
+    def _extract_cert_info(self, cert) -> dict:
+        """Extract relevant information from a certificate."""
+        try:
+            subject = cert.subject
+            issuer = cert.issuer
+
+            info = {
+                "common_name": self._get_name_attribute(
+                    subject, self.NameOID.COMMON_NAME
+                ),
+                "organization": self._get_name_attribute(
+                    subject, self.NameOID.ORGANIZATION_NAME
+                ),
+                "email": self._get_name_attribute(
+                    subject, self.NameOID.EMAIL_ADDRESS
+                ),
+                "issuer_common_name": self._get_name_attribute(
+                    issuer, self.NameOID.COMMON_NAME
+                ),
+                "issuer_organization": self._get_name_attribute(
+                    issuer, self.NameOID.ORGANIZATION_NAME
+                ),
+                "serial_number": hex(cert.serial_number)[2:],
+                "not_valid_before": self._format_datetime(
+                    cert.not_valid_before
+                ),
+                "not_valid_after": self._format_datetime(cert.not_valid_after),
+                "version": cert.version.name,
+            }
+
+            return {k: v for k, v in info.items() if v is not None}
+
+        except Exception as e:
+            logger.warning(f"Error extracting certificate info: {e}")
+            return {}
+
+    def _try_parse_signature(self, data: bytes):
+        """Try to parse the signature data as PKCS7 containing certificates."""
+        exceptions = []
+
+        # Try DER format PKCS7
+        try:
+            certs = self.pkcs7.load_der_pkcs7_certificates(data)
+            if certs is not None:
+                return certs
+        except Exception as e:
+            exceptions.append(f"DER PKCS7 parsing failed: {str(e)}")
+
+        # Try PEM format PKCS7
+        try:
+            certs = self.pkcs7.load_pem_pkcs7_certificates(data)
+            if certs is not None:
+                return certs
+        except Exception as e:
+            exceptions.append(f"PEM PKCS7 parsing failed: {str(e)}")
+
+        raise ValueError(
+            "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n"
+            + "\n".join(exceptions)
+        )
+
+    def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes:
+        """Extract the raw PKCS#7 signature data from a MIME message."""
+        msg: Message = email.message_from_bytes(raw_data)
+
+        # If the message is multipart, find the PKCS#7 signature part.
+        # Both the registered MIME type and the legacy "x-" form are accepted.
+        if msg.is_multipart():
+            for part in msg.walk():
+                ctype = part.get_content_type()
+                if ctype in (
+                    "application/pkcs7-signature",
+                    "application/x-pkcs7-signature",
+                ):
+                    # Get the base64 encoded data from the payload
+                    payload = part.get_payload(decode=False)
+                    # payload at this stage is a base64 string
+                    try:
+                        return b64decode(payload)
+                    except Exception as e:
+                        raise ValueError(
+                            f"Failed to decode base64 PKCS#7 signature: {str(e)}"
+                        ) from e
+            # If we reach here, no PKCS#7 part was found
+            raise ValueError(
+                "No application/x-pkcs7-signature part found in the MIME message."
+            )
+        else:
+            # Not multipart, try to parse directly if it's just a raw P7S
+            # This scenario is less common; usually it's multipart.
+            if msg.get_content_type() in (
+                "application/pkcs7-signature",
+                "application/x-pkcs7-signature",
+            ):
+                payload = msg.get_payload(decode=False)
+                return b64decode(payload)
+
+            raise ValueError(
+                "The provided data does not contain a valid S/MIME signed message."
+            )
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest an S/MIME message and extract the PKCS#7 signature
+        information."""
+        # If data is a string, it might be base64 encoded, or it might be the raw MIME text.
+        # We should assume it's raw MIME text here because the input includes MIME headers.
+        if isinstance(data, str):
+            # Convert to bytes (raw MIME)
+            data = data.encode("utf-8")
+
+        try:
+            # Extract the raw PKCS#7 data (der/pem) from the MIME message
+            p7s_data = self._extract_p7s_data_from_mime(data)
+
+            # Parse the PKCS#7 data for certificates
+            certificates = self._try_parse_signature(p7s_data)
+
+            if not certificates:
+                yield "No certificates found in the provided P7S file."
+                return
+
+            # Process each certificate
+            for i, cert in enumerate(certificates, 1):
+                if cert_info := self._extract_cert_info(cert):
+                    yield f"Certificate {i}:"
+                    for key, value in cert_info.items():
+                        if value:
+                            yield f"{key.replace('_', ' ').title()}: {value}"
+                    yield ""  # Empty line between certificates
+                else:
+                    yield f"Certificate {i}: No detailed information extracted."
+
+        except Exception as e:
+            raise ValueError(f"Error processing P7S file: {str(e)}") from e
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py
new file mode 100644
index 00000000..76390655
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py
@@ -0,0 +1,58 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from docutils.core import publish_string
+from docutils.writers import html5_polyglot
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class RSTParser(AsyncParser[str | bytes]):
+    """Parser for reStructuredText (.rst) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.publish_string = publish_string
+        self.html5_polyglot = html5_polyglot
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        try:
+            # Convert RST to HTML
+            html = self.publish_string(
+                source=data,
+                writer=self.html5_polyglot.Writer(),
+                settings_overrides={"report_level": 5},
+            )
+
+            # Basic HTML cleanup
+            import re
+
+            text = html.decode("utf-8")
+            text = re.sub(r"<[^>]+>", " ", text)
+            # Collapse runs of spaces and tabs only; collapsing all
+            # whitespace would destroy the blank lines that the
+            # paragraph split below depends on.
+            text = re.sub(r"[ \t]+", " ", text)
+            text = re.sub(r"\n\s*\n", "\n\n", text)
+
+            # Split into paragraphs and yield non-empty ones
+            paragraphs = text.split("\n\n")
+            for paragraph in paragraphs:
+                if paragraph.strip():
+                    yield paragraph.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing RST file: {str(e)}") from e
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
new file mode 100644
index 00000000..35478360
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
@@ -0,0 +1,109 @@
+# type: ignore
+from typing import IO, AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class TSVParser(AsyncParser[str | bytes]):
+    """A parser for TSV (Tab Separated Values) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+        for row in tsv_reader:
+            yield ", ".join(row)  # Still join with comma for readability
+
+
+class TSVParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for TSV data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    def validate_tsv(self, file: IO[str]) -> bool:
+        """Validate that the file is actually tab-delimited."""
+        # ingest() hands this a StringIO, so the lines are already str;
+        # decoding them again would raise AttributeError.
+        num_chars = 65536
+        lines = file.readlines(num_chars)
+        file.seek(0)
+
+        if not lines:
+            return False
+
+        # Check whether tabs exist in the first few lines
+        return "\t" in "".join(lines[:5])
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        num_col_times_num_rows: int = 100,
+        *args,
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text in chunks."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        # Validate TSV format
+        if not self.validate_tsv(self.StringIO(data)):
+            raise ValueError("File does not appear to be tab-delimited")
+
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+
+        # Get header
+        header = next(tsv_reader)
+        num_cols = len(header)
+        # Clamp to at least one row per chunk so a very wide file cannot
+        # drive the chunk size to zero (a modulo-by-zero error below).
+        num_rows = max(1, num_col_times_num_rows // num_cols)
+
+        chunk_rows = []
+        # Start counting at 1 so the first chunk is full-sized instead
+        # of a single row.
+        for row_num, row in enumerate(tsv_reader, start=1):
+            chunk_rows.append(row)
+            if row_num % num_rows == 0:
+                yield (
+                    ", ".join(header)
+                    + "\n"
+                    + "\n".join([", ".join(row) for row in chunk_rows])
+                )
+                chunk_rows = []
+
+        # Yield remaining rows
+        if chunk_rows:
+            yield (
+                ", ".join(header)
+                + "\n"
+                + "\n".join([", ".join(row) for row in chunk_rows])
+            )
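+
+
+# A minimal usage sketch of the chunking arithmetic: with 2 columns and
+# a budget of 6 cells, each chunk carries 6 // 2 = 3 data rows beneath a
+# repeated header. The sample rows are illustrative.
+if __name__ == "__main__":
+    import asyncio
+
+    async def _demo() -> None:
+        parser = TSVParserAdvanced(config=None, llm_provider=None)
+        rows = ["col_a\tcol_b"] + [f"a{i}\tb{i}" for i in range(5)]
+        async for chunk in parser.ingest(
+            "\n".join(rows), num_col_times_num_rows=6
+        ):
+            print(chunk)
+            print("---")
+
+    asyncio.run(_demo())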
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
new file mode 100644
index 00000000..0bda9510
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
@@ -0,0 +1,140 @@
+# type: ignore
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+import xlrd
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSParser(AsyncParser[str | bytes]):
+    """A parser for XLS (Excel 97-2003) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.xlrd = xlrd
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        wb = self.xlrd.open_workbook(file_contents=data)
+        for sheet in wb.sheets():
+            for row_idx in range(sheet.nrows):
+                # Get all values in the row
+                row_values = []
+                for col_idx in range(sheet.ncols):
+                    cell = sheet.cell(row_idx, col_idx)
+                    # Handle different cell types
+                    if cell.ctype == self.xlrd.XL_CELL_DATE:
+                        try:
+                            value = self.xlrd.xldate_as_datetime(
+                                cell.value, wb.datemode
+                            ).strftime("%Y-%m-%d")
+                        except Exception:
+                            value = str(cell.value)
+                    elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+                        value = str(bool(cell.value)).lower()
+                    elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+                        value = "#ERROR#"
+                    else:
+                        value = str(cell.value).strip()
+
+                    row_values.append(value)
+
+                # Yield non-empty rows
+                if any(val.strip() for val in row_values):
+                    yield ", ".join(row_values)
+
+
+class XLSParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for XLS data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.xlrd = xlrd
+
+    def connected_components(self, arr):
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        empty_cell_indices = list(zip(*self.np.where(arr == ""), strict=False))
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1]
+
+    def get_cell_value(self, cell, workbook):
+        """Extract cell value handling different data types."""
+        if cell.ctype == self.xlrd.XL_CELL_DATE:
+            try:
+                return self.xlrd.xldate_as_datetime(
+                    cell.value, workbook.datemode
+                ).strftime("%Y-%m-%d")
+            except Exception:
+                return str(cell.value)
+        elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+            return str(bool(cell.value)).lower()
+        elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+            return "#ERROR#"
+        else:
+            return str(cell.value).strip()
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        workbook = self.xlrd.open_workbook(file_contents=data)
+
+        for sheet in workbook.sheets():
+            # Convert sheet to numpy array with proper value handling
+            ws_data = self.np.array(
+                [
+                    [
+                        self.get_cell_value(sheet.cell(row, col), workbook)
+                        for col in range(sheet.ncols)
+                    ]
+                    for row in range(sheet.nrows)
+                ]
+            )
+
+            for table in self.connected_components(ws_data):
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                # The budget is a cell count, so divide by the column
+                # count; clamp to one so range() never gets a zero step.
+                num_rows_per_chunk = max(
+                    1, num_col_times_num_rows // len(table[0])
+                )
+                headers = ", ".join(table[0])
+
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
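+
+
+# A quick self-check of the connected-component table detection using a
+# plain array instead of a real workbook; the grid below contains two
+# blocks of cells separated by an empty column and row (illustrative).
+if __name__ == "__main__":
+    parser = XLSParserAdvanced(config=None, llm_provider=None)
+    grid = np.array(
+        [
+            ["h1", "h2", "", "k1"],
+            ["a", "b", "", "x"],
+            ["", "", "", ""],
+        ]
+    )
+    for block in parser.connected_components(grid):
+        print(block)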
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
new file mode 100644
index 00000000..4c303177
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
@@ -0,0 +1,100 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+from openpyxl import load_workbook
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSXParser(AsyncParser[str | bytes]):
+    """A parser for XLSX data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.load_workbook = load_workbook
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        wb = self.load_workbook(filename=BytesIO(data))
+        for sheet in wb.worksheets:
+            for row in sheet.iter_rows(values_only=True):
+                # Render empty cells as blanks rather than the string "None"
+                yield ", ".join(
+                    "" if cell is None else str(cell) for cell in row
+                )
+
+
+class XLSXParserAdvanced(AsyncParser[str | bytes]):
+    """A parser for XLSX data."""
+
+    # identifies connected components in the excel graph and extracts data from each component
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.load_workbook = load_workbook
+
+    def connected_components(self, arr):
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        # ``arr is None`` is an identity test on the array object and is
+        # always False; use an element-wise comparison to find empty cells.
+        empty_cell_indices = list(
+            zip(*self.np.where(self.np.equal(arr, None)), strict=False)
+        )
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1].astype(
+                "str"
+            )
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        workbook = self.load_workbook(filename=BytesIO(data))
+
+        for ws in workbook.worksheets:
+            ws_data = self.np.array(
+                [[cell.value for cell in row] for row in ws.iter_rows()]
+            )
+            for table in self.connected_components(ws_data):
+                # parse like a csv parser, assumes that the first row has column names
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                # The budget is a cell count, so divide by the column
+                # count; clamp to one so range() never gets a zero step.
+                num_rows_per_chunk = max(
+                    1, num_col_times_num_rows // len(table[0])
+                )
+                headers = ", ".join(table[0])
+                # add header to each one
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
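+
+
+# A minimal round-trip sketch: build a tiny in-memory workbook with
+# openpyxl, then run it through the advanced parser. Provider arguments
+# are only stored, so ``None`` placeholders are enough; the sheet
+# contents are illustrative.
+if __name__ == "__main__":
+    import asyncio
+
+    from openpyxl import Workbook
+
+    async def _demo() -> None:
+        wb = Workbook()
+        ws = wb.active
+        ws.append(["name", "qty"])
+        ws.append(["apple", 3])
+        ws.append(["pear", 5])
+        buf = BytesIO()
+        wb.save(buf)
+
+        parser = XLSXParserAdvanced(config=None, llm_provider=None)
+        async for chunk in parser.ingest(buf.getvalue()):
+            print(chunk)
+
+    asyncio.run(_demo())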
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py
new file mode 100644
index 00000000..8f85d046
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py
@@ -0,0 +1,10 @@
+# type: ignore
+from .html_parser import HTMLParser
+from .md_parser import MDParser
+from .text_parser import TextParser
+
+__all__ = [
+    "MDParser",
+    "HTMLParser",
+    "TextParser",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py
new file mode 100644
index 00000000..a04331e0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py
@@ -0,0 +1,32 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from bs4 import BeautifulSoup
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class HTMLParser(AsyncParser[str | bytes]):
+    """A parser for HTML data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest HTML data and yield text."""
+        soup = BeautifulSoup(data, "html.parser")
+        yield soup.get_text()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py
new file mode 100644
index 00000000..7ab11d92
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py
@@ -0,0 +1,39 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from bs4 import BeautifulSoup
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class MDParser(AsyncParser[str | bytes]):
+    """A parser for Markdown data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import markdown
+
+        self.markdown = markdown
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest Markdown data and yield text."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        html = self.markdown.markdown(data)
+        soup = BeautifulSoup(html, "html.parser")
+        yield soup.get_text()
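+
+
+# A minimal usage sketch; provider arguments are only stored by
+# __init__, so ``None`` placeholders are enough for a demo.
+if __name__ == "__main__":
+    import asyncio
+
+    async def _demo() -> None:
+        parser = MDParser(
+            config=None, database_provider=None, llm_provider=None
+        )
+        async for text in parser.ingest("# Title\n\nSome *emphasis* here."):
+            print(text)
+
+    asyncio.run(_demo())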
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py
new file mode 100644
index 00000000..51ff1cbd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py
@@ -0,0 +1,30 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class TextParser(AsyncParser[str | bytes]):
+    """A parser for raw text data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        yield data