about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
diff options
context:
space:
mode:
author S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
parent cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
download gn-ai-master.tar.gz
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py')
-rw-r--r-- .venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py 281
1 files changed, 281 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
new file mode 100644
index 00000000..bcb37eab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
@@ -0,0 +1,281 @@
+# type: ignore
+import base64
+import logging
+from io import BytesIO
+from typing import AsyncGenerator, Optional
+
+import filetype
+import pillow_heif
+from PIL import Image
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+    CompletionProvider,
+    DatabaseProvider,
+    IngestionConfig,
+)
+
+# NOTE(review): getLogger() called without a name returns the root logger, so
+# records emitted from this module are not attributed to it; consider
+# logging.getLogger(__name__) if per-module filtering is wanted.
+logger = logging.getLogger()
+
+
+class ImageParser(AsyncParser[str | bytes]):
+    """Parser that turns an image into text via a vision-capable model.
+
+    Accepts raw image bytes or an already base64-encoded string, works out
+    a media type for it, optionally converts HEIC input to JPEG, and yields
+    the text returned by the configured completion provider.
+    """
+
+    # Mapping of file extensions to MIME types
+    MIME_TYPE_MAPPING = {
+        "bmp": "image/bmp",
+        "gif": "image/gif",
+        "heic": "image/heic",
+        "jpeg": "image/jpeg",
+        "jpg": "image/jpeg",
+        "png": "image/png",
+        "tiff": "image/tiff",
+        "tif": "image/tiff",
+        "webp": "image/webp",
+    }
+
+    def __init__(
+        self,
+        config: IngestionConfig,
+        database_provider: DatabaseProvider,
+        llm_provider: CompletionProvider,
+    ):
+        """Store the collaborators and register the HEIF opener.
+
+        Args:
+            config: Ingestion settings; read later for the vision prompt
+                name and the default vision model.
+            database_provider: Provider whose prompts handler supplies the
+                cached vision prompt.
+            llm_provider: Completion provider that receives the image
+                message.
+        """
+        self.config = config
+        self.llm_provider = llm_provider
+        self.database_provider = database_provider
+        # Fetched lazily on the first ingest() call.
+        self.vision_prompt_text = None
+        # Keep the imaging modules reachable through the instance.
+        self.Image = Image
+        self.pillow_heif = pillow_heif
+        pillow_heif.register_heif_opener()
+
+    def _is_heic(self, data: bytes) -> bool:
+        """Detect HEIC/HEIF data from its ISO-BMFF ``ftyp`` box.
+
+        The box type ``ftyp`` must sit at bytes 4-8. The data is treated
+        as HEIC when the major brand (bytes 8-12) is an HEVC or generic
+        HEIF brand, or when an HEVC brand appears among the compatible
+        brands inside the first 32 bytes. Matching ``ftyp`` alone is not
+        enough: MP4, MOV and AVIF files carry the same box.
+
+        Args:
+            data: Raw file bytes (only the first 32 bytes are inspected).
+
+        Returns:
+            True if the header identifies HEIC/HEIF content, else False.
+            Any error while inspecting the data is logged and reported
+            as False.
+        """
+        # Brands that mark HEVC-coded HEIF images or sequences.
+        hevc_brands = {
+            b"heic",
+            b"heix",
+            b"hevc",
+            b"hevx",
+            b"heim",
+            b"heis",
+            b"hevm",
+            b"hevs",
+        }
+        # Generic HEIF structural brands; accepted only as the major brand
+        # because other formats list them as compatible brands.
+        generic_brands = {b"mif1", b"msf1"}
+
+        try:
+            header = bytes(data[:32])
+            if len(header) < 12 or header[4:8] != b"ftyp":
+                return False
+
+            major_brand = header[8:12]
+            if major_brand in hevc_brands or major_brand in generic_brands:
+                return True
+
+            # Compatible brands start at offset 16 (after the 4-byte minor
+            # version) and run to the end of the box; stay inside both the
+            # declared box size and the 32-byte window.
+            box_size = int.from_bytes(header[:4], "big")
+            end = min(len(header), box_size) if box_size >= 16 else len(header)
+            return any(
+                header[offset : offset + 4] in hevc_brands
+                for offset in range(16, end - 3, 4)
+            )
+        except Exception as e:
+            logger.error(f"Error checking for HEIC format: {str(e)}")
+            return False
+
+    async def _convert_heic_to_jpeg(self, data: bytes) -> bytes:
+        """Re-encode the first image of a HEIC container as JPEG bytes.
+
+        Args:
+            data: Raw HEIC file contents.
+
+        Returns:
+            The image encoded as JPEG at quality 95.
+
+        Raises:
+            Exception: Any decode/encode failure is logged and re-raised.
+        """
+        try:
+            container = self.pillow_heif.read_heif(BytesIO(data))
+
+            # Index 0 is the first image stored in the container.
+            decoded = container[0].to_pillow()
+
+            # Non-RGB modes are converted to RGB before the JPEG save.
+            rgb_image = (
+                decoded if decoded.mode == "RGB" else decoded.convert("RGB")
+            )
+
+            jpeg_buffer = BytesIO()
+            rgb_image.save(jpeg_buffer, format="JPEG", quality=95)
+            return jpeg_buffer.getvalue()
+
+        except Exception as e:
+            logger.error(f"Error converting HEIC to JPEG: {str(e)}")
+            raise
+
+    def _is_jpeg(self, data: bytes) -> bool:
+        """Return True when *data* opens with the two bytes FF D8."""
+        return data[:2] == b"\xff\xd8"
+
+    def _is_png(self, data: bytes) -> bool:
+        """Return True when *data* opens with the 8-byte PNG signature."""
+        return data.startswith(b"\x89PNG\r\n\x1a\n")
+
+    def _is_bmp(self, data: bytes) -> bool:
+        """Return True when *data* opens with the two-byte marker ``BM``."""
+        bmp_magic = b"BM"
+        return data.startswith(bmp_magic)
+
+    def _is_tiff(self, data: bytes) -> bool:
+        """Return True when *data* opens with either TIFF byte-order mark."""
+        tiff_signatures = (
+            b"II*\x00",  # Little-endian
+            b"MM\x00*",  # Big-endian
+        )
+        return data.startswith(tiff_signatures)
+
+    def _get_image_media_type(
+        self, data: bytes, filename: Optional[str] = None
+    ) -> str:
+        """
+        Determine the media type from image data and, failing that, filename.
+
+        Detection order: the parser's own signature checks (HEIC, JPEG,
+        PNG, BMP, TIFF), then ``filetype.guess``, then the filename
+        extension, and finally a generic binary type.
+
+        Args:
+            data: The binary image data
+            filename: Optional filename which may contain extension information
+
+        Returns:
+            str: The MIME type for the image, or "application/octet-stream"
+            when nothing matched or an error occurred during detection.
+        """
+        signature_checks = (
+            (self._is_heic, "image/heic"),
+            (self._is_jpeg, "image/jpeg"),
+            (self._is_png, "image/png"),
+            (self._is_bmp, "image/bmp"),
+            (self._is_tiff, "image/tiff"),
+        )
+
+        try:
+            # First, try format-specific detection functions
+            for matches, mime_type in signature_checks:
+                if matches(data):
+                    return mime_type
+
+            # Try using filetype as a fallback. guess() returns a type
+            # object (or None), not a string: key the mapping on its
+            # extension and otherwise trust the MIME type it reports.
+            guessed = filetype.guess(data)
+            if guessed is not None:
+                return self.MIME_TYPE_MAPPING.get(
+                    guessed.extension, guessed.mime
+                )
+
+            # If we have a filename, try to get the type from the extension
+            if filename:
+                extension = filename.rsplit(".", 1)[-1].lower()
+                mapped = self.MIME_TYPE_MAPPING.get(extension)
+                if mapped is not None:
+                    return mapped
+
+            # If all else fails, default to octet-stream (generic binary)
+            logger.warning(
+                "Could not determine image type, using application/octet-stream"
+            )
+            return "application/octet-stream"
+
+        except Exception as e:
+            logger.error(f"Error determining image media type: {str(e)}")
+            return "application/octet-stream"  # Default to generic binary as fallback
+
+    async def ingest(
+        self, data: str | bytes, **kwargs
+    ) -> AsyncGenerator[str, None]:
+        """Send an image to the vision model and yield its text response.
+
+        Args:
+            data: Raw image bytes, or a string that is passed through as
+                the already-encoded image payload.
+            **kwargs: Optional settings read here:
+                filename: name hint used for media-type detection.
+                convert_heic: convert HEIC bytes to JPEG (default True).
+                media_type: media type for string input (default
+                    "application/octet-stream").
+                vlm: model name overriding ``config.app.vlm``.
+
+        Yields:
+            The message content of the first completion choice.
+
+        Raises:
+            ValueError: If the response has no usable choice or content.
+            Exception: Any other failure is logged and re-raised.
+        """
+        if not self.vision_prompt_text:
+            prompts = self.database_provider.prompts_handler
+            self.vision_prompt_text = await prompts.get_cached_prompt(
+                prompt_name=self.config.vision_img_prompt_name
+            )
+        try:
+            filename = kwargs.get("filename")
+            # Whether to convert HEIC to JPEG (default: True for backward compatibility)
+            convert_heic = kwargs.get("convert_heic", True)
+
+            if not isinstance(data, bytes):
+                # String input is taken as base64 that already carries a
+                # reliable content type from whoever encoded it.
+                image_data = data
+                media_type = kwargs.get(
+                    "media_type", "application/octet-stream"
+                )
+            else:
+                try:
+                    original_media_type = self._get_image_media_type(
+                        data, filename
+                    )
+                    logger.debug(
+                        f"Detected original image type: {original_media_type}"
+                    )
+
+                    if self._is_heic(data) and convert_heic:
+                        logger.debug(
+                            "Detected HEIC format, converting to JPEG"
+                        )
+                        data = await self._convert_heic_to_jpeg(data)
+                        media_type = "image/jpeg"
+                    else:
+                        # Keep original format and media type
+                        media_type = original_media_type
+
+                    image_data = base64.b64encode(data).decode("utf-8")
+
+                except Exception as e:
+                    logger.error(f"Error processing image data: {str(e)}")
+                    raise
+
+            # A model named in kwargs wins over the configured one.
+            model = kwargs.get("vlm") or self.config.app.vlm
+
+            generation_config = GenerationConfig(
+                model=model,
+                stream=False,
+            )
+
+            logger.debug(f"Using model: {model}, media_type: {media_type}")
+
+            if "anthropic" in model:
+                image_part = {
+                    "type": "image",
+                    "source": {
+                        "type": "base64",
+                        "media_type": media_type,
+                        "data": image_data,
+                    },
+                }
+            else:
+                # For OpenAI-style APIs, use their format
+                image_part = {
+                    "type": "image_url",
+                    "image_url": {
+                        "url": f"data:{media_type};base64,{image_data}"
+                    },
+                }
+
+            messages = [
+                {
+                    "role": "user",
+                    "content": [
+                        {"type": "text", "text": self.vision_prompt_text},
+                        image_part,
+                    ],
+                }
+            ]
+
+            response = await self.llm_provider.aget_completion(
+                messages=messages, generation_config=generation_config
+            )
+
+            if not (response.choices and response.choices[0].message):
+                raise ValueError("No response content")
+
+            content = response.choices[0].message.content
+            if not content:
+                raise ValueError("No content in response")
+            yield content
+
+        except Exception as e:
+            logger.error(f"Error processing image with vision model: {str(e)}")
+            raise