author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
---|---|---
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/structured |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/structured')
12 files changed, 1136 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py
new file mode 100644
index 00000000..a770502e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/__init__.py
@@ -0,0 +1,28 @@
+# type: ignore
+from .csv_parser import CSVParser, CSVParserAdvanced
+from .eml_parser import EMLParser
+from .epub_parser import EPUBParser
+from .json_parser import JSONParser
+from .msg_parser import MSGParser
+from .org_parser import ORGParser
+from .p7s_parser import P7SParser
+from .rst_parser import RSTParser
+from .tsv_parser import TSVParser
+from .xls_parser import XLSParser
+from .xlsx_parser import XLSXParser, XLSXParserAdvanced
+
+__all__ = [
+    "CSVParser",
+    "CSVParserAdvanced",
+    "EMLParser",
+    "EPUBParser",
+    "JSONParser",
+    "MSGParser",
+    "ORGParser",
+    "P7SParser",
+    "RSTParser",
+    "TSVParser",
+    "XLSParser",
+    "XLSXParser",
+    "XLSXParserAdvanced",
+]
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py
new file mode 100644
index 00000000..d80d5d07
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/csv_parser.py
@@ -0,0 +1,108 @@
+# type: ignore
+from typing import IO, AsyncGenerator, Optional
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class CSVParser(AsyncParser[str | bytes]):
+    """A parser for CSV data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest CSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        csv_reader = self.csv.reader(self.StringIO(data))
+        for row in csv_reader:
+            yield ", ".join(row)
+
+
+class CSVParserAdvanced(AsyncParser[str | bytes]):
+    """A parser for CSV data."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    def get_delimiter(
+        self, file_path: Optional[str] = None, file: Optional[IO[bytes]] = None
+    ):
+        sniffer = self.csv.Sniffer()
+        num_bytes = 65536
+
+        if file:
+            lines = file.readlines(num_bytes)
+            file.seek(0)
+            data = "\n".join(ln.decode("utf-8") for ln in lines)
+        elif file_path is not None:
+            with open(file_path) as f:
+                data = "\n".join(f.readlines(num_bytes))
+
+        return sniffer.sniff(data, delimiters=",;").delimiter
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        num_col_times_num_rows: int = 100,
+        *args,
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        """Ingest CSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        # let the first row be the header
+        delimiter = self.get_delimiter(file=self.StringIO(data))
+
+        csv_reader = self.csv.reader(self.StringIO(data), delimiter=delimiter)
+
+        header = next(csv_reader)
+        num_cols = len(header.split(delimiter))
+        num_rows = num_col_times_num_rows // num_cols
+
+        chunk_rows = []
+        for row_num, row in enumerate(csv_reader):
+            chunk_rows.append(row)
+            if row_num % num_rows == 0:
+                yield (
+                    ", ".join(header)
+                    + "\n"
+                    + "\n".join([", ".join(row) for row in chunk_rows])
+                )
+                chunk_rows = []
+
+        if chunk_rows:
+            yield (
+                ", ".join(header)
+                + "\n"
+                + "\n".join([", ".join(row) for row in chunk_rows])
+            )
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py
new file mode 100644
index 00000000..57a5ceab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/eml_parser.py
@@ -0,0 +1,63 @@
+# type: ignore
+from email import message_from_bytes, policy
+from typing import AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class EMLParser(AsyncParser[str | bytes]):
+    """Parser for EML (email) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest EML data and yield email content."""
+        if isinstance(data, str):
+            raise ValueError("EML data must be in bytes format.")
+
+        # Parse email with policy for modern email handling
+        email_message = message_from_bytes(data, policy=policy.default)
+
+        # Extract and yield email metadata
+        metadata = []
+        if email_message["Subject"]:
+            metadata.append(f"Subject: {email_message['Subject']}")
+        if email_message["From"]:
+            metadata.append(f"From: {email_message['From']}")
+        if email_message["To"]:
+            metadata.append(f"To: {email_message['To']}")
+        if email_message["Date"]:
+            metadata.append(f"Date: {email_message['Date']}")
+
+        if metadata:
+            yield "\n".join(metadata)
+
+        # Extract and yield email body
+        if email_message.is_multipart():
+            for part in email_message.walk():
+                if part.get_content_type() == "text/plain":
+                    text = part.get_content()
+                    if text.strip():
+                        yield text.strip()
+                elif part.get_content_type() == "text/html":
+                    # Could add HTML parsing here if needed
+                    continue
+        else:
+            body = email_message.get_content()
+            if body.strip():
+                yield body.strip()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py
new file mode 100644
index 00000000..ff51fb86
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/epub_parser.py
@@ -0,0 +1,121 @@
+# type: ignore
+import logging
+from typing import AsyncGenerator
+
+import epub
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class EPUBParser(AsyncParser[str | bytes]):
+    """Parser for EPUB electronic book files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.epub = epub
+
+    def _safe_get_metadata(self, book, field: str) -> str | None:
+        """Safely extract metadata field from epub book."""
+        try:
+            return getattr(book, field, None) or getattr(
+                book.opf, field, None
+            )
+        except Exception as e:
+            logger.debug(f"Error getting {field} metadata: {e}")
+            return None
+
+    def _clean_text(self, content: bytes) -> str:
+        """Clean HTML content and return plain text."""
+        try:
+            import re
+
+            text = content.decode("utf-8", errors="ignore")
+            # Remove HTML tags
+            text = re.sub(r"<[^>]+>", " ", text)
+            # Normalize whitespace
+            text = re.sub(r"\s+", " ", text)
+            # Remove any remaining HTML entities
+            text = re.sub(r"&[^;]+;", " ", text)
+            return text.strip()
+        except Exception as e:
+            logger.warning(f"Error cleaning text: {e}")
+            return ""
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest EPUB data and yield book content."""
+        if isinstance(data, str):
+            raise ValueError("EPUB data must be in bytes format.")
+
+        from io import BytesIO
+
+        file_obj = BytesIO(data)
+
+        try:
+            book = self.epub.open_epub(file_obj)
+
+            # Safely extract metadata
+            metadata = []
+            for field, label in [
+                ("title", "Title"),
+                ("creator", "Author"),
+                ("language", "Language"),
+                ("publisher", "Publisher"),
+                ("date", "Date"),
+            ]:
+                if value := self._safe_get_metadata(book, field):
+                    metadata.append(f"{label}: {value}")
+
+            if metadata:
+                yield "\n".join(metadata)
+
+            # Extract content from items
+            try:
+                manifest = getattr(book.opf, "manifest", {}) or {}
+                for item in manifest.values():
+                    try:
+                        if (
+                            getattr(item, "mime_type", "")
+                            == "application/xhtml+xml"
+                        ):
+                            if content := book.read_item(item):
+                                if cleaned_text := self._clean_text(content):
+                                    yield cleaned_text
+                    except Exception as e:
+                        logger.warning(f"Error processing item: {e}")
+                        continue
+
+            except Exception as e:
+                logger.warning(f"Error accessing manifest: {e}")
+                # Fallback: try to get content directly
+                if hasattr(book, "read_item"):
+                    for item_id in getattr(book, "items", []):
+                        try:
+                            if content := book.read_item(item_id):
+                                if cleaned_text := self._clean_text(content):
+                                    yield cleaned_text
+                        except Exception as e:
+                            logger.warning(f"Error in fallback reading: {e}")
+                            continue
+
+        except Exception as e:
+            logger.error(f"Error processing EPUB file: {str(e)}")
+            raise ValueError(f"Error processing EPUB file: {str(e)}") from e
+        finally:
+            try:
+                file_obj.close()
+            except Exception as e:
+                logger.warning(f"Error closing file: {e}")
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py
new file mode 100644
index 00000000..3948e4de
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/json_parser.py
@@ -0,0 +1,94 @@
+# type: ignore
+import asyncio
+import json
+from typing import AsyncGenerator
+
+from core.base import R2RException
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class JSONParser(AsyncParser[str | bytes]):
+    """A parser for JSON data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest JSON data and yield a formatted text representation.
+
+        :param data: The JSON data to parse.
+        :param kwargs: Additional keyword arguments.
+        """
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        loop = asyncio.get_event_loop()
+
+        try:
+            parsed_json = await loop.run_in_executor(None, json.loads, data)
+            formatted_text = await loop.run_in_executor(
+                None, self._parse_json, parsed_json
+            )
+        except json.JSONDecodeError as e:
+            raise R2RException(
+                message=f"Failed to parse JSON data, likely due to invalid JSON: {str(e)}",
+                status_code=400,
+            ) from e
+
+        chunk_size = kwargs.get("chunk_size")
+        if chunk_size and isinstance(chunk_size, int):
+            # If chunk_size is provided and is an integer, yield the formatted text in chunks
+            for i in range(0, len(formatted_text), chunk_size):
+                yield formatted_text[i : i + chunk_size]
+                await asyncio.sleep(0)
+        else:
+            # If no valid chunk_size is provided, yield the entire formatted text
+            yield formatted_text
+
+    def _parse_json(self, data: dict) -> str:
+        def remove_objects_with_null(obj):
+            if not isinstance(obj, dict):
+                return obj
+            result = obj.copy()
+            for key, value in obj.items():
+                if isinstance(value, dict):
+                    result[key] = remove_objects_with_null(value)
+                elif value is None:
+                    del result[key]
+            return result
+
+        def format_json_as_text(obj, indent=0):
+            lines = []
+            indent_str = " " * indent
+
+            if isinstance(obj, dict):
+                for key, value in obj.items():
+                    if isinstance(value, (dict, list)):
+                        nested = format_json_as_text(value, indent + 2)
+                        lines.append(f"{indent_str}{key}:\n{nested}")
+                    else:
+                        lines.append(f"{indent_str}{key}: {value}")
+            elif isinstance(obj, list):
+                for item in obj:
+                    nested = format_json_as_text(item, indent + 2)
+                    lines.append(f"{nested}")
+            else:
+                return f"{indent_str}{obj}"
+
+            return "\n".join(lines)
+
+        return format_json_as_text(remove_objects_with_null(data))
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py
new file mode 100644
index 00000000..4a024ecf
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/msg_parser.py
@@ -0,0 +1,65 @@
+# type: ignore
+import os
+import tempfile
+from typing import AsyncGenerator
+
+from msg_parser import MsOxMessage
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class MSGParser(AsyncParser[str | bytes]):
+    """Parser for MSG (Outlook Message) files using msg_parser."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest MSG data and yield email content."""
+        if isinstance(data, str):
+            raise ValueError("MSG data must be in bytes format.")
+
+        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".msg")
+        try:
+            tmp_file.write(data)
+            tmp_file.close()
+
+            msg = MsOxMessage(tmp_file.name)
+
+            metadata = []
+
+            if msg.subject:
+                metadata.append(f"Subject: {msg.subject}")
+            if msg.sender:
+                metadata.append(f"From: {msg.sender}")
+            if msg.to:
+                metadata.append(f"To: {', '.join(msg.to)}")
+            if msg.sent_date:
+                metadata.append(f"Date: {msg.sent_date}")
+            if metadata:
+                yield "\n".join(metadata)
+            if msg.body:
+                yield msg.body.strip()
+
+            for attachment in msg.attachments:
+                if attachment.Filename:
+                    yield f"\nAttachment: {attachment.Filename}"
+
+        except Exception as e:
+            raise ValueError(f"Error processing MSG file: {str(e)}") from e
+        finally:
+            os.remove(tmp_file.name)
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py
new file mode 100644
index 00000000..2ea3f857
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/org_parser.py
@@ -0,0 +1,72 @@
+# type: ignore
+from typing import AsyncGenerator
+
+import orgparse
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class ORGParser(AsyncParser[str | bytes]):
+    """Parser for ORG (Emacs Org-mode) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.orgparse = orgparse
+
+    def _process_node(self, node) -> list[str]:
+        """Process an org-mode node and return its content."""
+        contents = []
+
+        # Add heading with proper level of asterisks
+        if node.level > 0:
+            contents.append(f"{'*' * node.level} {node.heading}")
+
+        # Add body content if exists
+        if node.body:
+            contents.append(node.body.strip())
+
+        return contents
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest ORG data and yield document content."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        try:
+            # Create a temporary file-like object for orgparse
+            from io import StringIO
+
+            file_obj = StringIO(data)
+
+            # Parse the org file
+            root = self.orgparse.load(file_obj)
+
+            # Process root node if it has content
+            if root.body:
+                yield root.body.strip()
+
+            # Process all nodes
+            for node in root[1:]:  # Skip root node in iteration
+                contents = self._process_node(node)
+                for content in contents:
+                    if content.strip():
+                        yield content.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing ORG file: {str(e)}") from e
+        finally:
+            file_obj.close()
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py
new file mode 100644
index 00000000..84983494
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/p7s_parser.py
@@ -0,0 +1,178 @@
+# type: ignore
+import email
+import logging
+from base64 import b64decode
+from datetime import datetime
+from email.message import Message
+from typing import AsyncGenerator
+
+from cryptography import x509
+from cryptography.hazmat.primitives.serialization import pkcs7
+from cryptography.x509.oid import NameOID
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class P7SParser(AsyncParser[str | bytes]):
+    """Parser for S/MIME messages containing a P7S (PKCS#7 Signature) file."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.x509 = x509
+        self.pkcs7 = pkcs7
+        self.NameOID = NameOID
+
+    def _format_datetime(self, dt: datetime) -> str:
+        """Format datetime in a readable way."""
+        return dt.strftime("%Y-%m-%d %H:%M:%S UTC")
+
+    def _get_name_attribute(self, name, oid):
+        """Safely get name attribute."""
+        try:
+            return name.get_attributes_for_oid(oid)[0].value
+        except (IndexError, ValueError):
+            return None
+
+    def _extract_cert_info(self, cert) -> dict:
+        """Extract relevant information from a certificate."""
+        try:
+            subject = cert.subject
+            issuer = cert.issuer
+
+            info = {
+                "common_name": self._get_name_attribute(
+                    subject, self.NameOID.COMMON_NAME
+                ),
+                "organization": self._get_name_attribute(
+                    subject, self.NameOID.ORGANIZATION_NAME
+                ),
+                "email": self._get_name_attribute(
+                    subject, self.NameOID.EMAIL_ADDRESS
+                ),
+                "issuer_common_name": self._get_name_attribute(
+                    issuer, self.NameOID.COMMON_NAME
+                ),
+                "issuer_organization": self._get_name_attribute(
+                    issuer, self.NameOID.ORGANIZATION_NAME
+                ),
+                "serial_number": hex(cert.serial_number)[2:],
+                "not_valid_before": self._format_datetime(
+                    cert.not_valid_before
+                ),
+                "not_valid_after": self._format_datetime(cert.not_valid_after),
+                "version": cert.version.name,
+            }
+
+            return {k: v for k, v in info.items() if v is not None}
+
+        except Exception as e:
+            logger.warning(f"Error extracting certificate info: {e}")
+            return {}
+
+    def _try_parse_signature(self, data: bytes):
+        """Try to parse the signature data as PKCS7 containing certificates."""
+        exceptions = []
+
+        # Try DER format PKCS7
+        try:
+            certs = self.pkcs7.load_der_pkcs7_certificates(data)
+            if certs is not None:
+                return certs
+        except Exception as e:
+            exceptions.append(f"DER PKCS7 parsing failed: {str(e)}")
+
+        # Try PEM format PKCS7
+        try:
+            certs = self.pkcs7.load_pem_pkcs7_certificates(data)
+            if certs is not None:
+                return certs
+        except Exception as e:
+            exceptions.append(f"PEM PKCS7 parsing failed: {str(e)}")
+
+        raise ValueError(
+            "Unable to parse signature file as PKCS7 with certificates. Attempted methods:\n"
+            + "\n".join(exceptions)
+        )
+
+    def _extract_p7s_data_from_mime(self, raw_data: bytes) -> bytes:
+        """Extract the raw PKCS#7 signature data from a MIME message."""
+        msg: Message = email.message_from_bytes(raw_data)
+
+        # If the message is multipart, find the part with application/x-pkcs7-signature
+        if msg.is_multipart():
+            for part in msg.walk():
+                ctype = part.get_content_type()
+                if ctype == "application/x-pkcs7-signature":
+                    # Get the base64 encoded data from the payload
+                    payload = part.get_payload(decode=False)
+                    # payload at this stage is a base64 string
+                    try:
+                        return b64decode(payload)
+                    except Exception as e:
+                        raise ValueError(
+                            f"Failed to decode base64 PKCS#7 signature: {str(e)}"
+                        ) from e
+            # If we reach here, no PKCS#7 part was found
+            raise ValueError(
+                "No application/x-pkcs7-signature part found in the MIME message."
+            )
+        else:
+            # Not multipart, try to parse directly if it's just a raw P7S
+            # This scenario is less common; usually it's multipart.
+            if msg.get_content_type() == "application/x-pkcs7-signature":
+                payload = msg.get_payload(decode=False)
+                return b64decode(payload)
+
+        raise ValueError(
+            "The provided data does not contain a valid S/MIME signed message."
+        )
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest an S/MIME message and extract the PKCS#7 signature
+        information."""
+        # If data is a string, it might be base64 encoded, or it might be the raw MIME text.
+        # We should assume it's raw MIME text here because the input includes MIME headers.
+        if isinstance(data, str):
+            # Convert to bytes (raw MIME)
+            data = data.encode("utf-8")
+
+        try:
+            # Extract the raw PKCS#7 data (der/pem) from the MIME message
+            p7s_data = self._extract_p7s_data_from_mime(data)
+
+            # Parse the PKCS#7 data for certificates
+            certificates = self._try_parse_signature(p7s_data)
+
+            if not certificates:
+                yield "No certificates found in the provided P7S file."
+                return
+
+            # Process each certificate
+            for i, cert in enumerate(certificates, 1):
+                if cert_info := self._extract_cert_info(cert):
+                    yield f"Certificate {i}:"
+                    for key, value in cert_info.items():
+                        if value:
+                            yield f"{key.replace('_', ' ').title()}: {value}"
+                    yield ""  # Empty line between certificates
+                else:
+                    yield f"Certificate {i}: No detailed information extracted."
+
+        except Exception as e:
+            raise ValueError(f"Error processing P7S file: {str(e)}") from e
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py
new file mode 100644
index 00000000..76390655
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/rst_parser.py
@@ -0,0 +1,58 @@
+# type: ignore
+from typing import AsyncGenerator
+
+from docutils.core import publish_string
+from docutils.writers import html5_polyglot
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class RSTParser(AsyncParser[str | bytes]):
+    """Parser for reStructuredText (.rst) files."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.publish_string = publish_string
+        self.html5_polyglot = html5_polyglot
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        try:
+            # Convert RST to HTML
+            html = self.publish_string(
+                source=data,
+                writer=self.html5_polyglot.Writer(),
+                settings_overrides={"report_level": 5},
+            )
+
+            # Basic HTML cleanup
+            import re
+
+            text = html.decode("utf-8")
+            text = re.sub(r"<[^>]+>", " ", text)
+            text = re.sub(r"\s+", " ", text)
+
+            # Split into paragraphs and yield non-empty ones
+            paragraphs = text.split("\n\n")
+            for paragraph in paragraphs:
+                if paragraph.strip():
+                    yield paragraph.strip()
+
+        except Exception as e:
+            raise ValueError(f"Error processing RST file: {str(e)}") from e
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
new file mode 100644
index 00000000..35478360
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/tsv_parser.py
@@ -0,0 +1,109 @@
+# type: ignore
+from typing import IO, AsyncGenerator
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class TSVParser(AsyncParser[str | bytes]):
+    """A parser for TSV (Tab Separated Values) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    async def ingest(
+        self, data: str | bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text from each row."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+        for row in tsv_reader:
+            yield ", ".join(row)  # Still join with comma for readability
+
+
+class TSVParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for TSV data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+
+        import csv
+        from io import StringIO
+
+        self.csv = csv
+        self.StringIO = StringIO
+
+    def validate_tsv(self, file: IO[bytes]) -> bool:
+        """Validate if the file is actually tab-delimited."""
+        num_bytes = 65536
+        lines = file.readlines(num_bytes)
+        file.seek(0)
+
+        if not lines:
+            return False
+
+        # Check if tabs exist in first few lines
+        sample = "\n".join(ln.decode("utf-8") for ln in lines[:5])
+        return "\t" in sample
+
+    async def ingest(
+        self,
+        data: str | bytes,
+        num_col_times_num_rows: int = 100,
+        *args,
+        **kwargs,
+    ) -> AsyncGenerator[str, None]:
+        """Ingest TSV data and yield text in chunks."""
+        if isinstance(data, bytes):
+            data = data.decode("utf-8")
+
+        # Validate TSV format
+        if not self.validate_tsv(self.StringIO(data)):
+            raise ValueError("File does not appear to be tab-delimited")
+
+        tsv_reader = self.csv.reader(self.StringIO(data), delimiter="\t")
+
+        # Get header
+        header = next(tsv_reader)
+        num_cols = len(header)
+        num_rows = num_col_times_num_rows // num_cols
+
+        chunk_rows = []
+        for row_num, row in enumerate(tsv_reader):
+            chunk_rows.append(row)
+            if row_num % num_rows == 0:
+                yield (
+                    ", ".join(header)
+                    + "\n"
+                    + "\n".join([", ".join(row) for row in chunk_rows])
+                )
+                chunk_rows = []
+
+        # Yield remaining rows
+        if chunk_rows:
+            yield (
+                ", ".join(header)
+                + "\n"
+                + "\n".join([", ".join(row) for row in chunk_rows])
+            )
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
new file mode 100644
index 00000000..0bda9510
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xls_parser.py
@@ -0,0 +1,140 @@
+# type: ignore
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+import xlrd
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSParser(AsyncParser[str | bytes]):
+    """A parser for XLS (Excel 97-2003) data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.xlrd = xlrd
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        wb = self.xlrd.open_workbook(file_contents=data)
+        for sheet in wb.sheets():
+            for row_idx in range(sheet.nrows):
+                # Get all values in the row
+                row_values = []
+                for col_idx in range(sheet.ncols):
+                    cell = sheet.cell(row_idx, col_idx)
+                    # Handle different cell types
+                    if cell.ctype == self.xlrd.XL_CELL_DATE:
+                        try:
+                            value = self.xlrd.xldate_as_datetime(
+                                cell.value, wb.datemode
+                            ).strftime("%Y-%m-%d")
+                        except Exception:
+                            value = str(cell.value)
+                    elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+                        value = str(bool(cell.value)).lower()
+                    elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+                        value = "#ERROR#"
+                    else:
+                        value = str(cell.value).strip()
+
+                    row_values.append(value)
+
+                # Yield non-empty rows
+                if any(val.strip() for val in row_values):
+                    yield ", ".join(row_values)
+
+
+class XLSParserAdvanced(AsyncParser[str | bytes]):
+    """An advanced parser for XLS data with chunking support."""
+
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.xlrd = xlrd
+
+    def connected_components(self, arr):
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        empty_cell_indices = list(zip(*self.np.where(arr == ""), strict=False))
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1]
+
+    def get_cell_value(self, cell, workbook):
+        """Extract cell value handling different data types."""
+        if cell.ctype == self.xlrd.XL_CELL_DATE:
+            try:
+                return self.xlrd.xldate_as_datetime(
+                    cell.value, workbook.datemode
+                ).strftime("%Y-%m-%d")
+            except Exception:
+                return str(cell.value)
+        elif cell.ctype == self.xlrd.XL_CELL_BOOLEAN:
+            return str(bool(cell.value)).lower()
+        elif cell.ctype == self.xlrd.XL_CELL_ERROR:
+            return "#ERROR#"
+        else:
+            return str(cell.value).strip()
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLS data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLS data must be in bytes format.")
+
+        workbook = self.xlrd.open_workbook(file_contents=data)
+
+        for sheet in workbook.sheets():
+            # Convert sheet to numpy array with proper value handling
+            ws_data = self.np.array(
+                [
+                    [
+                        self.get_cell_value(sheet.cell(row, col), workbook)
+                        for col in range(sheet.ncols)
+                    ]
+                    for row in range(sheet.nrows)
+                ]
+            )
+
+            for table in self.connected_components(ws_data):
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                num_rows_per_chunk = num_col_times_num_rows // num_rows
+                headers = ", ".join(table[0])
+
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
new file mode 100644
index 00000000..4c303177
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/structured/xlsx_parser.py
@@ -0,0 +1,100 @@
+# type: ignore
+from io import BytesIO
+from typing import AsyncGenerator
+
+import networkx as nx
+import numpy as np
+from openpyxl import load_workbook
+
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+
+class XLSXParser(AsyncParser[str | bytes]):
+    """A parser for XLSX data."""
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        self.database_provider = database_provider
+        self.llm_provider = llm_provider
+        self.config = config
+        self.load_workbook = load_workbook
+
+    async def ingest(
+        self, data: bytes, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each row."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        wb = self.load_workbook(filename=BytesIO(data))
+        for sheet in wb.worksheets:
+            for row in sheet.iter_rows(values_only=True):
+                yield ", ".join(map(str, row))
+
+
+class XLSXParserAdvanced(AsyncParser[str | bytes]):
+    """A parser for XLSX data."""
+
+    # identifies connected components in the excel graph and extracts data from each component
+    def __init__(
+        self, config: IngestionConfig, llm_provider: CompletionProvider
+    ):
+        self.llm_provider = llm_provider
+        self.config = config
+        self.nx = nx
+        self.np = np
+        self.load_workbook = load_workbook
+
+    def connected_components(self, arr):
+        g = self.nx.grid_2d_graph(len(arr), len(arr[0]))
+        empty_cell_indices = list(
+            zip(*self.np.where(arr is None), strict=False)
+        )
+        g.remove_nodes_from(empty_cell_indices)
+        components = self.nx.connected_components(g)
+        for component in components:
+            rows, cols = zip(*component, strict=False)
+            min_row, max_row = min(rows), max(rows)
+            min_col, max_col = min(cols), max(cols)
+            yield arr[min_row : max_row + 1, min_col : max_col + 1].astype(
+                "str"
+            )
+
+    async def ingest(
+        self, data: bytes, num_col_times_num_rows: int = 100, *args, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Ingest XLSX data and yield text from each connected component."""
+        if isinstance(data, str):
+            raise ValueError("XLSX data must be in bytes format.")
+
+        workbook = self.load_workbook(filename=BytesIO(data))
+
+        for ws in workbook.worksheets:
+            ws_data = self.np.array(
+                [[cell.value for cell in row] for row in ws.iter_rows()]
+            )
+            for table in self.connected_components(ws_data):
+                # parse like a csv parser, assumes that the first row has column names
+                if len(table) <= 1:
+                    continue
+
+                num_rows = len(table)
+                num_rows_per_chunk = num_col_times_num_rows // num_rows
+                headers = ", ".join(table[0])
+                # add header to each one
+                for i in range(1, num_rows, num_rows_per_chunk):
+                    chunk = table[i : i + num_rows_per_chunk]
+                    yield (
+                        headers
+                        + "\n"
+                        + "\n".join([", ".join(row) for row in chunk])
+                    )
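The commit above only adds the parser modules; it does not show them being called. As a rough illustration only (not part of the commit), the sketch below drives the committed `CSVParser` through its async `ingest()` generator. The `config`, `db`, and `llm` arguments stand in for `IngestionConfig`, `DatabaseProvider`, and `CompletionProvider` instances that the surrounding `core` package is assumed to construct elsewhere; they are placeholders here.

```python
# Illustrative sketch: iterating a parser from this commit.
# `config`, `db`, and `llm` are assumed provider objects supplied by the
# surrounding core package; they are not defined in this commit.
import asyncio

from core.parsers.structured import CSVParser


async def dump_csv(config, db, llm) -> None:
    parser = CSVParser(config=config, database_provider=db, llm_provider=llm)
    raw = b"name,age\nada,36\nalan,41\n"
    # ingest() is an async generator; each yielded string is one CSV row
    # re-joined with ", ".
    async for chunk in parser.ingest(raw):
        print(chunk)


# asyncio.run(dump_csv(config, db, llm))  # run with real provider instances
```

The same pattern applies to the other parsers in the diff, since they all implement `AsyncParser.ingest()` and differ mainly in the accepted input type (several require `bytes` and raise `ValueError` on `str`).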