diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/pypdf/_utils.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/pypdf/_utils.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/pypdf/_utils.py | 683 |
1 files changed, 683 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/pypdf/_utils.py b/.venv/lib/python3.12/site-packages/pypdf/_utils.py new file mode 100644 index 00000000..97565369 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/pypdf/_utils.py @@ -0,0 +1,683 @@ +# Copyright (c) 2006, Mathieu Fenniak +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * The name of the author may not be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. 
"""Utility functions for PDF library."""
__author__ = "Mathieu Fenniak"
__author_email__ = "biziqe@mathieu.fenniak.net"

import functools
import logging
import re
import sys
import warnings
from dataclasses import dataclass
from datetime import datetime, timezone
from io import DEFAULT_BUFFER_SIZE, BytesIO
from os import SEEK_CUR
from typing import (
    IO,
    Any,
    Dict,
    List,
    Optional,
    Pattern,
    Tuple,
    Union,
    cast,
    overload,
)

if sys.version_info[:2] >= (3, 10):
    # Python 3.10+: https://www.python.org/dev/peps/pep-0484/
    from typing import TypeAlias
else:
    from typing_extensions import TypeAlias

from .errors import (
    STREAM_TRUNCATED_PREMATURELY,
    DeprecationError,
    PdfStreamError,
)

TransformationMatrixType: TypeAlias = Tuple[
    Tuple[float, float, float], Tuple[float, float, float], Tuple[float, float, float]
]
CompressedTransformationMatrix: TypeAlias = Tuple[
    float, float, float, float, float, float
]

StreamType = IO[Any]
StrByteType = Union[str, StreamType]


def parse_iso8824_date(text: Optional[str]) -> Optional[datetime]:
    """
    Parse a PDF date string into a datetime.

    Supported inputs are ``D:YYYY`` up to ``D:YYYYMMDDHHmmSS``, optionally
    followed by a UTC offset. A missing ``D:`` prefix is added when the text
    starts with a digit.

    Args:
        text: The date string, or None.

    Returns:
        None if ``text`` is None, otherwise the parsed datetime. An offset of
        ``+0000`` (including a trailing ``Z``/``z``) yields ``timezone.utc``.

    Raises:
        ValueError: If the text (including the empty string) matches none of
            the supported formats.
    """
    orgtext = text
    if text is None:
        return None
    # Slice instead of index so that an empty string ends up in the
    # ValueError below rather than raising IndexError here.
    if text[:1].isdigit():
        text = "D:" + text
    if text.endswith(("Z", "z")):
        # Combined with the replacement below this becomes "+0000".
        text += "0000"
    # Apostrophes (as in +HH'mm') are dropped so that %z can parse the offset.
    text = text.replace("z", "+").replace("Z", "+").replace("'", "")
    i = max(text.find("+"), text.find("-"))
    if i > 0 and i != len(text) - 5:
        # The sign is not followed by exactly four characters: pad with "00".
        text += "00"
    for f in (
        "D:%Y",
        "D:%Y%m",
        "D:%Y%m%d",
        "D:%Y%m%d%H",
        "D:%Y%m%d%H%M",
        "D:%Y%m%d%H%M%S",
        "D:%Y%m%d%H%M%S%z",
    ):
        try:
            d = datetime.strptime(text, f)  # noqa: DTZ007
        except ValueError:
            continue
        else:
            if text.endswith("+0000"):
                d = d.replace(tzinfo=timezone.utc)
            return d
    raise ValueError(f"Can not convert date: {orgtext}")


def _get_max_pdf_version_header(header1: str, header2: str) -> str:
    """
    Return the higher of two PDF version headers.

    Args:
        header1: A header such as ``%PDF-1.4``.
        header2: A header such as ``%PDF-1.7``.

    Returns:
        The header with the higher version. A header that is not in the list
        of known versions is ignored.

    Raises:
        ValueError: If neither header is a known version header.
    """
    versions = (
        "%PDF-1.3",
        "%PDF-1.4",
        "%PDF-1.5",
        "%PDF-1.6",
        "%PDF-1.7",
        "%PDF-2.0",
    )
    pdf_header_indices = [
        versions.index(header) for header in (header1, header2) if header in versions
    ]
    if not pdf_header_indices:
        raise ValueError(f"neither {header1!r} nor {header2!r} are proper headers")
    return versions[max(pdf_header_indices)]


