aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py')
-rw-r--r--.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py281
1 files changed, 281 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
new file mode 100644
index 00000000..bcb37eab
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/core/parsers/media/img_parser.py
@@ -0,0 +1,281 @@
+# type: ignore
+import base64
+import logging
+from io import BytesIO
+from typing import AsyncGenerator, Optional
+
+import filetype
+import pillow_heif
+from PIL import Image
+
+from core.base.abstractions import GenerationConfig
+from core.base.parsers.base_parser import AsyncParser
+from core.base.providers import (
+ CompletionProvider,
+ DatabaseProvider,
+ IngestionConfig,
+)
+
+logger = logging.getLogger()
+
+
+class ImageParser(AsyncParser[str | bytes]):
+ # Mapping of file extensions to MIME types
+ MIME_TYPE_MAPPING = {
+ "bmp": "image/bmp",
+ "gif": "image/gif",
+ "heic": "image/heic",
+ "jpeg": "image/jpeg",
+ "jpg": "image/jpeg",
+ "png": "image/png",
+ "tiff": "image/tiff",
+ "tif": "image/tiff",
+ "webp": "image/webp",
+ }
+
+ def __init__(
+ self,
+ config: IngestionConfig,
+ database_provider: DatabaseProvider,
+ llm_provider: CompletionProvider,
+ ):
+ self.database_provider = database_provider
+ self.llm_provider = llm_provider
+ self.config = config
+ self.vision_prompt_text = None
+ self.Image = Image
+ self.pillow_heif = pillow_heif
+ self.pillow_heif.register_heif_opener()
+
+ def _is_heic(self, data: bytes) -> bool:
+ """Detect HEIC format using magic numbers and patterns."""
+ heic_patterns = [
+ b"ftyp",
+ b"heic",
+ b"heix",
+ b"hevc",
+ b"HEIC",
+ b"mif1",
+ b"msf1",
+ b"hevc",
+ b"hevx",
+ ]
+
+ try:
+ header = data[:32] # Get first 32 bytes
+ return any(pattern in header for pattern in heic_patterns)
+ except Exception as e:
+ logger.error(f"Error checking for HEIC format: {str(e)}")
+ return False
+
+ async def _convert_heic_to_jpeg(self, data: bytes) -> bytes:
+ """Convert HEIC image to JPEG format."""
+ try:
+ # Create BytesIO object for input
+ input_buffer = BytesIO(data)
+
+ # Load HEIC image using pillow_heif
+ heif_file = self.pillow_heif.read_heif(input_buffer)
+
+ # Get the primary image - API changed, need to get first image
+ heif_image = heif_file[0] # Get first image in the container
+
+ # Convert to PIL Image directly from the HEIF image
+ pil_image = heif_image.to_pillow()
+
+ # Convert to RGB if needed
+ if pil_image.mode != "RGB":
+ pil_image = pil_image.convert("RGB")
+
+ # Save as JPEG
+ output_buffer = BytesIO()
+ pil_image.save(output_buffer, format="JPEG", quality=95)
+ return output_buffer.getvalue()
+
+ except Exception as e:
+ logger.error(f"Error converting HEIC to JPEG: {str(e)}")
+ raise
+
+ def _is_jpeg(self, data: bytes) -> bool:
+ """Detect JPEG format using magic numbers."""
+ return len(data) >= 2 and data[0] == 0xFF and data[1] == 0xD8
+
+ def _is_png(self, data: bytes) -> bool:
+ """Detect PNG format using magic numbers."""
+ png_signature = b"\x89PNG\r\n\x1a\n"
+ return data.startswith(png_signature)
+
+ def _is_bmp(self, data: bytes) -> bool:
+ """Detect BMP format using magic numbers."""
+ return data.startswith(b"BM")
+
+ def _is_tiff(self, data: bytes) -> bool:
+ """Detect TIFF format using magic numbers."""
+ return (
+ data.startswith(b"II*\x00") # Little-endian
+ or data.startswith(b"MM\x00*")
+ ) # Big-endian
+
+ def _get_image_media_type(
+ self, data: bytes, filename: Optional[str] = None
+ ) -> str:
+ """
+ Determine the correct media type based on image data and/or filename.
+
+ Args:
+ data: The binary image data
+ filename: Optional filename which may contain extension information
+
+ Returns:
+ str: The MIME type for the image
+ """
+ try:
+ # First, try format-specific detection functions
+ if self._is_heic(data):
+ return "image/heic"
+ if self._is_jpeg(data):
+ return "image/jpeg"
+ if self._is_png(data):
+ return "image/png"
+ if self._is_bmp(data):
+ return "image/bmp"
+ if self._is_tiff(data):
+ return "image/tiff"
+
+ # Try using filetype as a fallback
+ img_type = filetype.guess(data)
+ if img_type:
+ # Map the detected type to a MIME type
+ return self.MIME_TYPE_MAPPING.get(
+ img_type, f"image/{img_type}"
+ )
+
+ # If we have a filename, try to get the type from the extension
+ if filename:
+ extension = filename.split(".")[-1].lower()
+ if extension in self.MIME_TYPE_MAPPING:
+ return self.MIME_TYPE_MAPPING[extension]
+
+ # If all else fails, default to octet-stream (generic binary)
+ logger.warning(
+ "Could not determine image type, using application/octet-stream"
+ )
+ return "application/octet-stream"
+
+ except Exception as e:
+ logger.error(f"Error determining image media type: {str(e)}")
+ return "application/octet-stream" # Default to generic binary as fallback
+
+ async def ingest(
+ self, data: str | bytes, **kwargs
+ ) -> AsyncGenerator[str, None]:
+ if not self.vision_prompt_text:
+ self.vision_prompt_text = (
+ await self.database_provider.prompts_handler.get_cached_prompt(
+ prompt_name=self.config.vision_img_prompt_name
+ )
+ )
+ try:
+ filename = kwargs.get("filename", None)
+ # Whether to convert HEIC to JPEG (default: True for backward compatibility)
+ convert_heic = kwargs.get("convert_heic", True)
+
+ if isinstance(data, bytes):
+ try:
+ # First detect the original media type
+ original_media_type = self._get_image_media_type(
+ data, filename
+ )
+ logger.debug(
+ f"Detected original image type: {original_media_type}"
+ )
+
+ # Determine if we need to convert HEIC
+ is_heic_format = self._is_heic(data)
+
+ # Handle HEIC images
+ if is_heic_format and convert_heic:
+ logger.debug(
+ "Detected HEIC format, converting to JPEG"
+ )
+ data = await self._convert_heic_to_jpeg(data)
+ media_type = "image/jpeg"
+ else:
+ # Keep original format and media type
+ media_type = original_media_type
+
+ # Encode the data to base64
+ image_data = base64.b64encode(data).decode("utf-8")
+
+ except Exception as e:
+ logger.error(f"Error processing image data: {str(e)}")
+ raise
+ else:
+ # If data is already a string (base64), we assume it has a reliable content type
+ # from the source that encoded it
+ image_data = data
+
+ # Try to determine the media type from the context if available
+ media_type = kwargs.get(
+ "media_type", "application/octet-stream"
+ )
+
+ # Get the model from kwargs or config
+ model = kwargs.get("vlm", None) or self.config.app.vlm
+
+ generation_config = GenerationConfig(
+ model=model,
+ stream=False,
+ )
+
+ logger.debug(f"Using model: {model}, media_type: {media_type}")
+
+ if "anthropic" in model:
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image",
+ "source": {
+ "type": "base64",
+ "media_type": media_type,
+ "data": image_data,
+ },
+ },
+ ],
+ }
+ ]
+ else:
+ # For OpenAI-style APIs, use their format
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {"type": "text", "text": self.vision_prompt_text},
+ {
+ "type": "image_url",
+ "image_url": {
+ "url": f"data:{media_type};base64,{image_data}"
+ },
+ },
+ ],
+ }
+ ]
+
+ response = await self.llm_provider.aget_completion(
+ messages=messages, generation_config=generation_config
+ )
+
+ if response.choices and response.choices[0].message:
+ content = response.choices[0].message.content
+ if not content:
+ raise ValueError("No content in response")
+ yield content
+ else:
+ raise ValueError("No response content")
+
+ except Exception as e:
+ logger.error(f"Error processing image with vision model: {str(e)}")
+ raise