diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py | 281 |
1 files changed, 281 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py new file mode 100644 index 00000000..bcb37eab --- /dev/null +++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py @@ -0,0 +1,281 @@ +# type: ignore +import base64 +import logging +from io import BytesIO +from typing import AsyncGenerator, Optional + +import filetype +import pillow_heif +from PIL import Image + +from core.base.abstractions import GenerationConfig +from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() + + +class ImageParser(AsyncParser[str | bytes]): + # Mapping of file extensions to MIME types + MIME_TYPE_MAPPING = { + "bmp": "image/bmp", + "gif": "image/gif", + "heic": "image/heic", + "jpeg": "image/jpeg", + "jpg": "image/jpeg", + "png": "image/png", + "tiff": "image/tiff", + "tif": "image/tiff", + "webp": "image/webp", + } + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + self.Image = Image + self.pillow_heif = pillow_heif + self.pillow_heif.register_heif_opener() + + def _is_heic(self, data: bytes) -> bool: + """Detect HEIC format using magic numbers and patterns.""" + heic_patterns = [ + b"ftyp", + b"heic", + b"heix", + b"hevc", + b"HEIC", + b"mif1", + b"msf1", + b"hevc", + b"hevx", + ] + + try: + header = data[:32] # Get first 32 bytes + return any(pattern in header for pattern in heic_patterns) + except Exception as e: + logger.error(f"Error checking for HEIC format: {str(e)}") + return False + + async def _convert_heic_to_jpeg(self, data: bytes) -> bytes: + """Convert HEIC image to JPEG format.""" + try: + # Create BytesIO object for input + input_buffer = BytesIO(data) + + # Load HEIC image using pillow_heif + heif_file = self.pillow_heif.read_heif(input_buffer) + + # Get the primary image - API changed, need to get first image + heif_image = heif_file[0] # Get first image in the container + + # Convert to PIL Image directly from the HEIF image + pil_image = heif_image.to_pillow() + + # Convert to RGB if needed + if pil_image.mode != "RGB": + pil_image = pil_image.convert("RGB") + + # Save as JPEG + output_buffer = BytesIO() + pil_image.save(output_buffer, format="JPEG", quality=95) + return output_buffer.getvalue() + + except Exception as e: + logger.error(f"Error converting HEIC to JPEG: {str(e)}") + raise + + def _is_jpeg(self, data: bytes) -> bool: + """Detect JPEG format using magic numbers.""" + return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8 + + def _is_png(self, data: bytes) -> bool: + """Detect PNG format using magic numbers.""" + png_signature = b"\x89PNG\r\n\x1a\n" + return data.startswith(png_signature) + + def _is_bmp(self, data: bytes) -> bool: + """Detect BMP format using magic numbers.""" + return data.startswith(b"BM") + + def _is_tiff(self, data: bytes) -> bool: + """Detect TIFF format using magic numbers.""" + return ( + data.startswith(b"II*\x00") # Little-endian + or data.startswith(b"MM\x00*") + ) # Big-endian + + def _get_image_media_type( + self, data: bytes, filename: Optional[str] = None + ) -> str: + """ + Determine the correct media type based on image data and/or filename. + + Args: + data: The binary image data + filename: Optional filename which may contain extension information + + Returns: + str: The MIME type for the image + """ + try: + # First, try format-specific detection functions + if self._is_heic(data): + return "image/heic" + if self._is_jpeg(data): + return "image/jpeg" + if self._is_png(data): + return "image/png" + if self._is_bmp(data): + return "image/bmp" + if self._is_tiff(data): + return "image/tiff" + + # Try using filetype as a fallback + img_type = filetype.guess(data) + if img_type: + # Map the detected type to a MIME type + return self.MIME_TYPE_MAPPING.get( + img_type, f"image/{img_type}" + ) + + # If we have a filename, try to get the type from the extension + if filename: + extension = filename.split(".")[-1].lower() + if extension in self.MIME_TYPE_MAPPING: + return self.MIME_TYPE_MAPPING[extension] + + # If all else fails, default to octet-stream (generic binary) + logger.warning( + "Could not determine image type, using application/octet-stream" + ) + return "application/octet-stream" + + except Exception as e: + logger.error(f"Error determining image media type: {str(e)}") + return "application/octet-stream" # Default to generic binary as fallback + + async def ingest( + self, data: str | bytes, **kwargs + ) -> AsyncGenerator[str, None]: + if not self.vision_prompt_text: + self.vision_prompt_text = ( + await self.database_provider.prompts_handler.get_cached_prompt( + prompt_name=self.config.vision_img_prompt_name + ) + ) + try: + filename = kwargs.get("filename", None) + # Whether to convert HEIC to JPEG (default: True for backward compatibility) + convert_heic = kwargs.get("convert_heic", True) + + if isinstance(data, bytes): + try: + # First detect the original media type + original_media_type = self._get_image_media_type( + data, filename + ) + logger.debug( + f"Detected original image type: {original_media_type}" + ) + + # Determine if we need to convert HEIC + is_heic_format = self._is_heic(data) + + # Handle HEIC images + if is_heic_format and convert_heic: + logger.debug( + "Detected HEIC format, converting to JPEG" + ) + data = await self._convert_heic_to_jpeg(data) + media_type = "image/jpeg" + else: + # Keep original format and media type + media_type = original_media_type + + # Encode the data to base64 + image_data = base64.b64encode(data).decode("utf-8") + + except Exception as e: + logger.error(f"Error processing image data: {str(e)}") + raise + else: + # If data is already a string (base64), we assume it has a reliable content type + # from the source that encoded it + image_data = data + + # Try to determine the media type from the context if available + media_type = kwargs.get( + "media_type", "application/octet-stream" + ) + + # Get the model from kwargs or config + model = kwargs.get("vlm", None) or self.config.app.vlm + + generation_config = GenerationConfig( + model=model, + stream=False, + ) + + logger.debug(f"Using model: {model}, media_type: {media_type}") + + if "anthropic" in model: + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image", + "source": { + "type": "base64", + "media_type": media_type, + "data": image_data, + }, + }, + ], + } + ] + else: + # For OpenAI-style APIs, use their format + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:{media_type};base64,{image_data}" + }, + }, + ], + } + ] + + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + if not content: + raise ValueError("No content in response") + yield content + else: + raise ValueError("No response content") + + except Exception as e: + logger.error(f"Error processing image with vision model: {str(e)}") + raise |