def read_until_whitespace(stream: StreamType, maxchars: Optional[int] = None) -> bytes:
    """
    Read non-whitespace characters and return them.

    Stops upon encountering whitespace, at the end of the stream, or when
    maxchars is reached. The terminating whitespace byte is consumed.

    Args:
        stream: The data stream to read from.
        maxchars: The maximum number of bytes returned; by default unlimited.

    Returns:
        The data which was read.
    """
    txt = b""
    while True:
        tok = stream.read(1)
        if tok.isspace() or not tok:
            break
        txt += tok
        if len(txt) == maxchars:
            break
    return txt


def read_non_whitespace(stream: StreamType) -> bytes:
    """
    Find and read the next non-whitespace character (ignores whitespace).

    Args:
        stream: The data stream to read from.

    Returns:
        The first byte that is not in WHITESPACES; this is ``b""`` when the
        stream ends first.
    """
    tok = stream.read(1)
    while tok in WHITESPACES:
        tok = stream.read(1)
    return tok


def skip_over_whitespace(stream: StreamType) -> bool:
    """
    Similar to read_non_whitespace, but return a boolean if more than one
    whitespace character was read.

    Args:
        stream: The data stream to read from.

    Returns:
        True if more than one byte was read (i.e. at least one whitespace
        byte preceded the terminating byte), otherwise False.
    """
    tok = WHITESPACES[0]
    cnt = 0
    while tok in WHITESPACES:
        tok = stream.read(1)
        cnt += 1
    return cnt > 1


def check_if_whitespace_only(value: bytes) -> bool:
    """
    Check if the given value consists of whitespace characters only.

    Args:
        value: The bytes to check.

    Returns:
        True if the value only has whitespace characters (or is empty),
        otherwise return False.
    """
    # Slice one byte at a time: indexing bytes yields ints, while
    # WHITESPACES holds single-byte bytes objects.
    return all(value[index : index + 1] in WHITESPACES for index in range(len(value)))


def skip_over_comment(stream: StreamType) -> None:
    """
    Skip a ``%`` comment if the stream is currently positioned at one.

    One byte is read and the stream is moved back by one byte. If that byte
    is ``%``, bytes are consumed up to and including the first CR or LF.

    Args:
        stream: The data stream to read from.

    Raises:
        PdfStreamError: If the stream ends before the comment is terminated
            by CR or LF.
    """
    tok = stream.read(1)
    # NOTE(review): this steps back even if nothing was read (EOF) - confirm
    # that callers never invoke this at the end of the stream.
    stream.seek(-1, SEEK_CUR)
    if tok == b"%":
        while tok not in (b"\n", b"\r"):
            tok = stream.read(1)
            if not tok:
                # read() keeps returning b"" at EOF; without this check the
                # loop would never terminate on an unterminated comment.
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)


def read_until_regex(stream: StreamType, regex: Pattern[bytes]) -> bytes:
    """
    Read until the regular expression pattern matched (ignore the match).
    Treats EOF on the underlying stream as the end of the token to be matched.

    Args:
        stream: The data stream to read from. On a match it is repositioned
            to the start of the match.
        regex: The compiled bytes pattern marking the end of the token.

    Returns:
        The bytes read before the match, or everything up to EOF if the
        pattern never matched.
    """
    name = b""
    while True:
        tok = stream.read(16)
        if not tok:
            return name
        m = regex.search(name + tok)
        if m is not None:
            # Rewind to the first byte of the match.
            stream.seek(m.start() - (len(name) + len(tok)), 1)
            name = (name + tok)[: m.start()]
            break
        name += tok
    return name


def read_block_backwards(stream: StreamType, to_read: int) -> bytes:
    """
    Given a stream at position X, read a block of size to_read ending at position X.

    This changes the stream's position to the beginning of where the block was
    read.

    Args:
        stream: The data stream to read from.
        to_read: The number of bytes to read.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If fewer than to_read bytes precede the current
            position.
    """
    if stream.tell() < to_read:
        raise PdfStreamError("Could not read malformed PDF file")
    # Seek to the start of the block we want to read.
    stream.seek(-to_read, SEEK_CUR)
    read = stream.read(to_read)
    # Seek to the start of the block we read after reading it.
    stream.seek(-to_read, SEEK_CUR)
    return read


