Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers')
28 files changed, 2482 insertions, 0 deletions
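For orientation before the diff body: every parser added here follows the same AsyncParser contract — a constructor taking (config, database_provider, llm_provider) and an async-generator ingest(data, **kwargs) that yields text chunks. A minimal consumption sketch (the three provider objects are application-specific and stubbed here; the names config, db, and llm are illustrative, not part of the package):

    import asyncio

    from core.parsers import BasicPDFParser

    async def extract_pdf_text(pdf_bytes: bytes, config, db, llm) -> list[str]:
        # BasicPDFParser only stores the three providers; plain-text
        # extraction itself goes through pypdf's PdfReader.
        parser = BasicPDFParser(
            config=config, database_provider=db, llm_provider=llm
        )
        # ingest() is an async generator yielding one text string per page
        return [page async for page in parser.ingest(pdf_bytes)]

    # Usage (provider construction elided):
    # pages = asyncio.run(extract_pdf_text(raw_pdf, config, db, llm))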
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/__init__.py new file mode 100644 index 00000000..8a7d5bbe --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/__init__.py @@ -0,0 +1,34 @@ +from .media import * +from .structured import * +from .text import * + +__all__ = [ + "AudioParser", + "BMPParser", + "DOCParser", + "DOCXParser", + "ImageParser", + "ODTParser", + "VLMPDFParser", + "BasicPDFParser", + "PDFParserUnstructured", + "PPTParser", + "PPTXParser", + "RTFParser", + "CSVParser", + "CSVParserAdvanced", + "EMLParser", + "EPUBParser", + "JSONParser", + "MSGParser", + "ORGParser", + "P7SParser", + "RSTParser", + "TSVParser", + "XLSParser", + "XLSXParser", + "XLSXParserAdvanced", + "MDParser", + "HTMLParser", + "TextParser", +] diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py new file mode 100644 index 00000000..c268b673 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/__init__.py @@ -0,0 +1,26 @@ +# type: ignore +from .audio_parser import AudioParser +from .bmp_parser import BMPParser +from .doc_parser import DOCParser +from .docx_parser import DOCXParser +from .img_parser import ImageParser +from .odt_parser import ODTParser +from .pdf_parser import BasicPDFParser, PDFParserUnstructured, VLMPDFParser +from .ppt_parser import PPTParser +from .pptx_parser import PPTXParser +from .rtf_parser import RTFParser + +__all__ = [ + "AudioParser", + "BMPParser", + "DOCParser", + "DOCXParser", + "ImageParser", + "ODTParser", + "VLMPDFParser", + "BasicPDFParser", + "PDFParserUnstructured", + "PPTParser", + "PPTXParser", + "RTFParser", +] diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py new file mode 100644 index 00000000..7d5f9f1d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/audio_parser.py @@ -0,0 +1,74 @@ +# type: ignore +import logging +import os +import tempfile +from typing import AsyncGenerator + +from litellm import atranscription + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() + + +class AudioParser(AsyncParser[bytes]): + """A parser for audio data using Whisper transcription.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.atranscription = atranscription + + async def ingest( # type: ignore + self, data: bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest audio data and yield a transcription using Whisper via + LiteLLM.
+ + Args: + data: Raw audio bytes + **kwargs: Additional arguments passed to the transcription call + + Yields: + Chunks of transcribed text + """ + try: + # Create a temporary file to store the audio data + with tempfile.NamedTemporaryFile( + suffix=".wav", delete=False + ) as temp_file: + temp_file.write(data) + temp_file_path = temp_file.name + + # Call Whisper transcription, closing the file handle afterwards + with open(temp_file_path, "rb") as audio_file: + response = await self.atranscription( + model=self.config.audio_transcription_model or self.config.app.audio_lm, + file=audio_file, + **kwargs, + ) + + # The response should contain the transcribed text directly + yield response.text + + except Exception as e: + logger.error(f"Error processing audio with Whisper: {str(e)}") + raise + + finally: + # Clean up the temporary file + try: + os.unlink(temp_file_path) + except Exception as e: + logger.warning(f"Failed to delete temporary file: {str(e)}") diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py new file mode 100644 index 00000000..78646da7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/bmp_parser.py @@ -0,0 +1,78 @@ +# type: ignore +from typing import AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class BMPParser(AsyncParser[str | bytes]): + """A parser for BMP image data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + import struct + + self.struct = struct + + async def extract_bmp_metadata(self, data: bytes) -> dict: + """Extract metadata from BMP file header.""" + try: + # BMP header format + header_format = "<2sIHHI" + header_size = self.struct.calcsize(header_format) + + # Unpack header data + ( + signature, + file_size, + reserved, + reserved2, + data_offset, + ) = self.struct.unpack(header_format, data[:header_size]) + + # DIB header + dib_format = "<IiiHHIIiiII" + dib_size = self.struct.calcsize(dib_format) + dib_data = self.struct.unpack(dib_format, data[14 : 14 + dib_size]) + + width = dib_data[1] + height = abs(dib_data[2]) # Height can be negative + bits_per_pixel = dib_data[4] + compression = dib_data[5] + + return { + "width": width, + "height": height, + "bits_per_pixel": bits_per_pixel, + "file_size": file_size, + "compression": compression, + } + except Exception as e: + return {"error": f"Failed to parse BMP header: {str(e)}"} + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest BMP data and yield metadata description.""" + if isinstance(data, str): + # Convert base64 string to bytes if needed + import base64 + + data = base64.b64decode(data) + + metadata = await self.extract_bmp_metadata(data) + + # Generate description of the BMP file + yield f"BMP image with dimensions {metadata.get('width', 'unknown')}x{metadata.get('height', 'unknown')} pixels, {metadata.get('bits_per_pixel', 'unknown')} bits per pixel, file size: {metadata.get('file_size', 'unknown')} bytes" diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py new file mode 100644 index 00000000..5b49e2cc --- /dev/null +++
b/.venv/lib/python3.12/site-packages/core/parsers/media/doc_parser.py @@ -0,0 +1,108 @@ +# type: ignore +import re +from io import BytesIO +from typing import AsyncGenerator + +import olefile + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class DOCParser(AsyncParser[str | bytes]): + """A parser for DOC (legacy Microsoft Word) data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.olefile = olefile + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest DOC data and yield text from the document.""" + if isinstance(data, str): + raise ValueError("DOC data must be in bytes format.") + + # Create BytesIO object from the data + file_obj = BytesIO(data) + + try: + # Open the DOC file using olefile + ole = self.olefile.OleFileIO(file_obj) + + # Check if it's a Word document + if not ole.exists("WordDocument"): + raise ValueError("Not a valid Word document") + + # Read the WordDocument stream + word_stream = ole.openstream("WordDocument").read() + + # Read the text from the 0Table or 1Table stream (contains the text) + if ole.exists("1Table"): + table_stream = ole.openstream("1Table").read() + elif ole.exists("0Table"): + table_stream = ole.openstream("0Table").read() + else: + table_stream = b"" + + # Extract text content + text = self._extract_text(word_stream, table_stream) + + # Clean and split the text + paragraphs = self._clean_text(text) + + # Yield non-empty paragraphs + for paragraph in paragraphs: + if paragraph.strip(): + yield paragraph.strip() + + except Exception as e: + raise ValueError(f"Error processing DOC file: {str(e)}") from e + finally: + ole.close() + file_obj.close() + + def _extract_text(self, word_stream: bytes, table_stream: bytes) -> str: + """Extract text from Word document streams.""" + try: + text = word_stream.replace(b"\x00", b"").decode( + "utf-8", errors="ignore" + ) + + # If table_stream exists, try to extract additional text + if table_stream: + table_text = table_stream.replace(b"\x00", b"").decode( + "utf-8", errors="ignore" + ) + text += table_text + + return text + except Exception as e: + raise ValueError(f"Error extracting text: {str(e)}") from e + + def _clean_text(self, text: str) -> list[str]: + """Clean and split the extracted text into paragraphs.""" + # Remove binary artifacts and control characters + text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F-\xFF]", "", text) + + # Remove multiple spaces and newlines + text = re.sub(r"\s+", " ", text) + + # Split into paragraphs on double newlines or other common separators + paragraphs = re.split(r"\n\n|\r\n\r\n|\f", text) + + # Remove empty or whitespace-only paragraphs + paragraphs = [p.strip() for p in paragraphs if p.strip()] + + return paragraphs diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py new file mode 100644 index 00000000..988f8341 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/docx_parser.py @@ -0,0 +1,38 @@ +# type: ignore +from io import BytesIO +from typing import AsyncGenerator + +from docx import Document + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + 
CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class DOCXParser(AsyncParser[str | bytes]): + """A parser for DOCX data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.Document = Document + + async def ingest( + self, data: str | bytes, *args, **kwargs + ) -> AsyncGenerator[str, None]: # type: ignore + """Ingest DOCX data and yield text from each paragraph.""" + if isinstance(data, str): + raise ValueError("DOCX data must be in bytes format.") + + doc = self.Document(BytesIO(data)) + for paragraph in doc.paragraphs: + yield paragraph.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py new file mode 100644 index 00000000..bcb37eab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py @@ -0,0 +1,281 @@ +# type: ignore +import base64 +import logging +from io import BytesIO +from typing import AsyncGenerator, Optional + +import filetype +import pillow_heif +from PIL import Image + +from core.base.abstractions import GenerationConfig +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() + + +class ImageParser(AsyncParser[str | bytes]): + # Mapping of file extensions to MIME types + MIME_TYPE_MAPPING = { + "bmp": "image/bmp", + "gif": "image/gif", + "heic": "image/heic", + "jpeg": "image/jpeg", + "jpg": "image/jpeg", + "png": "image/png", + "tiff": "image/tiff", + "tif": "image/tiff", + "webp": "image/webp", + } + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + self.Image = Image + self.pillow_heif = pillow_heif + self.pillow_heif.register_heif_opener() + + def _is_heic(self, data: bytes) -> bool: + """Detect HEIC format using magic numbers and patterns.""" + heic_patterns = [ + b"ftyp", + b"heic", + b"heix", + b"hevc", + b"HEIC", + b"mif1", + b"msf1", + b"hevc", + b"hevx", + ] + + try: + header = data[:32] # Get first 32 bytes + return any(pattern in header for pattern in heic_patterns) + except Exception as e: + logger.error(f"Error checking for HEIC format: {str(e)}") + return False + + async def _convert_heic_to_jpeg(self, data: bytes) -> bytes: + """Convert HEIC image to JPEG format.""" + try: + # Create BytesIO object for input + input_buffer = BytesIO(data) + + # Load HEIC image using pillow_heif + heif_file = self.pillow_heif.read_heif(input_buffer) + + # Get the primary image - API changed, need to get first image + heif_image = heif_file[0] # Get first image in the container + + # Convert to PIL Image directly from the HEIF image + pil_image = heif_image.to_pillow() + + # Convert to RGB if needed + if pil_image.mode != "RGB": + pil_image = pil_image.convert("RGB") + + # Save as JPEG + output_buffer = BytesIO() + pil_image.save(output_buffer, format="JPEG", quality=95) + return output_buffer.getvalue() + + except Exception as e: + logger.error(f"Error converting HEIC to JPEG: {str(e)}") + raise + + def _is_jpeg(self, data: bytes) -> bool: + """Detect JPEG format using magic numbers.""" + 
return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8 + + def _is_png(self, data: bytes) -> bool: + """Detect PNG format using magic numbers.""" + png_signature = b"\x89PNG\r\n\x1a\n" + return data.startswith(png_signature) + + def _is_bmp(self, data: bytes) -> bool: + """Detect BMP format using magic numbers.""" + return data.startswith(b"BM") + + def _is_tiff(self, data: bytes) -> bool: + """Detect TIFF format using magic numbers.""" + return ( + data.startswith(b"II*\x00") # Little-endian + or data.startswith(b"MM\x00*") + ) # Big-endian + + def _get_image_media_type( + self, data: bytes, filename: Optional[str] = None + ) -> str: + """ + Determine the correct media type based on image data and/or filename. + + Args: + data: The binary image data + filename: Optional filename which may contain extension information + + Returns: + str: The MIME type for the image + """ + try: + # First, try format-specific detection functions + if self._is_heic(data): + return "image/heic" + if self._is_jpeg(data): + return "image/jpeg" + if self._is_png(data): + return "image/png" + if self._is_bmp(data): + return "image/bmp" + if self._is_tiff(data): + return "image/tiff" + + # Try using filetype as a fallback + img_type = filetype.guess(data) + if img_type: + # Map the detected extension to a MIME type, falling back to the library's own MIME guess + return self.MIME_TYPE_MAPPING.get( + img_type.extension, img_type.mime + ) + + # If we have a filename, try to get the type from the extension + if filename: + extension = filename.split(".")[-1].lower() + if extension in self.MIME_TYPE_MAPPING: + return self.MIME_TYPE_MAPPING[extension] + + # If all else fails, default to octet-stream (generic binary) + logger.warning( + "Could not determine image type, using application/octet-stream" + ) + return "application/octet-stream" + + except Exception as e: + logger.error(f"Error determining image media type: {str(e)}") + return "application/octet-stream" # Default to generic binary as fallback + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if not self.vision_prompt_text: + self.vision_prompt_text = ( + await self.database_provider.prompts_handler.get_cached_prompt( + prompt_name=self.config.vision_img_prompt_name + ) + ) + try: + filename = kwargs.get("filename", None) + # Whether to convert HEIC to JPEG (default: True for backward compatibility) + convert_heic = kwargs.get("convert_heic", True) + + if isinstance(data, bytes): + try: + # First detect the original media type + original_media_type = self._get_image_media_type( + data, filename + ) + logger.debug( + f"Detected original image type: {original_media_type}" + ) + + # Determine if we need to convert HEIC + is_heic_format = self._is_heic(data) + + # Handle HEIC images + if is_heic_format and convert_heic: + logger.debug( + "Detected HEIC format, converting to JPEG" + ) + data = await self._convert_heic_to_jpeg(data) + media_type = "image/jpeg" + else: + # Keep original format and media type + media_type = original_media_type + + # Encode the data to base64 + image_data = base64.b64encode(data).decode("utf-8") + + except Exception as e: + logger.error(f"Error processing image data: {str(e)}") + raise + else: + # If data is already a string (base64), we assume it has a reliable content type + # from the source that encoded it + image_data = data + + # Try to determine the media type from the context if available + media_type = kwargs.get( + "media_type", "application/octet-stream" + ) + + # Get the model from kwargs or config + model = kwargs.get("vlm",
None) or self.config.app.vlm + + generation_config = GenerationConfig( + model=model, + stream=False, + ) + + logger.debug(f"Using model: {model}, media_type: {media_type}") + + if "anthropic" in model: + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": image_data, + }, + }, + ], + } + ] + else: + # For OpenAI-style APIs, use their format + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:{media_type};base64,{image_data}" + }, + }, + ], + } + ] + + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + if not content: + raise ValueError("No content in response") + yield content + else: + raise ValueError("No response content") + + except Exception as e: + logger.error(f"Error processing image with vision model: {str(e)}") + raise diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py new file mode 100644 index 00000000..cb146464 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/odt_parser.py @@ -0,0 +1,60 @@ +# type: ignore +import xml.etree.ElementTree as ET +import zipfile +from typing import AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class ODTParser(AsyncParser[str | bytes]): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.zipfile = zipfile + self.ET = ET + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if isinstance(data, str): + raise ValueError("ODT data must be in bytes format.") + + from io import BytesIO + + file_obj = BytesIO(data) + + try: + with self.zipfile.ZipFile(file_obj) as odt: + # ODT files are zip archives containing content.xml + content = odt.read("content.xml") + root = self.ET.fromstring(content) + + # ODT XML namespace + ns = {"text": "urn:oasis:names:tc:opendocument:xmlns:text:1.0"} + + # Extract paragraphs and headers + for p in root.findall(".//text:p", ns): + text = "".join(p.itertext()) + if text.strip(): + yield text.strip() + + for h in root.findall(".//text:h", ns): + text = "".join(h.itertext()) + if text.strip(): + yield text.strip() + + except Exception as e: + raise ValueError(f"Error processing ODT file: {str(e)}") from e + finally: + file_obj.close() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py new file mode 100644 index 00000000..b33ccb63 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pdf_parser.py @@ -0,0 +1,363 @@ +# type: ignore +import asyncio +import base64 +import json +import logging +import string +import time +import unicodedata +from io import BytesIO +from typing import AsyncGenerator + +from pdf2image import convert_from_bytes, convert_from_path +from pdf2image.exceptions import PDFInfoNotInstalledError 
+from PIL import Image +from pypdf import PdfReader + +from core.base.abstractions import GenerationConfig +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) +from shared.abstractions import PDFParsingError, PopplerNotFoundError + +logger = logging.getLogger() + + +class VLMPDFParser(AsyncParser[str | bytes]): + """A parser for PDF documents using vision models for page processing.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + + async def convert_pdf_to_images( + self, data: str | bytes + ) -> list[Image.Image]: + """Convert PDF pages to images asynchronously using in-memory + conversion.""" + logger.info("Starting PDF conversion to images.") + start_time = time.perf_counter() + options = { + "dpi": 300, # You can make this configurable via self.config if needed + "fmt": "jpeg", + "thread_count": 4, + "paths_only": False, # Return PIL Image objects instead of writing to disk + } + try: + if isinstance(data, bytes): + images = await asyncio.to_thread( + convert_from_bytes, data, **options + ) + else: + images = await asyncio.to_thread( + convert_from_path, data, **options + ) + elapsed = time.perf_counter() - start_time + logger.info( + f"PDF conversion completed in {elapsed:.2f} seconds, total pages: {len(images)}" + ) + return images + except PDFInfoNotInstalledError as e: + logger.error( + "PDFInfoNotInstalledError encountered during PDF conversion." + ) + raise PopplerNotFoundError() from e + except Exception as err: + logger.error( + f"Error converting PDF to images: {err} type: {type(err)}" + ) + raise PDFParsingError( + f"Failed to process PDF: {str(err)}", err + ) from err + + async def process_page( + self, image: Image.Image, page_num: int + ) -> dict[str, str]: + """Process a single PDF page using the vision model.""" + page_start = time.perf_counter() + try: + # Convert PIL image to JPEG bytes in-memory + buf = BytesIO() + image.save(buf, format="JPEG") + buf.seek(0) + image_data = buf.read() + image_base64 = base64.b64encode(image_data).decode("utf-8") + + model = self.config.app.vlm + + # Configure generation parameters + generation_config = GenerationConfig( + model=self.config.app.vlm, + stream=False, + ) + + is_anthropic = model and "anthropic/" in model + + # FIXME: This is a hacky fix to handle the different formats + # that was causing an outage. This logic really needs to be refactored + # and cleaned up such that it handles providers more robustly. 
+ + # Prepare message with image content + if is_anthropic: + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image", + "source": { + "type": "base64", + "media_type": "image/jpeg", + "data": image_base64, + }, + }, + ], + } + ] + else: + # Use OpenAI format + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_base64}" + }, + }, + ], + } + ] + + logger.debug(f"Sending page {page_num} to vision model.") + req_start = time.perf_counter() + if is_anthropic: + response = await self.llm_provider.aget_completion( + messages=messages, + generation_config=generation_config, + tools=[ + { + "name": "parse_pdf_page", + "description": "Parse text content from a PDF page", + "input_schema": { + "type": "object", + "properties": { + "page_content": { + "type": "string", + "description": "Extracted text from the PDF page", + }, + }, + "required": ["page_content"], + }, + } + ], + tool_choice={"type": "tool", "name": "parse_pdf_page"}, + ) + + if ( + response.choices + and response.choices[0].message + and response.choices[0].message.tool_calls + ): + tool_call = response.choices[0].message.tool_calls[0] + args = json.loads(tool_call.function.arguments) + content = args.get("page_content", "") + page_elapsed = time.perf_counter() - page_start + logger.debug( + f"Processed page {page_num} in {page_elapsed:.2f} seconds." + ) + + return {"page": str(page_num), "content": content} + else: + logger.warning( + f"No valid tool call in response for page {page_num}, document might be missing text." + ) + else: + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + req_elapsed = time.perf_counter() - req_start + logger.debug( + f"Vision model response for page {page_num} received in {req_elapsed:.2f} seconds." + ) + + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + page_elapsed = time.perf_counter() - page_start + logger.debug( + f"Processed page {page_num} in {page_elapsed:.2f} seconds." + ) + return {"page": str(page_num), "content": content} + else: + msg = f"No response content for page {page_num}" + logger.error(msg) + raise ValueError(msg) + except Exception as e: + logger.error( + f"Error processing page {page_num} with vision model: {str(e)}" + ) + raise + + async def ingest( + self, data: str | bytes, maintain_order: bool = True, **kwargs + ) -> AsyncGenerator[dict[str, str | int], None]: + """Ingest PDF data and yield the text description for each page using + the vision model. + + (This version yields a string per page rather than a dictionary.) 
+ """ + ingest_start = time.perf_counter() + logger.info("Starting PDF ingestion using VLMPDFParser.") + if not self.vision_prompt_text: + self.vision_prompt_text = ( + await self.database_provider.prompts_handler.get_cached_prompt( + prompt_name=self.config.vision_pdf_prompt_name + ) + ) + logger.info("Retrieved vision prompt text from database.") + + try: + # Convert PDF to images (in-memory) + images = await self.convert_pdf_to_images(data) + + # Create asynchronous tasks for processing each page + tasks = { + asyncio.create_task( + self.process_page(image, page_num) + ): page_num + for page_num, image in enumerate(images, 1) + } + + if maintain_order: + pending = set(tasks.keys()) + results = {} + next_page = 1 + while pending: + done, pending = await asyncio.wait( + pending, return_when=asyncio.FIRST_COMPLETED + ) + for task in done: + result = await task + page_num = int(result["page"]) + results[page_num] = result + while next_page in results: + yield { + "content": results[next_page]["content"] or "", + "page_number": next_page, + } + results.pop(next_page) + next_page += 1 + else: + # Yield results as tasks complete + for coro in asyncio.as_completed(tasks.keys()): + result = await coro + yield { + "content": result["content"], + "page_number": int(result["page"]), + } + total_elapsed = time.perf_counter() - ingest_start + logger.info( + f"Completed PDF ingestion in {total_elapsed:.2f} seconds using VLMPDFParser." + ) + except Exception as e: + logger.error(f"Error processing PDF: {str(e)}") + raise + + +class BasicPDFParser(AsyncParser[str | bytes]): + """A parser for PDF data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.PdfReader = PdfReader + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest PDF data and yield text from each page.""" + if isinstance(data, str): + raise ValueError("PDF data must be in bytes format.") + pdf = self.PdfReader(BytesIO(data)) + for page in pdf.pages: + page_text = page.extract_text() + if page_text is not None: + page_text = "".join( + filter( + lambda x: ( + unicodedata.category(x) + in [ + "Ll", + "Lu", + "Lt", + "Lm", + "Lo", + "Nl", + "No", + ] # Keep letters and numbers + or "\u4e00" <= x <= "\u9fff" # Chinese characters + or "\u0600" <= x <= "\u06ff" # Arabic characters + or "\u0400" <= x <= "\u04ff" # Cyrillic letters + or "\u0370" <= x <= "\u03ff" # Greek letters + or "\u0e00" <= x <= "\u0e7f" # Thai + or "\u3040" <= x <= "\u309f" # Japanese Hiragana + or "\u30a0" <= x <= "\u30ff" # Katakana + or x in string.printable + ), + page_text, + ) + ) # Keep characters in common languages ; # Filter out non-printable characters + yield page_text + + +class PDFParserUnstructured(AsyncParser[str | bytes]): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + try: + from unstructured.partition.pdf import partition_pdf + + self.partition_pdf = partition_pdf + + except ImportError as e: + logger.error("PDFParserUnstructured ImportError : ", e) + + async def ingest( + self, + data: str | bytes, + partition_strategy: str = "hi_res", + chunking_strategy="by_title", + ) -> AsyncGenerator[str, None]: + # partition the pdf + 
elements = self.partition_pdf( + file=BytesIO(data), + partition_strategy=partition_strategy, + chunking_strategy=chunking_strategy, + ) + for element in elements: + yield element.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py new file mode 100644 index 00000000..c8bbaa55 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/ppt_parser.py @@ -0,0 +1,88 @@ +# type: ignore +import struct +from io import BytesIO +from typing import AsyncGenerator + +import olefile + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class PPTParser(AsyncParser[str | bytes]): + """A parser for legacy PPT (PowerPoint 97-2003) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.olefile = olefile + + def _extract_text_from_record(self, data: bytes) -> str: + """Extract text from a PPT text record.""" + try: + # Skip record header + text_data = data[8:] + # Convert from UTF-16-LE + return text_data.decode("utf-16-le", errors="ignore").strip() + except Exception: + return "" + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest PPT data and yield text from each slide.""" + if isinstance(data, str): + raise ValueError("PPT data must be in bytes format.") + + try: + ole = self.olefile.OleFileIO(BytesIO(data)) + + # PPT stores text in PowerPoint Document stream + if not ole.exists("PowerPoint Document"): + raise ValueError("Not a valid PowerPoint file") + + # Read PowerPoint Document stream + ppt_stream = ole.openstream("PowerPoint Document") + content = ppt_stream.read() + + # Text records start with 0x0FA0 or 0x0FD0 + text_markers = [b"\xa0\x0f", b"\xd0\x0f"] + + current_position = 0 + while current_position < len(content): + # Look for text markers + for marker in text_markers: + marker_pos = content.find(marker, current_position) + if marker_pos != -1: + # Get record size from header (4 bytes after marker) + size_bytes = content[marker_pos + 2 : marker_pos + 6] + record_size = struct.unpack("<I", size_bytes)[0] + + # Extract record data + record_data = content[ + marker_pos : marker_pos + record_size + 8 + ] + text = self._extract_text_from_record(record_data) + + if text.strip(): + yield text.strip() + + current_position = marker_pos + record_size + 8 + break + else: + current_position += 1 + + except Exception as e: + raise ValueError(f"Error processing PPT file: {str(e)}") from e + finally: + ole.close() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py new file mode 100644 index 00000000..8685c8fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/pptx_parser.py @@ -0,0 +1,40 @@ +# type: ignore +from io import BytesIO +from typing import AsyncGenerator + +from pptx import Presentation + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class PPTXParser(AsyncParser[str | bytes]): + """A parser for PPT data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + 
llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.Presentation = Presentation + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: # type: ignore + """Ingest PPT data and yield text from each slide.""" + if isinstance(data, str): + raise ValueError("PPT data must be in bytes format.") + + prs = self.Presentation(BytesIO(data)) + for slide in prs.slides: + for shape in slide.shapes: + if hasattr(shape, "text"): + yield shape.text diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py new file mode 100644 index 00000000..6be12076 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/rtf_parser.py @@ -0,0 +1,45 @@ +# type: ignore +from typing import AsyncGenerator + +from striprtf.striprtf import rtf_to_text + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class RTFParser(AsyncParser[str | bytes]): + """Parser for Rich Text Format (.rtf) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.striprtf = rtf_to_text + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if isinstance(data, bytes): + data = data.decode("utf-8", errors="ignore") + + try: + # Convert RTF to plain text + plain_text = self.striprtf(data) + + # Split into paragraphs and yield non-empty ones + paragraphs = plain_text.split("\n\n") + for paragraph in paragraphs: + if paragraph.strip(): + yield paragraph.strip() + + except Exception as e: + raise ValueError(f"Error processing RTF file: {str(e)}") from e diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py new file mode 100644 index 00000000..a770502e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py @@ -0,0 +1,28 @@ +# type: ignore +from .csv_parser import CSVParser, CSVParserAdvanced +from .eml_parser import EMLParser +from .epub_parser import EPUBParser +from .json_parser import JSONParser +from .msg_parser import MSGParser +from .org_parser import ORGParser +from .p7s_parser import P7SParser +from .rst_parser import RSTParser +from .tsv_parser import TSVParser +from .xls_parser import XLSParser +from .xlsx_parser import XLSXParser, XLSXParserAdvanced + +__all__ = [ + "CSVParser", + "CSVParserAdvanced", + "EMLParser", + "EPUBParser", + "JSONParser", + "MSGParser", + "ORGParser", + "P7SParser", + "RSTParser", + "TSVParser", + "XLSParser", + "XLSXParser", + "XLSXParserAdvanced", +] diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py new file mode 100644 index 00000000..d80d5d07 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py @@ -0,0 +1,108 @@ +# type: ignore +from typing import IO, AsyncGenerator, Optional + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class 
CSVParser(AsyncParser[str | bytes]): + """A parser for CSV data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + import csv + from io import StringIO + + self.csv = csv + self.StringIO = StringIO + + async def ingest( + self, data: str | bytes, *args, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest CSV data and yield text from each row.""" + if isinstance(data, bytes): + data = data.decode("utf-8") + csv_reader = self.csv.reader(self.StringIO(data)) + for row in csv_reader: + yield ", ".join(row) + + +class CSVParserAdvanced(AsyncParser[str | bytes]): + """A parser for CSV data.""" + + def __init__( + self, config: IngestionConfig, llm_provider: CompletionProvider + ): + self.llm_provider = llm_provider + self.config = config + + import csv + from io import StringIO + + self.csv = csv + self.StringIO = StringIO + + def get_delimiter( + self, file_path: Optional[str] = None, file: Optional[IO] = None + ): + sniffer = self.csv.Sniffer() + num_bytes = 65536 + + if file: + lines = file.readlines(num_bytes) + file.seek(0) + # Lines may be bytes or str depending on the file object + data = "\n".join(ln.decode("utf-8") if isinstance(ln, bytes) else ln for ln in lines) + elif file_path is not None: + with open(file_path) as f: + data = "\n".join(f.readlines(num_bytes)) + + return sniffer.sniff(data, delimiters=",;").delimiter + + async def ingest( + self, + data: str | bytes, + num_col_times_num_rows: int = 100, + *args, + **kwargs, + ) -> AsyncGenerator[str, None]: + """Ingest CSV data and yield text from each row.""" + if isinstance(data, bytes): + data = data.decode("utf-8") + # let the first row be the header + delimiter = self.get_delimiter(file=self.StringIO(data)) + + csv_reader = self.csv.reader(self.StringIO(data), delimiter=delimiter) + + header = next(csv_reader) + num_cols = len(header) # header is already a parsed list of columns + num_rows = max(1, num_col_times_num_rows // num_cols) + + chunk_rows = [] + for row_num, row in enumerate(csv_reader): + chunk_rows.append(row) + if (row_num + 1) % num_rows == 0: + yield ( + ", ".join(header) + + "\n" + + "\n".join([", ".join(row) for row in chunk_rows]) + ) + chunk_rows = [] + + if chunk_rows: + yield ( + ", ".join(header) + + "\n" + + "\n".join([", ".join(row) for row in chunk_rows]) + ) diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py new file mode 100644 index 00000000..57a5ceab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py @@ -0,0 +1,63 @@ +# type: ignore +from email import message_from_bytes, policy +from typing import AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class EMLParser(AsyncParser[str | bytes]): + """Parser for EML (email) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest EML data and yield email content.""" + if isinstance(data, str): + raise ValueError("EML data must be in bytes format.") + + # Parse email with policy for modern email handling + email_message =
message_from_bytes(data, policy=policy.default) + + # Extract and yield email metadata + metadata = [] + if email_message["Subject"]: + metadata.append(f"Subject: {email_message['Subject']}") + if email_message["From"]: + metadata.append(f"From: {email_message['From']}") + if email_message["To"]: + metadata.append(f"To: {email_message['To']}") + if email_message["Date"]: + metadata.append(f"Date: {email_message['Date']}") + + if metadata: + yield "\n".join(metadata) + + # Extract and yield email body + if email_message.is_multipart(): + for part in email_message.walk(): + if part.get_content_type() == "text/plain": + text = part.get_content() + if text.strip(): + yield text.strip() + elif part.get_content_type() == "text/html": + # Could add HTML parsing here if needed + continue + else: + body = email_message.get_content() + if body.strip(): + yield body.strip() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py new file mode 100644 index 00000000..ff51fb86 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py @@ -0,0 +1,121 @@ +# type: ignore +import logging +from typing import AsyncGenerator + +import epub + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger(__name__) + + +class EPUBParser(AsyncParser[str | bytes]): + """Parser for EPUB electronic book files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.epub = epub + + def _safe_get_metadata(self, book, field: str) -> str | None: + """Safely extract metadata field from epub book.""" + try: + return getattr(book, field, None) or getattr(book.opf, field, None) + except Exception as e: + logger.debug(f"Error getting {field} metadata: {e}") + return None + + def _clean_text(self, content: bytes) -> str: + """Clean HTML content and return plain text.""" + try: + import re + + text = content.decode("utf-8", errors="ignore") + # Remove HTML tags + text = re.sub(r"<[^>]+>", " ", text) + # Normalize whitespace + text = re.sub(r"\s+", " ", text) + # Remove any remaining HTML entities + text = re.sub(r"&[^;]+;", " ", text) + return text.strip() + except Exception as e: + logger.warning(f"Error cleaning text: {e}") + return "" + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest EPUB data and yield book content.""" + if isinstance(data, str): + raise ValueError("EPUB data must be in bytes format.") + + from io import BytesIO + + file_obj = BytesIO(data) + + try: + book = self.epub.open_epub(file_obj) + + # Safely extract metadata + metadata = [] + for field, label in [ + ("title", "Title"), + ("creator", "Author"), + ("language", "Language"), + ("publisher", "Publisher"), + ("date", "Date"), + ]: + if value := self._safe_get_metadata(book, field): + metadata.append(f"{label}: {value}") + + if metadata: + yield "\n".join(metadata) + + # Extract content from items + try: + manifest = getattr(book.opf, "manifest", {}) or {} + for item in manifest.values(): + try: + if ( + getattr(item, "mime_type", "") + == "application/xhtml+xml" + ): + if content := book.read_item(item): + if cleaned_text := self._clean_text(content): + 
yield cleaned_text + except Exception as e: + logger.warning(f"Error processing item: {e}") + continue + + except Exception as e: + logger.warning(f"Error accessing manifest: {e}") + # Fallback: try to get content directly + if hasattr(book, "read_item"): + for item_id in getattr(book, "items", []): + try: + if content := book.read_item(item_id): + if cleaned_text := self._clean_text(content): + yield cleaned_text + except Exception as e: + logger.warning(f"Error in fallback reading: {e}") + continue + + except Exception as e: + logger.error(f"Error processing EPUB file: {str(e)}") + raise ValueError(f"Error processing EPUB file: {str(e)}") from e + finally: + try: + file_obj.close() + except Exception as e: + logger.warning(f"Error closing file: {e}") diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py new file mode 100644 index 00000000..3948e4de --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py @@ -0,0 +1,94 @@ +# type: ignore +import asyncio +import json +from typing import AsyncGenerator + +from core.base import R2RException +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class JSONParser(AsyncParser[str | bytes]): + """A parser for JSON data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + async def ingest( + self, data: str | bytes, *args, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest JSON data and yield a formatted text representation. + + :param data: The JSON data to parse. + :param kwargs: Additional keyword arguments. 
+ """ + if isinstance(data, bytes): + data = data.decode("utf-8") + + loop = asyncio.get_event_loop() + + try: + parsed_json = await loop.run_in_executor(None, json.loads, data) + formatted_text = await loop.run_in_executor( + None, self._parse_json, parsed_json + ) + except json.JSONDecodeError as e: + raise R2RException( + message=f"Failed to parse JSON data, likely due to invalid JSON: {str(e)}", + status_code=400, + ) from e + + chunk_size = kwargs.get("chunk_size") + if chunk_size and isinstance(chunk_size, int): + # If chunk_size is provided and is an integer, yield the formatted text in chunks + for i in range(0, len(formatted_text), chunk_size): + yield formatted_text[i : i + chunk_size] + await asyncio.sleep(0) + else: + # If no valid chunk_size is provided, yield the entire formatted text + yield formatted_text + + def _parse_json(self, data: dict) -> str: + def remove_objects_with_null(obj): + if not isinstance(obj, dict): + return obj + result = obj.copy() + for key, value in obj.items(): + if isinstance(value, dict): + result[key] = remove_objects_with_null(value) + elif value is None: + del result[key] + return result + + def format_json_as_text(obj, indent=0): + lines = [] + indent_str = " " * indent + + if isinstance(obj, dict): + for key, value in obj.items(): + if isinstance(value, (dict, list)): + nested = format_json_as_text(value, indent + 2) + lines.append(f"{indent_str}{key}:\n{nested}") + else: + lines.append(f"{indent_str}{key}: {value}") + elif isinstance(obj, list): + for item in obj: + nested = format_json_as_text(item, indent + 2) + lines.append(f"{nested}") + else: + return f"{indent_str}{obj}" + + return "\n".join(lines) + + return format_json_as_text(remove_objects_with_null(data)) diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py new file mode 100644 index 00000000..4a024ecf --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py @@ -0,0 +1,65 @@ +# type: ignore +import os +import tempfile +from typing import AsyncGenerator + +from msg_parser import MsOxMessage + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class MSGParser(AsyncParser[str | bytes]): + """Parser for MSG (Outlook Message) files using msg_parser.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest MSG data and yield email content.""" + if isinstance(data, str): + raise ValueError("MSG data must be in bytes format.") + + tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".msg") + try: + tmp_file.write(data) + tmp_file.close() + + msg = MsOxMessage(tmp_file.name) + + metadata = [] + + if msg.subject: + metadata.append(f"Subject: {msg.subject}") + if msg.sender: + metadata.append(f"From: {msg.sender}") + if msg.to: + metadata.append(f"To: {', '.join(msg.to)}") + if msg.sent_date: + metadata.append(f"Date: {msg.sent_date}") + if metadata: + yield "\n".join(metadata) + if msg.body: + yield msg.body.strip() + + for attachment in msg.attachments: + if attachment.Filename: + yield f"\nAttachment: {attachment.Filename}" + + except Exception as e: + 
raise ValueError(f"Error processing MSG file: {str(e)}") from e + finally: + os.remove(tmp_file.name) diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py new file mode 100644 index 00000000..2ea3f857 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py @@ -0,0 +1,72 @@ +# type: ignore +from typing import AsyncGenerator + +import orgparse + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class ORGParser(AsyncParser[str | bytes]): + """Parser for ORG (Emacs Org-mode) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.orgparse = orgparse + + def _process_node(self, node) -> list[str]: + """Process an org-mode node and return its content.""" + contents = [] + + # Add heading with proper level of asterisks + if node.level > 0: + contents.append(f"{'*' * node.level} {node.heading}") + + # Add body content if exists + if node.body: + contents.append(node.body.strip()) + + return contents + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest ORG data and yield document content.""" + if isinstance(data, bytes): + data = data.decode("utf-8") + + try: + # Create a temporary file-like object for orgparse + from io import StringIO + + file_obj = StringIO(data) + + # Parse the org file + root = self.orgparse.load(file_obj) + + # Process root node if it has content + if root.body: + yield root.body.strip() + + # Process all nodes + for node in root[1:]: # Skip root node in iteration + contents = self._process_node(node) + for content in contents: + if content.strip(): + yield content.strip() + + except Exception as e: + raise ValueError(f"Error processing ORG file: {str(e)}") from e + finally: + file_obj.close() diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py new file mode 100644 index 00000000..84983494 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py @@ -0,0 +1,178 @@ +# type: ignore +import email +import logging +from base64 import b64decode +from datetime import datetime +from email.message import Message +from typing import AsyncGenerator + +from cryptography import x509 +from cryptography.hazmat.primitives.serialization import pkcs7 +from cryptography.x509.oid import NameOID + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger(__name__) + + +class P7SParser(AsyncParser[str | bytes]): + """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.x509 = x509 + self.pkcs7 = pkcs7 + self.NameOID = NameOID + + def _format_datetime(self, dt: datetime) -> str: + """Format datetime in a readable way.""" + return dt.strftime("%Y-%m-%d %H:%M:%S UTC") + + def 
_get_name_attribute(self, name, oid): + """Safely get name attribute.""" + try: + return name.get_attributes_for_oid(oid)[0].value + except (IndexError, ValueError): + return None + + def _extract_cert_info(self, cert) -> dict: + """Extract relevant information from a certificate.""" + try: + subject = cert.subject + issuer = cert.issuer + + info = { + "common_name": self._get_name_attribute( + subject, self.NameOID.COMMON_NAME + ), + "organization": self._get_name_attribute( + subject, self.NameOID.ORGANIZATION_NAME + ), + "email": self._get_name_attribute( + subject, self.NameOID.EMAIL_ADDRESS + ), + "issuer_common_name": self._get_name_attribute( + issuer, self.NameOID.COMMON_NAME + ), + "issuer_organization": self._get_name_attribute( + issuer, self.NameOID.ORGANIZATION_NAME + ), + "serial_number": hex(cert.serial_number)[2:], + "not_valid_before": self._format_datetime( + cert.not_valid_before + ), + "not_valid_after": self._format_datetime(cert.not_valid_after), + "version": cert.version.name, + } + + return {k: v for k, v in info.items() if v is not None} + + except Exception as e: + logger.warning(f"Error extracting certificate info: {e}") + return {} + + def _try_parse_signature(self, data: bytes): + """Try to parse the signature data as PKCS7 containing certificates.""" + exceptions = [] + + # Try DER format PKCS7 + try: + certs = self.pkcs7.load_der_pkcs7_certificates(data) + if certs is not None: + return certs + except Exception as e: + exceptions.append(f"DER PKCS7 parsing failed: {str(e)}") + + # Try PEM format PKCS7 + try: + certs = self.pkcs7.load_pem_pkcs7_certificates(data) + if certs is not None: + return certs + except Exception as e: + exceptions.append(f"PEM PKCS7 parsing failed: {str(e)}") + + raise ValueError( + "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n" + + "\n".join(exceptions) + ) + + def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes: + """Extract the raw PKCS#7 signature data from a MIME message.""" + msg: Message = email.message_from_bytes(raw_data) + + # If the message is multipart, find the part with application/x-pkcs7-signature + if msg.is_multipart(): + for part in msg.walk(): + ctype = part.get_content_type() + if ctype == "application/x-pkcs7-signature": + # Get the base64 encoded data from the payload + payload = part.get_payload(decode=False) + # payload at this stage is a base64 string + try: + return b64decode(payload) + except Exception as e: + raise ValueError( + f"Failed to decode base64 PKCS#7 signature: {str(e)}" + ) from e + # If we reach here, no PKCS#7 part was found + raise ValueError( + "No application/x-pkcs7-signature part found in the MIME message." + ) + else: + # Not multipart, try to parse directly if it's just a raw P7S + # This scenario is less common; usually it's multipart. + if msg.get_content_type() == "application/x-pkcs7-signature": + payload = msg.get_payload(decode=False) + return b64decode(payload) + + raise ValueError( + "The provided data does not contain a valid S/MIME signed message." + ) + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + """Ingest an S/MIME message and extract the PKCS#7 signature + information.""" + # If data is a string, it might be base64 encoded, or it might be the raw MIME text. + # We should assume it's raw MIME text here because the input includes MIME headers. 
+ if isinstance(data, str): + # Convert to bytes (raw MIME) + data = data.encode("utf-8") + + try: + # Extract the raw PKCS#7 data (der/pem) from the MIME message + p7s_data = self._extract_p7s_data_from_mime(data) + + # Parse the PKCS#7 data for certificates + certificates = self._try_parse_signature(p7s_data) + + if not certificates: + yield "No certificates found in the provided P7S file." + return + + # Process each certificate + for i, cert in enumerate(certificates, 1): + if cert_info := self._extract_cert_info(cert): + yield f"Certificate {i}:" + for key, value in cert_info.items(): + if value: + yield f"{key.replace('_', ' ').title()}: {value}" + yield "" # Empty line between certificates + else: + yield f"Certificate {i}: No detailed information extracted." + + except Exception as e: + raise ValueError(f"Error processing P7S file: {str(e)}") from e diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py new file mode 100644 index 00000000..76390655 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py @@ -0,0 +1,58 @@ +# type: ignore +from typing import AsyncGenerator + +from docutils.core import publish_string +from docutils.writers import html5_polyglot + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class RSTParser(AsyncParser[str | bytes]): + """Parser for reStructuredText (.rst) files.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.publish_string = publish_string + self.html5_polyglot = html5_polyglot + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if isinstance(data, bytes): + data = data.decode("utf-8") + + try: + # Convert RST to HTML + html = self.publish_string( + source=data, + writer=self.html5_polyglot.Writer(), + settings_overrides={"report_level": 5}, + ) + + # Basic HTML cleanup + import re + + text = html.decode("utf-8") + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text) + + # Split into paragraphs and yield non-empty ones + paragraphs = text.split("\n\n") + for paragraph in paragraphs: + if paragraph.strip(): + yield paragraph.strip() + + except Exception as e: + raise ValueError(f"Error processing RST file: {str(e)}") from e diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py new file mode 100644 index 00000000..35478360 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py @@ -0,0 +1,109 @@ +# type: ignore +from typing import IO, AsyncGenerator + +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + + +class TSVParser(AsyncParser[str | bytes]): + """A parser for TSV (Tab Separated Values) data.""" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + + import csv + from io import StringIO + + self.csv = csv + 
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
new file mode 100644
index 00000000..35478360
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
@@ -0,0 +1,109 @@
+# type: ignore
+from typing import IO, AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class TSVParser(AsyncParser[str | bytes]):
+    """A parser for TSV (Tab Separated Values) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+        for row in tsv_reader:
+            yield ", ".join(row)  # Join with commas for readability
+
+
+class TSVParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for TSV data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    def validate_tsv(self, file: IO[str]) -> bool:
+        """Validate if the file is actually tab-delimited."""
+        num_bytes = 65536
+        lines = file.readlines(num_bytes)
+        file.seek(0)
+
+        if not lines:
+            return False
+
+        # Check if tabs exist in the first few lines. The input is a text
+        # stream (StringIO), so the lines are already str and need no decode.
+        sample = "\n".join(lines[:5])
+        return "\t" in sample
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        num_col_times_num_rows: int = 100,
+        *args,
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text in chunks."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        # Validate TSV format
+        if not self.validate_tsv(self.StringIO(data)):
+            raise ValueError("File does not appear to be tab-delimited")
+
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+
+        # Get header
+        header = next(tsv_reader)
+        num_cols = len(header)
+        # Guard against a zero chunk size when the file is wider than the budget
+        num_rows = max(1, num_col_times_num_rows // num_cols)
+
+        chunk_rows = []
+        for row_num, row in enumerate(tsv_reader):
+            chunk_rows.append(row)
+            # Flush once the chunk is full; row_num is zero-based, so the bare
+            # `row_num % num_rows` test would flush after a single row
+            if (row_num + 1) % num_rows == 0:
+                yield (
+                    ", ".join(header)
+                    + "\n"
+                    + "\n".join([", ".join(row) for row in chunk_rows])
+                )
+                chunk_rows = []
+
+        # Yield remaining rows
+        if chunk_rows:
+            yield (
+                ", ".join(header)
+                + "\n"
+                + "\n".join([", ".join(row) for row in chunk_rows])
+            )
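Because the basic TSVParser.__init__ only stores its providers, the parser can be tried in isolation with None stubs. A minimal sketch; the stub arguments are an assumption for illustration, not how the ingestion pipeline wires providers:

import asyncio


async def demo():
    parser = TSVParser(config=None, database_provider=None, llm_provider=None)
    sample = "name\trole\nada\tmathematics\nalan\tlogic\n"
    async for row in parser.ingest(sample):
        print(row)  # "name, role", then "ada, mathematics", ...


asyncio.run(demo())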
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
new file mode 100644
index 00000000..0bda9510
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
@@ -0,0 +1,140 @@
+# type: ignore
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+import xlrd
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSParser(AsyncParser[str | bytes]):
+    """A parser for XLS (Excel 97-2003) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.xlrd = xlrd
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        wb = self.xlrd.open_workbook(file_contents=data)
+        for sheet in wb.sheets():
+            for row_idx in range(sheet.nrows):
+                # Get all values in the row
+                row_values = []
+                for col_idx in range(sheet.ncols):
+                    cell = sheet.cell(row_idx, col_idx)
+                    # Handle different cell types
+                    if cell.ctype == self.xlrd.XL_CELL_DATE:
+                        try:
+                            value = self.xlrd.xldate_as_datetime(
+                                cell.value, wb.datemode
+                            ).strftime("%Y-%m-%d")
+                        except Exception:
+                            value = str(cell.value)
+                    elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+                        value = str(bool(cell.value)).lower()
+                    elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+                        value = "#ERROR#"
+                    else:
+                        value = str(cell.value).strip()
+
+                    row_values.append(value)
+
+                # Yield non-empty rows
+                if any(val.strip() for val in row_values):
+                    yield ", ".join(row_values)
+
+
+class XLSParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for XLS data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.xlrd = xlrd
+
+    def connected_components(self, arr):
+        """Yield each contiguous block of non-empty cells as a sub-array."""
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        empty_cell_indices = list(zip(*self.np.where(arr == ""), strict=False))
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1]
+
+    def get_cell_value(self, cell, workbook):
+        """Extract cell value handling different data types."""
+        if cell.ctype == self.xlrd.XL_CELL_DATE:
+            try:
+                return self.xlrd.xldate_as_datetime(
+                    cell.value, workbook.datemode
+                ).strftime("%Y-%m-%d")
+            except Exception:
+                return str(cell.value)
+        elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+            return str(bool(cell.value)).lower()
+        elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+            return "#ERROR#"
+        else:
+            return str(cell.value).strip()
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        workbook = self.xlrd.open_workbook(file_contents=data)
+
+        for sheet in workbook.sheets():
+            # Convert sheet to numpy array with proper value handling
+            ws_data = self.np.array(
+                [
+                    [
+                        self.get_cell_value(sheet.cell(row, col), workbook)
+                        for col in range(sheet.ncols)
+                    ]
+                    for row in range(sheet.nrows)
+                ]
+            )
+
+            for table in self.connected_components(ws_data):
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                # Guard against a zero range step when a table is taller
+                # than the chunking budget
+                num_rows_per_chunk = max(
+                    1, num_col_times_num_rows // num_rows
+                )
+                headers = ", ".join(table[0])
+
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
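The connected_components helper treats the sheet as a grid graph, deletes the empty cells, and reads each surviving component back out as a bounding-box slice. A self-contained sketch of the same idea on a toy array:

import networkx as nx
import numpy as np

# Two islands of non-empty cells, separated by a blank row and column.
sheet = np.array([
    ["h1", "h2", "", ""],
    ["a", "b", "", ""],
    ["", "", "", ""],
    ["", "", "x", "y"],
])

# One node per cell, edges between grid neighbors; removing the empty
# cells leaves one connected component per table-like region.
g = nx.grid_2d_graph(len(sheet), len(sheet[0]))
g.remove_nodes_from(zip(*np.where(sheet == "")))
for component in nx.connected_components(g):
    rows, cols = zip(*component)
    print(sheet[min(rows) : max(rows) + 1, min(cols) : max(cols) + 1])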
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
new file mode 100644
index 00000000..4c303177
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
@@ -0,0 +1,100 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+from openpyxl import load_workbook
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSXParser(AsyncParser[str | bytes]):
+    """A parser for XLSX data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.load_workbook = load_workbook
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        wb = self.load_workbook(filename=BytesIO(data))
+        for sheet in wb.worksheets:
+            for row in sheet.iter_rows(values_only=True):
+                # Render empty cells as blanks rather than the string "None"
+                yield ", ".join(
+                    "" if cell is None else str(cell) for cell in row
+                )
+
+
+class XLSXParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for XLSX data.
+
+    Identifies connected components in the sheet grid and extracts the data
+    from each component as its own table.
+    """
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.load_workbook = load_workbook
+
+    def connected_components(self, arr):
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        # `arr is None` is a scalar identity test that never matches; an
+        # elementwise comparison is needed to locate the empty cells.
+        empty_cell_indices = list(
+            zip(*self.np.where(self.np.equal(arr, None)), strict=False)
+        )
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1].astype(
+                "str"
+            )
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        workbook = self.load_workbook(filename=BytesIO(data))
+
+        for ws in workbook.worksheets:
+            ws_data = self.np.array(
+                [[cell.value for cell in row] for row in ws.iter_rows()]
+            )
+            for table in self.connected_components(ws_data):
+                # Parse like a CSV: assume the first row holds column names
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                # Guard against a zero range step when a table is taller
+                # than the chunking budget
+                num_rows_per_chunk = max(
+                    1, num_col_times_num_rows // num_rows
+                )
+                headers = ", ".join(table[0])
+                # Prepend the header row to every chunk
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
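A round-trip sketch for the basic XLSX path, building a workbook in memory with openpyxl and feeding the bytes back through the parser. The None provider stubs are again an assumption for illustration:

import asyncio
from io import BytesIO

from openpyxl import Workbook


async def demo():
    wb = Workbook()
    ws = wb.active
    ws.append(["name", "score"])
    ws.append(["ada", 95])

    buf = BytesIO()
    wb.save(buf)

    parser = XLSXParser(config=None, database_provider=None, llm_provider=None)
    async for line in parser.ingest(buf.getvalue()):
        print(line)  # "name, score", then "ada, 95"


asyncio.run(demo())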
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py
new file mode 100644
index 00000000..8f85d046
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/__init__.py
@@ -0,0 +1,10 @@
+# type: ignore
+from .html_parser import HTMLParser
+from .md_parser import MDParser
+from .text_parser import TextParser
+
+__all__ = [
+    "MDParser",
+    "HTMLParser",
+    "TextParser",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py
new file mode 100644
index 00000000..a04331e0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/html_parser.py
@@ -0,0 +1,32 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from bs4 import BeautifulSoup
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class HTMLParser(AsyncParser[str | bytes]):
+    """A parser for HTML data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest HTML data and yield text."""
+        soup = BeautifulSoup(data, "html.parser")
+        yield soup.get_text()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py
new file mode 100644
index 00000000..7ab11d92
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/md_parser.py
@@ -0,0 +1,39 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from bs4 import BeautifulSoup
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class MDParser(AsyncParser[str | bytes]):
+    """A parser for Markdown data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import markdown
+
+        self.markdown = markdown
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest Markdown data and yield text."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        html = self.markdown.markdown(data)
+        soup = BeautifulSoup(html, "html.parser")
+        yield soup.get_text()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py
new file mode 100644
index 00000000..51ff1cbd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/text/text_parser.py
@@ -0,0 +1,30 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class TextParser(AsyncParser[str | bytes]):
+    """A parser for raw text data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str | bytes, None]:
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        yield data
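The three text parsers share one shape: normalize the input to str, strip any markup, and yield plain text. A sketch of the Markdown path end to end, assuming stub providers and the markdown and beautifulsoup4 packages installed:

import asyncio


async def demo():
    parser = MDParser(config=None, database_provider=None, llm_provider=None)
    async for text in parser.ingest("# Title\n\nSome *emphasized* prose."):
        print(text)  # markup removed; only the plain text remains


asyncio.run(demo())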