aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/pypdf/_utils.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pypdf/_utils.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/pypdf/_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/pypdf/_utils.py683
1 files changed, 683 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pypdf/_utils.py b/.venv/lib/python3.12/site-packages/pypdf/_utils.py
new file mode 100644
index 00000000..97565369
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/pypdf/_utils.py
@@ -0,0 +1,683 @@
+# Copyright (c) 2006, Mathieu Fenniak
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+# * Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
+# and/or other materials provided with the distribution.
+# * The name of the author may not be used to endorse or promote products
+# derived from this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+"""Utility functions for PDF library."""
+__author__ = "Mathieu Fenniak"
+__author_email__ = "biziqe@mathieu.fenniak.net"
+
+import functools
+import logging
+import re
+import sys
+import warnings
+from dataclasses import dataclass
+from datetime import datetime, timezone
+from io import DEFAULT_BUFFER_SIZE, BytesIO
+from os import SEEK_CUR
+from typing import (
+ IO,
+ Any,
+ Dict,
+ List,
+ Optional,
+ Pattern,
+ Tuple,
+ Union,
+ cast,
+ overload,
+)
+
+if sys.version_info[:2] >= (3, 10):
+ # Python 3.10+: https://www.python.org/dev/peps/pep-0484/
+ from typing import TypeAlias
+else:
+ from typing_extensions import TypeAlias
+
+from .errors import (
+ STREAM_TRUNCATED_PREMATURELY,
+ DeprecationError,
+ PdfStreamError,
+)
+
+TransformationMatrixType: TypeAlias = Tuple[
+ Tuple[float, float, float], Tuple[float, float, float], Tuple[float, float, float]
+]
+CompressedTransformationMatrix: TypeAlias = Tuple[
+ float, float, float, float, float, float
+]
+
+StreamType = IO[Any]
+StrByteType = Union[str, StreamType]
+
+
+def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
+ orgtext = text
+ if text is None:
+ return None
+ if text[0].isdigit():
+ text = "D:" + text
+ if text.endswith(("Z", "z")):
+ text += "0000"
+ text = text.replace("z", "+").replace("Z", "+").replace("'", "")
+ i = max(text.find("+"), text.find("-"))
+ if i > 0 and i != len(text) - 5:
+ text += "00"
+ for f in (
+ "D:%Y",
+ "D:%Y%m",
+ "D:%Y%m%d",
+ "D:%Y%m%d%H",
+ "D:%Y%m%d%H%M",
+ "D:%Y%m%d%H%M%S",
+ "D:%Y%m%d%H%M%S%z",
+ ):
+ try:
+ d = datetime.strptime(text, f) # noqa: DTZ007
+ except ValueError:
+ continue
+ else:
+ if text.endswith("+0000"):
+ d = d.replace(tzinfo=timezone.utc)
+ return d
+ raise ValueError(f"Can not convert date: {orgtext}")
+
+
+def _get_max_pdf_version_header(header1: str, header2: str) -> str:
+ versions = (
+ "%PDF-1.3",
+ "%PDF-1.4",
+ "%PDF-1.5",
+ "%PDF-1.6",
+ "%PDF-1.7",
+ "%PDF-2.0",
+ )
+ pdf_header_indices = []
+ if header1 in versions:
+ pdf_header_indices.append(versions.index(header1))
+ if header2 in versions:
+ pdf_header_indices.append(versions.index(header2))
+ if len(pdf_header_indices) == 0:
+ raise ValueError(f"neither {header1!r} nor {header2!r} are proper headers")
+ return versions[max(pdf_header_indices)]
+
+
+def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
+ """
+ Read non-whitespace characters and return them.
+
+ Stops upon encountering whitespace or when maxchars is reached.
+
+ Args:
+ stream: The data stream from which was read.
+ maxchars: The maximum number of bytes returned; by default unlimited.
+
+ Returns:
+ The data which was read.
+ """
+ txt = b""
+ while True:
+ tok = stream.read(1)
+ if tok.isspace() or not tok:
+ break
+ txt += tok
+ if len(txt) == maxchars:
+ break
+ return txt
+
+
+def read_non_whitespace(stream: StreamType) -> bytes:
+ """
+ Find and read the next non-whitespace character (ignores whitespace).
+
+ Args:
+ stream: The data stream from which was read.
+
+ Returns:
+ The data which was read.
+ """
+ tok = stream.read(1)
+ while tok in WHITESPACES:
+ tok = stream.read(1)
+ return tok
+
+
+def skip_over_whitespace(stream: StreamType) -> bool:
+ """
+ Similar to read_non_whitespace, but return a boolean if more than one
+ whitespace character was read.
+
+ Args:
+ stream: The data stream from which was read.
+
+ Returns:
+ True if more than one whitespace was skipped, otherwise return False.
+ """
+ tok = WHITESPACES[0]
+ cnt = 0
+ while tok in WHITESPACES:
+ tok = stream.read(1)
+ cnt += 1
+ return cnt > 1
+
+
+def check_if_whitespace_only(value: bytes) -> bool:
+ """
+ Check if the given value consists of whitespace characters only.
+
+ Args:
+ value: The bytes to check.
+
+ Returns:
+ True if the value only has whitespace characters, otherwise return False.
+ """
+ for index in range(len(value)):
+ current = value[index : index + 1]
+ if current not in WHITESPACES:
+ return False
+ return True
+
+
+def skip_over_comment(stream: StreamType) -> None:
+ tok = stream.read(1)
+ stream.seek(-1, 1)
+ if tok == b"%":
+ while tok not in (b"\n", b"\r"):
+ tok = stream.read(1)
+
+
+def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
+ """
+ Read until the regular expression pattern matched (ignore the match).
+ Treats EOF on the underlying stream as the end of the token to be matched.
+
+ Args:
+ regex: re.Pattern
+
+ Returns:
+ The read bytes.
+ """
+ name = b""
+ while True:
+ tok = stream.read(16)
+ if not tok:
+ return name
+ m = regex.search(name + tok)
+ if m is not None:
+ stream.seek(m.start() - (len(name) + len(tok)), 1)
+ name = (name + tok)[: m.start()]
+ break
+ name += tok
+ return name
+
+
+def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
+ """
+ Given a stream at position X, read a block of size to_read ending at position X.
+
+ This changes the stream's position to the beginning of where the block was
+ read.
+
+ Args:
+ stream:
+ to_read:
+
+ Returns:
+ The data which was read.
+ """
+ if stream.tell() < to_read:
+ raise PdfStreamError("Could not read malformed PDF file")
+ # Seek to the start of the block we want to read.
+ stream.seek(-to_read, SEEK_CUR)
+ read = stream.read(to_read)
+ # Seek to the start of the block we read after reading it.
+ stream.seek(-to_read, SEEK_CUR)
+ return read
+
+
+def read_previous_line(stream: StreamType) -> bytes:
+ """
+ Given a byte stream with current position X, return the previous line.
+
+ All characters between the first CR/LF byte found before X
+ (or, the start of the file, if no such byte is found) and position X
+ After this call, the stream will be positioned one byte after the
+ first non-CRLF character found beyond the first CR/LF byte before X,
+ or, if no such byte is found, at the beginning of the stream.
+
+ Args:
+ stream: StreamType:
+
+ Returns:
+ The data which was read.
+ """
+ line_content = []
+ found_crlf = False
+ if stream.tell() == 0:
+ raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
+ while True:
+ to_read = min(DEFAULT_BUFFER_SIZE, stream.tell())
+ if to_read == 0:
+ break
+ # Read the block. After this, our stream will be one
+ # beyond the initial position.
+ block = read_block_backwards(stream, to_read)
+ idx = len(block) - 1
+ if not found_crlf:
+ # We haven't found our first CR/LF yet.
+ # Read off characters until we hit one.
+ while idx >= 0 and block[idx] not in b"\r\n":
+ idx -= 1
+ if idx >= 0:
+ found_crlf = True
+ if found_crlf:
+ # We found our first CR/LF already (on this block or
+ # a previous one).
+ # Our combined line is the remainder of the block
+ # plus any previously read blocks.
+ line_content.append(block[idx + 1 :])
+ # Continue to read off any more CRLF characters.
+ while idx >= 0 and block[idx] in b"\r\n":
+ idx -= 1
+ else:
+ # Didn't find CR/LF yet - add this block to our
+ # previously read blocks and continue.
+ line_content.append(block)
+ if idx >= 0:
+ # We found the next non-CRLF character.
+ # Set the stream position correctly, then break
+ stream.seek(idx + 1, SEEK_CUR)
+ break
+ # Join all the blocks in the line (which are in reverse order)
+ return b"".join(line_content[::-1])
+
+
+def matrix_multiply(
+ a: TransformationMatrixType, b: TransformationMatrixType
+) -> TransformationMatrixType:
+ return tuple( # type: ignore[return-value]
+ tuple(sum(float(i) * float(j) for i, j in zip(row, col)) for col in zip(*b))
+ for row in a
+ )
+
+
+def mark_location(stream: StreamType) -> None:
+ """Create text file showing current location in context."""
+ # Mainly for debugging
+ radius = 5000
+ stream.seek(-radius, 1)
+ with open("pypdf_pdfLocation.txt", "wb") as output_fh:
+ output_fh.write(stream.read(radius))
+ output_fh.write(b"HERE")
+ output_fh.write(stream.read(radius))
+ stream.seek(-radius, 1)
+
+
+B_CACHE: Dict[Union[str, bytes], bytes] = {}
+
+
+def b_(s: Union[str, bytes]) -> bytes:
+ if isinstance(s, bytes):
+ return s
+ bc = B_CACHE
+ if s in bc:
+ return bc[s]
+ try:
+ r = s.encode("latin-1")
+ if len(s) < 2:
+ bc[s] = r
+ return r
+ except Exception:
+ r = s.encode("utf-8")
+ if len(s) < 2:
+ bc[s] = r
+ return r
+
+
+def str_(b: Any) -> str:
+ if isinstance(b, bytes):
+ return b.decode("latin-1")
+ else:
+ return str(b) # will return b.__str__() if defined
+
+
+@overload
+def ord_(b: str) -> int:
+ ...
+
+
+@overload
+def ord_(b: bytes) -> bytes:
+ ...
+
+
+@overload
+def ord_(b: int) -> int:
+ ...
+
+
+def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
+ if isinstance(b, str):
+ return ord(b)
+ return b
+
+
+WHITESPACES = (b" ", b"\n", b"\r", b"\t", b"\x00")
+WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
+WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]"
+
+
+def paeth_predictor(left: int, up: int, up_left: int) -> int:
+ p = left + up - up_left
+ dist_left = abs(p - left)
+ dist_up = abs(p - up)
+ dist_up_left = abs(p - up_left)
+
+ if dist_left <= dist_up and dist_left <= dist_up_left:
+ return left
+ elif dist_up <= dist_up_left:
+ return up
+ else:
+ return up_left
+
+
+def deprecate(msg: str, stacklevel: int = 3) -> None:
+ warnings.warn(msg, DeprecationWarning, stacklevel=stacklevel)
+
+
+def deprecation(msg: str) -> None:
+ raise DeprecationError(msg)
+
+
+def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
+ """Raise an exception that a feature will be removed, but has a replacement."""
+ deprecate(f"{old_name} is deprecated and will be removed in pypdf {removed_in}. Use {new_name} instead.", 4)
+
+
+def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
+ """Raise an exception that a feature was already removed, but has a replacement."""
+ deprecation(f"{old_name} is deprecated and was removed in pypdf {removed_in}. Use {new_name} instead.")
+
+
+def deprecate_no_replacement(name: str, removed_in: str) -> None:
+ """Raise an exception that a feature will be removed without replacement."""
+ deprecate(f"{name} is deprecated and will be removed in pypdf {removed_in}.", 4)
+
+
+def deprecation_no_replacement(name: str, removed_in: str) -> None:
+ """Raise an exception that a feature was already removed without replacement."""
+ deprecation(f"{name} is deprecated and was removed in pypdf {removed_in}.")
+
+
+def logger_error(msg: str, src: str) -> None:
+ """
+ Use this instead of logger.error directly.
+
+ That allows people to overwrite it more easily.
+
+ See the docs on when to use which:
+ https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html
+ """
+ logging.getLogger(src).error(msg)
+
+
+def logger_warning(msg: str, src: str) -> None:
+ """
+ Use this instead of logger.warning directly.
+
+ That allows people to overwrite it more easily.
+
+ ## Exception, warnings.warn, logger_warning
+ - Exceptions should be used if the user should write code that deals with
+ an error case, e.g. the PDF being completely broken.
+ - warnings.warn should be used if the user needs to fix their code, e.g.
+ DeprecationWarnings
+ - logger_warning should be used if the user needs to know that an issue was
+ handled by pypdf, e.g. a non-compliant PDF being read in a way that
+ pypdf could apply a robustness fix to still read it. This applies mainly
+ to strict=False mode.
+ """
+ logging.getLogger(src).warning(msg)
+
+
+def rename_kwargs(
+ func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str], fail: bool = False
+) -> None:
+ """
+ Helper function to deprecate arguments.
+
+ Args:
+ func_name: Name of the function to be deprecated
+ kwargs:
+ aliases:
+ fail:
+ """
+ for old_term, new_term in aliases.items():
+ if old_term in kwargs:
+ if fail:
+ raise DeprecationError(
+ f"{old_term} is deprecated as an argument. Use {new_term} instead"
+ )
+ if new_term in kwargs:
+ raise TypeError(
+ f"{func_name} received both {old_term} and {new_term} as "
+ f"an argument. {old_term} is deprecated. "
+ f"Use {new_term} instead."
+ )
+ kwargs[new_term] = kwargs.pop(old_term)
+ warnings.warn(
+ message=(
+ f"{old_term} is deprecated as an argument. Use {new_term} instead"
+ ),
+ category=DeprecationWarning,
+ )
+
+
+def _human_readable_bytes(bytes: int) -> str:
+ if bytes < 10**3:
+ return f"{bytes} Byte"
+ elif bytes < 10**6:
+ return f"{bytes / 10**3:.1f} kB"
+ elif bytes < 10**9:
+ return f"{bytes / 10**6:.1f} MB"
+ else:
+ return f"{bytes / 10**9:.1f} GB"
+
+
+# The following class has been copied from Django:
+# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65
+#
+# Original license:
+#
+# ---------------------------------------------------------------------------------
+# Copyright (c) Django Software Foundation and individual contributors.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without modification,
+# are permitted provided that the following conditions are met:
+#
+# 1. Redistributions of source code must retain the above copyright notice,
+# this list of conditions and the following disclaimer.
+#
+# 2. Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# 3. Neither the name of Django nor the names of its contributors may be used
+# to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
+# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
+# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+# ---------------------------------------------------------------------------------
+class classproperty: # noqa: N801
+ """
+ Decorator that converts a method with a single cls argument into a property
+ that can be accessed directly from the class.
+ """
+
+ def __init__(self, method=None): # type: ignore # noqa: ANN001
+ self.fget = method
+
+ def __get__(self, instance, cls=None) -> Any: # type: ignore # noqa: ANN001
+ return self.fget(cls)
+
+ def getter(self, method): # type: ignore # noqa: ANN001, ANN202
+ self.fget = method
+ return self
+
+
+@dataclass
+class File:
+ from .generic import IndirectObject
+
+ name: str
+ data: bytes
+ image: Optional[Any] = None # optional ; direct image access
+ indirect_reference: Optional[IndirectObject] = None # optional ; link to PdfObject
+
+ def __str__(self) -> str:
+ return f"{self.__class__.__name__}(name={self.name}, data: {_human_readable_bytes(len(self.data))})"
+
+ def __repr__(self) -> str:
+ return self.__str__()[:-1] + f", hash: {hash(self.data)})"
+
+
+@dataclass
+class ImageFile(File):
+ from .generic import IndirectObject
+
+ image: Optional[Any] = None # optional ; direct PIL image access
+ indirect_reference: Optional[IndirectObject] = None # optional ; link to PdfObject
+
+ def replace(self, new_image: Any, **kwargs: Any) -> None:
+ """
+ Replace the Image with a new PIL image.
+
+ Args:
+ new_image (PIL.Image.Image): The new PIL image to replace the existing image.
+ **kwargs: Additional keyword arguments to pass to `Image.Image.save()`.
+
+ Raises:
+ TypeError: If the image is inline or in a PdfReader.
+ TypeError: If the image does not belong to a PdfWriter.
+ TypeError: If `new_image` is not a PIL Image.
+
+ Note:
+ This method replaces the existing image with a new image.
+ It is not allowed for inline images or images within a PdfReader.
+ The `kwargs` parameter allows passing additional parameters
+ to `Image.Image.save()`, such as quality.
+ """
+ from PIL import Image
+
+ from ._reader import PdfReader
+
+ # to prevent circular import
+ from .filters import _xobj_to_image
+ from .generic import DictionaryObject, PdfObject
+
+ if self.indirect_reference is None:
+ raise TypeError("Can not update an inline image")
+ if not hasattr(self.indirect_reference.pdf, "_id_translated"):
+ raise TypeError("Can not update an image not belonging to a PdfWriter")
+ if not isinstance(new_image, Image.Image):
+ raise TypeError("new_image shall be a PIL Image")
+ b = BytesIO()
+ new_image.save(b, "PDF", **kwargs)
+ reader = PdfReader(b)
+ assert reader.pages[0].images[0].indirect_reference is not None
+ self.indirect_reference.pdf._objects[self.indirect_reference.idnum - 1] = (
+ reader.pages[0].images[0].indirect_reference.get_object()
+ )
+ cast(
+ PdfObject, self.indirect_reference.get_object()
+ ).indirect_reference = self.indirect_reference
+ # change the object attributes
+ extension, byte_stream, img = _xobj_to_image(
+ cast(DictionaryObject, self.indirect_reference.get_object())
+ )
+ assert extension is not None
+ self.name = self.name[: self.name.rfind(".")] + extension
+ self.data = byte_stream
+ self.image = img
+
+
+@functools.total_ordering
+class Version:
+ COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")
+
+ def __init__(self, version_str: str) -> None:
+ self.version_str = version_str
+ self.components = self._parse_version(version_str)
+
+ def _parse_version(self, version_str: str) -> List[Tuple[int, str]]:
+ components = version_str.split(".")
+ parsed_components = []
+ for component in components:
+ match = Version.COMPONENT_PATTERN.match(component)
+ if not match:
+ parsed_components.append((0, component))
+ continue
+ integer_prefix = match.group(1)
+ suffix = match.group(2)
+ if integer_prefix is None:
+ integer_prefix = 0
+ parsed_components.append((int(integer_prefix), suffix))
+ return parsed_components
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, Version):
+ return False
+ return self.components == other.components
+
+ def __lt__(self, other: Any) -> bool:
+ if not isinstance(other, Version):
+ raise ValueError(f"Version cannot be compared against {type(other)}")
+ min_len = min(len(self.components), len(other.components))
+ for i in range(min_len):
+ self_value, self_suffix = self.components[i]
+ other_value, other_suffix = other.components[i]
+
+ if self_value < other_value:
+ return True
+ elif self_value > other_value:
+ return False
+
+ if self_suffix < other_suffix:
+ return True
+ elif self_suffix > other_suffix:
+ return False
+
+ return len(self.components) < len(other.components)