def read_previous_line(stream: StreamType) -> bytes:
    """
    Given a byte stream with current position X, return the previous line.

    All characters between the first CR/LF byte found before X
    (or, the start of the file, if no such byte is found) and position X
    are returned.
    After this call, the stream will be positioned one byte after the
    first non-CRLF character found beyond the first CR/LF byte before X,
    or, if no such byte is found, at the beginning of the stream.

    Args:
        stream: The data stream to read from.

    Returns:
        The data which was read.

    Raises:
        PdfStreamError: If the stream is positioned at its very beginning.
    """
    line_content = []
    found_crlf = False
    if stream.tell() == 0:
        raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
    while True:
        to_read = min(DEFAULT_BUFFER_SIZE, stream.tell())
        if to_read == 0:
            break
        # Read the block. After this, the stream is positioned at the
        # start of the block that was just read.
        block = read_block_backwards(stream, to_read)
        idx = len(block) - 1
        if not found_crlf:
            # We haven't found our first CR/LF yet.
            # Read off characters until we hit one.
            while idx >= 0 and block[idx] not in b"\r\n":
                idx -= 1
            if idx >= 0:
                found_crlf = True
        if found_crlf:
            # We found our first CR/LF already (on this block or
            # a previous one).
            # Our combined line is the remainder of the block
            # plus any previously read blocks.
            line_content.append(block[idx + 1 :])
            # Continue to read off any more CRLF characters.
            while idx >= 0 and block[idx] in b"\r\n":
                idx -= 1
        else:
            # Didn't find CR/LF yet - add this block to our
            # previously read blocks and continue.
            line_content.append(block)
        if idx >= 0:
            # We found the next non-CRLF character.
            # Set the stream position correctly, then break
            stream.seek(idx + 1, SEEK_CUR)
            break
    # Join all the blocks in the line (which are in reverse order)
    return b"".join(line_content[::-1])


def matrix_multiply(
    a: TransformationMatrixType, b: TransformationMatrixType
) -> TransformationMatrixType:
    """
    Multiply two matrices given as nested tuples.

    Args:
        a: The left matrix (tuple of rows).
        b: The right matrix (tuple of rows).

    Returns:
        The product ``a x b`` as a tuple of row tuples of floats.
    """
    return tuple(  # type: ignore[return-value]
        tuple(sum(float(i) * float(j) for i, j in zip(row, col)) for col in zip(*b))
        for row in a
    )


def mark_location(stream: StreamType) -> None:
    """
    Create text file showing current location in context.

    Writes the ``radius`` bytes before the current position, the marker
    ``HERE`` and the ``radius`` bytes after it to ``pypdf_pdfLocation.txt``
    in the current working directory.

    Args:
        stream: The data stream to inspect.
    """
    # Mainly for debugging
    # NOTE(review): seeking back by `radius` presumably fails when the
    # position is closer than that to the start of the stream - confirm.
    radius = 5000
    stream.seek(-radius, 1)
    with open("pypdf_pdfLocation.txt", "wb") as output_fh:
        output_fh.write(stream.read(radius))
        output_fh.write(b"HERE")
        output_fh.write(stream.read(radius))
    stream.seek(-radius, 1)


B_CACHE: Dict[Union[str, bytes], bytes] = {}


def b_(s: Union[str, bytes]) -> bytes:
    """
    Convert a string to bytes, preferring latin-1 and falling back to UTF-8.

    Results for strings shorter than two characters are memoized in B_CACHE.

    Args:
        s: The text to encode; bytes are returned unchanged.

    Returns:
        The encoded bytes.
    """
    if isinstance(s, bytes):
        return s
    bc = B_CACHE
    if s in bc:
        return bc[s]
    try:
        r = s.encode("latin-1")
    except UnicodeEncodeError:
        # Characters outside latin-1: fall back to UTF-8.
        r = s.encode("utf-8")
    if len(s) < 2:
        bc[s] = r
    return r


def str_(b: Any) -> str:
    """
    Convert a value to str; bytes are decoded as latin-1.

    Args:
        b: The value to convert.

    Returns:
        The decoded bytes, or ``str(b)`` for any other value.
    """
    if isinstance(b, bytes):
        return b.decode("latin-1")
    return str(b)  # will return b.__str__() if defined


@overload
def ord_(b: str) -> int:
    ...


@overload
def ord_(b: bytes) -> bytes:
    ...


@overload
def ord_(b: int) -> int:
    ...
def ord_(b: Union[int, str, bytes]) -> Union[int, bytes]:
    """
    Return the code point of a one-character string; pass other values through.

    Args:
        b: A single-character str, or an int/bytes value.

    Returns:
        ``ord(b)`` for a str, otherwise ``b`` unchanged.
    """
    if isinstance(b, str):
        return ord(b)
    return b


WHITESPACES = (b" ", b"\n", b"\r", b"\t", b"\x00")
WHITESPACES_AS_BYTES = b"".join(WHITESPACES)
WHITESPACES_AS_REGEXP = b"[" + WHITESPACES_AS_BYTES + b"]"


def paeth_predictor(left: int, up: int, up_left: int) -> int:
    """
    Return whichever of the three neighbours is closest to ``left + up - up_left``.

    Ties are resolved in the order left, up, up_left.

    Args:
        left: The value to the left.
        up: The value above.
        up_left: The value above and to the left.

    Returns:
        The selected neighbour value.
    """
    p = left + up - up_left
    dist_left = abs(p - left)
    dist_up = abs(p - up)
    dist_up_left = abs(p - up_left)

    if dist_left <= dist_up and dist_left <= dist_up_left:
        return left
    elif dist_up <= dist_up_left:
        return up
    else:
        return up_left


def deprecate(msg: str, stacklevel: int = 3) -> None:
    """Issue a DeprecationWarning with the given message and stack level."""
    warnings.warn(msg, DeprecationWarning, stacklevel=stacklevel)


def deprecation(msg: str) -> None:
    """Raise a DeprecationError with the given message."""
    raise DeprecationError(msg)


def deprecate_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Issue a warning that a feature will be removed, but has a replacement."""
    deprecate(
        f"{old_name} is deprecated and will be removed in pypdf {removed_in}. "
        f"Use {new_name} instead.",
        4,
    )


def deprecation_with_replacement(old_name: str, new_name: str, removed_in: str) -> None:
    """Raise an exception that a feature was already removed, but has a replacement."""
    deprecation(
        f"{old_name} is deprecated and was removed in pypdf {removed_in}. "
        f"Use {new_name} instead."
    )


def deprecate_no_replacement(name: str, removed_in: str) -> None:
    """Issue a warning that a feature will be removed without replacement."""
    deprecate(f"{name} is deprecated and will be removed in pypdf {removed_in}.", 4)


def deprecation_no_replacement(name: str, removed_in: str) -> None:
    """Raise an exception that a feature was already removed without replacement."""
    deprecation(f"{name} is deprecated and was removed in pypdf {removed_in}.")


def logger_error(msg: str, src: str) -> None:
    """
    Use this instead of logger.error directly.

    That allows people to overwrite it more easily.

    See the docs on when to use which:
    https://pypdf.readthedocs.io/en/latest/user/suppress-warnings.html

    Args:
        msg: The message to log.
        src: The name of the logger to use.
    """
    logging.getLogger(src).error(msg)


def logger_warning(msg: str, src: str) -> None:
    """
    Use this instead of logger.warning directly.

    That allows people to overwrite it more easily.

    ## Exception, warnings.warn, logger_warning
    - Exceptions should be used if the user should write code that deals with
      an error case, e.g. the PDF being completely broken.
    - warnings.warn should be used if the user needs to fix their code, e.g.
      DeprecationWarnings
    - logger_warning should be used if the user needs to know that an issue was
      handled by pypdf, e.g. a non-compliant PDF being read in a way that
      pypdf could apply a robustness fix to still read it. This applies mainly
      to strict=False mode.

    Args:
        msg: The message to log.
        src: The name of the logger to use.
    """
    logging.getLogger(src).warning(msg)


def rename_kwargs(
    func_name: str, kwargs: Dict[str, Any], aliases: Dict[str, str], fail: bool = False
) -> None:
    """
    Helper function to deprecate arguments.

    Deprecated keys found in ``kwargs`` are moved to their new name in place
    and a DeprecationWarning is issued for each of them.

    Args:
        func_name: Name of the function whose arguments are deprecated.
        kwargs: The keyword arguments received by that function; modified
            in place.
        aliases: Mapping from deprecated argument name to its replacement.
        fail: If True, raise instead of renaming and warning.

    Raises:
        DeprecationError: If ``fail`` is True and a deprecated name is used.
        TypeError: If both the deprecated and the new name were passed.
    """
    for old_term, new_term in aliases.items():
        if old_term in kwargs:
            if fail:
                raise DeprecationError(
                    f"{old_term} is deprecated as an argument. Use {new_term} instead"
                )
            if new_term in kwargs:
                raise TypeError(
                    f"{func_name} received both {old_term} and {new_term} as "
                    f"an argument. {old_term} is deprecated. "
                    f"Use {new_term} instead."
                )
            kwargs[new_term] = kwargs.pop(old_term)
            warnings.warn(
                message=(
                    f"{old_term} is deprecated as an argument. Use {new_term} instead"
                ),
                category=DeprecationWarning,
                # 1 = this helper, 2 = the function that received the kwargs,
                # 3 = the code that actually passed the deprecated keyword.
                stacklevel=3,
            )


def _human_readable_bytes(bytes: int) -> str:
    """
    Format a byte count using decimal (power of 1000) units.

    Args:
        bytes: The number of bytes.

    Returns:
        The count with unit ``Byte``, ``kB``, ``MB`` or ``GB``; all but
        ``Byte`` are shown with one decimal place.
    """
    if bytes < 10**3:
        return f"{bytes} Byte"
    elif bytes < 10**6:
        return f"{bytes / 10**3:.1f} kB"
    elif bytes < 10**9:
        return f"{bytes / 10**6:.1f} MB"
    else:
        return f"{bytes / 10**9:.1f} GB"


# The following class has been copied from Django:
# https://github.com/django/django/blob/adae619426b6f50046b3daaa744db52989c9d6db/django/utils/functional.py#L51-L65
#
# Original license:
#
# ---------------------------------------------------------------------------------
# Copyright (c) Django Software Foundation and individual contributors.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
#     1. Redistributions of source code must retain the above copyright notice,
#        this list of conditions and the following disclaimer.
#
#     2. Redistributions in binary form must reproduce the above copyright
#        notice, this list of conditions and the following disclaimer in the
#        documentation and/or other materials provided with the distribution.
#
#     3. Neither the name of Django nor the names of its contributors may be used
#        to endorse or promote products derived from this software without
#        specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# ---------------------------------------------------------------------------------
class classproperty:  # noqa: N801
    """
    Decorator that converts a method with a single cls argument into a property
    that can be accessed directly from the class.
    """

    def __init__(self, method=None):  # type: ignore  # noqa: ANN001
        self.fget = method

    def __get__(self, instance, cls=None) -> Any:  # type: ignore  # noqa: ANN001
        # The wrapped method always receives the class, never the instance.
        return self.fget(cls)

    def getter(self, method):  # type: ignore  # noqa: ANN001, ANN202
        self.fget = method
        return self


@dataclass
class File:
    """A named blob of data, optionally linked to an image and an indirect object."""

    from .generic import IndirectObject

    name: str
    data: bytes
    image: Optional[Any] = None  # optional ; direct image access
    indirect_reference: Optional[IndirectObject] = None  # optional ; link to PdfObject

    def __str__(self) -> str:
        return f"{self.__class__.__name__}(name={self.name}, data: {_human_readable_bytes(len(self.data))})"

    def __repr__(self) -> str:
        # Re-open the closing parenthesis of __str__ to append the data hash.
        return self.__str__()[:-1] + f", hash: {hash(self.data)})"


@dataclass
class ImageFile(File):
    """A File holding image data whose PIL image can be replaced."""

    from .generic import IndirectObject

    image: Optional[Any] = None  # optional ; direct PIL image access
    indirect_reference: Optional[IndirectObject] = None  # optional ; link to PdfObject

    def replace(self, new_image: Any, **kwargs: Any) -> None:
        """
        Replace the Image with a new PIL image.

        Args:
            new_image (PIL.Image.Image): The new PIL image to replace the existing image.
            **kwargs: Additional keyword arguments to pass to `Image.Image.save()`.

        Raises:
            TypeError: If the image is inline or in a PdfReader.
            TypeError: If the image does not belong to a PdfWriter.
            TypeError: If `new_image` is not a PIL Image.

        Note:
            This method replaces the existing image with a new image.
            It is not allowed for inline images or images within a PdfReader.
            The `kwargs` parameter allows passing additional parameters
            to `Image.Image.save()`, such as quality.
        """
        from PIL import Image

        from ._reader import PdfReader

        # to prevent circular import
        from .filters import _xobj_to_image
        from .generic import DictionaryObject, PdfObject

        if self.indirect_reference is None:
            raise TypeError("Can not update an inline image")
        if not hasattr(self.indirect_reference.pdf, "_id_translated"):
            raise TypeError("Can not update an image not belonging to a PdfWriter")
        if not isinstance(new_image, Image.Image):
            raise TypeError("new_image shall be a PIL Image")
        # Let PIL write a one-image PDF into memory and take the image
        # object out of it.
        b = BytesIO()
        new_image.save(b, "PDF", **kwargs)
        reader = PdfReader(b)
        assert reader.pages[0].images[0].indirect_reference is not None
        self.indirect_reference.pdf._objects[self.indirect_reference.idnum - 1] = (
            reader.pages[0].images[0].indirect_reference.get_object()
        )
        cast(
            PdfObject, self.indirect_reference.get_object()
        ).indirect_reference = self.indirect_reference
        # change the object attributes
        extension, byte_stream, img = _xobj_to_image(
            cast(DictionaryObject, self.indirect_reference.get_object())
        )
        assert extension is not None
        # Swap the old extension for the new one. A name without any "."
        # keeps all of its characters (slicing at rfind() == -1 would drop
        # the last one).
        stem, dot, _old_extension = self.name.rpartition(".")
        self.name = (stem if dot else self.name) + extension
        self.data = byte_stream
        self.image = img


@functools.total_ordering
class Version:
    """
    A dotted version string that can be compared and hashed.

    Each dot-separated component is split into a leading integer and the
    remaining suffix; a component without leading digits is stored as
    ``(0, component)``.
    """

    COMPONENT_PATTERN = re.compile(r"^(\d+)(.*)$")

    def __init__(self, version_str: str) -> None:
        self.version_str = version_str
        self.components = self._parse_version(version_str)

    def _parse_version(self, version_str: str) -> List[Tuple[int, str]]:
        """Split the version string into ``(integer prefix, suffix)`` pairs."""
        parsed_components = []
        for component in version_str.split("."):
            match = Version.COMPONENT_PATTERN.match(component)
            if not match:
                parsed_components.append((0, component))
                continue
            # Group 1 is ``\d+`` and therefore always present on a match.
            parsed_components.append((int(match.group(1)), match.group(2)))
        return parsed_components

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Version):
            return False
        return self.components == other.components

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise make instances unhashable. Hash
        # the same data that __eq__ compares (as a tuple, since lists are
        # not hashable).
        return hash((self.__class__, tuple(self.components)))

    def __lt__(self, other: Any) -> bool:
        """
        Compare component by component, integer part first, then suffix.

        Raises:
            ValueError: If ``other`` is not a Version.
        """
        if not isinstance(other, Version):
            raise ValueError(f"Version cannot be compared against {type(other)}")
        for (self_value, self_suffix), (other_value, other_suffix) in zip(
            self.components, other.components
        ):
            if self_value < other_value:
                return True
            elif self_value > other_value:
                return False

            if self_suffix < other_suffix:
                return True
            elif self_suffix > other_suffix:
                return False

        # All shared components are equal: the shorter version is smaller.
        return len(self.components) < len(other.components)