# Copyright (c) 2006, Mathieu Fenniak # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are # met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # * The name of the author may not be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. """ Implementation of stream filters for PDF. See TABLE H.1 Abbreviations for standard filter names """ __author__ = "Mathieu Fenniak" __author_email__ = "biziqe@mathieu.fenniak.net" import math import struct import zlib from io import BytesIO from typing import Any, Dict, Optional, Tuple, Union, cast from .generic import ArrayObject, DictionaryObject, IndirectObject, NameObject try: from typing import Literal # type: ignore[attr-defined] except ImportError: # PEP 586 introduced typing.Literal with Python 3.8 # For older Python versions, the backport typing_extensions is necessary: from typing_extensions import Literal # type: ignore[misc] from ._utils import b_, deprecate_with_replacement, ord_, paeth_predictor from .constants import CcittFaxDecodeParameters as CCITT from .constants import ColorSpaces from .constants import FilterTypeAbbreviations as FTA from .constants import FilterTypes as FT from .constants import GraphicsStateParameters as G from .constants import ImageAttributes as IA from .constants import LzwFilterParameters as LZW from .constants import StreamAttributes as SA from .errors import PdfReadError, PdfStreamError def decompress(data: bytes) -> bytes: try: return zlib.decompress(data) except zlib.error: d = zlib.decompressobj(zlib.MAX_WBITS | 32) result_str = b"" for b in [data[i : i + 1] for i in range(len(data))]: try: result_str += d.decompress(b) except zlib.error: pass return result_str class FlateDecode: @staticmethod def decode( data: bytes, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, **kwargs: Any, ) -> bytes: """ Decode data which is flate-encoded. :param data: flate-encoded data. :param decode_parms: a dictionary of values, understanding the "/Predictor": key only :return: the flate-decoded data. :raises PdfReadError: """ if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] str_data = decompress(data) predictor = 1 if decode_parms: try: if isinstance(decode_parms, ArrayObject): for decode_parm in decode_parms: if "/Predictor" in decode_parm: predictor = decode_parm["/Predictor"] else: predictor = decode_parms.get("/Predictor", 1) except (AttributeError, TypeError): # Type Error is NullObject pass # Usually an array with a null object was read # predictor 1 == no predictor if predictor != 1: # The /Columns param. has 1 as the default value; see ISO 32000, # ยง7.4.4.3 LZWDecode and FlateDecode Parameters, Table 8 DEFAULT_BITS_PER_COMPONENT = 8 if isinstance(decode_parms, ArrayObject): columns = 1 bits_per_component = DEFAULT_BITS_PER_COMPONENT for decode_parm in decode_parms: if "/Columns" in decode_parm: columns = decode_parm["/Columns"] if LZW.BITS_PER_COMPONENT in decode_parm: bits_per_component = decode_parm[LZW.BITS_PER_COMPONENT] else: columns = ( 1 if decode_parms is None else decode_parms.get(LZW.COLUMNS, 1) ) bits_per_component = ( decode_parms.get(LZW.BITS_PER_COMPONENT, DEFAULT_BITS_PER_COMPONENT) if decode_parms else DEFAULT_BITS_PER_COMPONENT ) # PNG predictor can vary by row and so is the lead byte on each row rowlength = ( math.ceil(columns * bits_per_component / 8) + 1 ) # number of bytes # PNG prediction: if 10 <= predictor <= 15: str_data = FlateDecode._decode_png_prediction(str_data, columns, rowlength) # type: ignore else: # unsupported predictor raise PdfReadError(f"Unsupported flatedecode predictor {predictor!r}") return str_data @staticmethod def _decode_png_prediction(data: str, columns: int, rowlength: int) -> bytes: output = BytesIO() # PNG prediction can vary from row to row if len(data) % rowlength != 0: raise PdfReadError("Image data is not rectangular") prev_rowdata = (0,) * rowlength for row in range(len(data) // rowlength): rowdata = [ ord_(x) for x in data[(row * rowlength) : ((row + 1) * rowlength)] ] filter_byte = rowdata[0] if filter_byte == 0: pass elif filter_byte == 1: for i in range(2, rowlength): rowdata[i] = (rowdata[i] + rowdata[i - 1]) % 256 elif filter_byte == 2: for i in range(1, rowlength): rowdata[i] = (rowdata[i] + prev_rowdata[i]) % 256 elif filter_byte == 3: for i in range(1, rowlength): left = rowdata[i - 1] if i > 1 else 0 floor = math.floor(left + prev_rowdata[i]) / 2 rowdata[i] = (rowdata[i] + int(floor)) % 256 elif filter_byte == 4: for i in range(1, rowlength): left = rowdata[i - 1] if i > 1 else 0 up = prev_rowdata[i] up_left = prev_rowdata[i - 1] if i > 1 else 0 paeth = paeth_predictor(left, up, up_left) rowdata[i] = (rowdata[i] + paeth) % 256 else: # unsupported PNG filter raise PdfReadError(f"Unsupported PNG filter {filter_byte!r}") prev_rowdata = tuple(rowdata) output.write(bytearray(rowdata[1:])) return output.getvalue() @staticmethod def encode(data: bytes) -> bytes: return zlib.compress(data) class ASCIIHexDecode: """ The ASCIIHexDecode filter decodes data that has been encoded in ASCII hexadecimal form into a base-7 ASCII format. """ @staticmethod def decode( data: str, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, # noqa: F841 **kwargs: Any, ) -> str: """ :param data: a str sequence of hexadecimal-encoded values to be converted into a base-7 ASCII string :param decode_parms: :return: a string conversion in base-7 ASCII, where each of its values v is such that 0 <= ord(v) <= 127. :raises PdfStreamError: """ if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] # noqa: F841 retval = "" hex_pair = "" index = 0 while True: if index >= len(data): raise PdfStreamError("Unexpected EOD in ASCIIHexDecode") char = data[index] if char == ">": break elif char.isspace(): index += 1 continue hex_pair += char if len(hex_pair) == 2: retval += chr(int(hex_pair, base=16)) hex_pair = "" index += 1 assert hex_pair == "" return retval class LZWDecode: """Taken from: http://www.java2s.com/Open-Source/Java-Document/PDF/PDF-Renderer/com/sun/pdfview/decode/LZWDecode.java.htm """ class Decoder: def __init__(self, data: bytes) -> None: self.STOP = 257 self.CLEARDICT = 256 self.data = data self.bytepos = 0 self.bitpos = 0 self.dict = [""] * 4096 for i in range(256): self.dict[i] = chr(i) self.reset_dict() def reset_dict(self) -> None: self.dictlen = 258 self.bitspercode = 9 def next_code(self) -> int: fillbits = self.bitspercode value = 0 while fillbits > 0: if self.bytepos >= len(self.data): return -1 nextbits = ord_(self.data[self.bytepos]) bitsfromhere = 8 - self.bitpos bitsfromhere = min(bitsfromhere, fillbits) value |= ( (nextbits >> (8 - self.bitpos - bitsfromhere)) & (0xFF >> (8 - bitsfromhere)) ) << (fillbits - bitsfromhere) fillbits -= bitsfromhere self.bitpos += bitsfromhere if self.bitpos >= 8: self.bitpos = 0 self.bytepos = self.bytepos + 1 return value def decode(self) -> str: """ TIFF 6.0 specification explains in sufficient details the steps to implement the LZW encode() and decode() algorithms. algorithm derived from: http://www.rasip.fer.hr/research/compress/algorithms/fund/lz/lzw.html and the PDFReference :raises PdfReadError: If the stop code is missing """ cW = self.CLEARDICT baos = "" while True: pW = cW cW = self.next_code() if cW == -1: raise PdfReadError("Missed the stop code in LZWDecode!") if cW == self.STOP: break elif cW == self.CLEARDICT: self.reset_dict() elif pW == self.CLEARDICT: baos += self.dict[cW] else: if cW < self.dictlen: baos += self.dict[cW] p = self.dict[pW] + self.dict[cW][0] self.dict[self.dictlen] = p self.dictlen += 1 else: p = self.dict[pW] + self.dict[pW][0] baos += p self.dict[self.dictlen] = p self.dictlen += 1 if ( self.dictlen >= (1 << self.bitspercode) - 1 and self.bitspercode < 12 ): self.bitspercode += 1 return baos @staticmethod def decode( data: bytes, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, **kwargs: Any, ) -> str: """ :param data: ``bytes`` or ``str`` text to decode. :param decode_parms: a dictionary of parameter values. :return: decoded data. """ if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] # noqa: F841 return LZWDecode.Decoder(data).decode() class ASCII85Decode: """Decodes string ASCII85-encoded data into a byte format.""" @staticmethod def decode( data: Union[str, bytes], decode_parms: Union[None, ArrayObject, DictionaryObject] = None, **kwargs: Any, ) -> bytes: if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] # noqa: F841 if isinstance(data, str): data = data.encode("ascii") group_index = b = 0 out = bytearray() for char in data: if ord("!") <= char and char <= ord("u"): group_index += 1 b = b * 85 + (char - 33) if group_index == 5: out += struct.pack(b">L", b) group_index = b = 0 elif char == ord("z"): assert group_index == 0 out += b"\0\0\0\0" elif char == ord("~"): if group_index: for _ in range(5 - group_index): b = b * 85 + 84 out += struct.pack(b">L", b)[: group_index - 1] break return bytes(out) class DCTDecode: @staticmethod def decode( data: bytes, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, **kwargs: Any, ) -> bytes: if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] # noqa: F841 return data class JPXDecode: @staticmethod def decode( data: bytes, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, **kwargs: Any, ) -> bytes: if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] # noqa: F841 return data class CCITParameters: """TABLE 3.9 Optional parameters for the CCITTFaxDecode filter.""" def __init__(self, K: int = 0, columns: int = 0, rows: int = 0) -> None: self.K = K self.EndOfBlock = None self.EndOfLine = None self.EncodedByteAlign = None self.columns = columns # width self.rows = rows # height self.DamagedRowsBeforeError = None @property def group(self) -> int: if self.K < 0: CCITTgroup = 4 else: # k == 0: Pure one-dimensional encoding (Group 3, 1-D) # k > 0: Mixed one- and two-dimensional encoding (Group 3, 2-D) CCITTgroup = 3 return CCITTgroup class CCITTFaxDecode: """ See 3.3.5 CCITTFaxDecode Filter (PDF 1.7 Standard). Either Group 3 or Group 4 CCITT facsimile (fax) encoding. CCITT encoding is bit-oriented, not byte-oriented. See: TABLE 3.9 Optional parameters for the CCITTFaxDecode filter """ @staticmethod def _get_parameters( parameters: Union[None, ArrayObject, DictionaryObject], rows: int ) -> CCITParameters: # TABLE 3.9 Optional parameters for the CCITTFaxDecode filter k = 0 columns = 1728 if parameters: if isinstance(parameters, ArrayObject): for decode_parm in parameters: if CCITT.COLUMNS in decode_parm: columns = decode_parm[CCITT.COLUMNS] if CCITT.K in decode_parm: k = decode_parm[CCITT.K] else: if CCITT.COLUMNS in parameters: columns = parameters[CCITT.COLUMNS] # type: ignore if CCITT.K in parameters: k = parameters[CCITT.K] # type: ignore return CCITParameters(k, columns, rows) @staticmethod def decode( data: bytes, decode_parms: Union[None, ArrayObject, DictionaryObject] = None, height: int = 0, **kwargs: Any, ) -> bytes: if "decodeParms" in kwargs: # pragma: no cover deprecate_with_replacement("decodeParms", "parameters", "4.0.0") decode_parms = kwargs["decodeParms"] parms = CCITTFaxDecode._get_parameters(decode_parms, height) img_size = len(data) tiff_header_struct = "<2shlh" + "hhll" * 8 + "h" tiff_header = struct.pack( tiff_header_struct, b"II", # Byte order indication: Little endian 42, # Version number (always 42) 8, # Offset to first IFD 8, # Number of tags in IFD 256, 4, 1, parms.columns, # ImageWidth, LONG, 1, width 257, 4, 1, parms.rows, # ImageLength, LONG, 1, length 258, 3, 1, 1, # BitsPerSample, SHORT, 1, 1 259, 3, 1, parms.group, # Compression, SHORT, 1, 4 = CCITT Group 4 fax encoding 262, 3, 1, 0, # Thresholding, SHORT, 1, 0 = WhiteIsZero 273, 4, 1, struct.calcsize( tiff_header_struct ), # StripOffsets, LONG, 1, length of header 278, 4, 1, parms.rows, # RowsPerStrip, LONG, 1, length 279, 4, 1, img_size, # StripByteCounts, LONG, 1, size of image 0, # last IFD ) return tiff_header + data def decode_stream_data(stream: Any) -> Union[str, bytes]: # utils.StreamObject filters = stream.get(SA.FILTER, ()) if isinstance(filters, IndirectObject): filters = cast(ArrayObject, filters.get_object()) if len(filters) and not isinstance(filters[0], NameObject): # we have a single filter instance filters = (filters,) data: bytes = stream._data # If there is not data to decode we should not try to decode the data. if data: for filter_type in filters: if filter_type in (FT.FLATE_DECODE, FTA.FL): data = FlateDecode.decode(data, stream.get(SA.DECODE_PARMS)) elif filter_type in (FT.ASCII_HEX_DECODE, FTA.AHx): data = ASCIIHexDecode.decode(data) # type: ignore elif filter_type in (FT.LZW_DECODE, FTA.LZW): data = LZWDecode.decode(data, stream.get(SA.DECODE_PARMS)) # type: ignore elif filter_type in (FT.ASCII_85_DECODE, FTA.A85): data = ASCII85Decode.decode(data) elif filter_type == FT.DCT_DECODE: data = DCTDecode.decode(data) elif filter_type == "/JPXDecode": data = JPXDecode.decode(data) elif filter_type == FT.CCITT_FAX_DECODE: height = stream.get(IA.HEIGHT, ()) data = CCITTFaxDecode.decode(data, stream.get(SA.DECODE_PARMS), height) elif filter_type == "/Crypt": decode_parms = stream.get(SA.DECODE_PARMS, {}) if "/Name" not in decode_parms and "/Type" not in decode_parms: pass else: raise NotImplementedError( "/Crypt filter with /Name or /Type not supported yet" ) else: # Unsupported filter raise NotImplementedError(f"unsupported filter {filter_type}") return data def decodeStreamData(stream: Any) -> Union[str, bytes]: # pragma: no cover deprecate_with_replacement("decodeStreamData", "decode_stream_data", "4.0.0") return decode_stream_data(stream) def _xobj_to_image(x_object_obj: Dict[str, Any]) -> Tuple[Optional[str], bytes]: """ Users need to have the pillow package installed. It's unclear if PyPDF2 will keep this function here, hence it's private. It might get removed at any point. :return: Tuple[file extension, bytes] """ try: from PIL import Image except ImportError: raise ImportError( "pillow is required to do image extraction. " "It can be installed via 'pip install PyPDF2[image]'" ) size = (x_object_obj[IA.WIDTH], x_object_obj[IA.HEIGHT]) data = x_object_obj.get_data() # type: ignore if ( IA.COLOR_SPACE in x_object_obj and x_object_obj[IA.COLOR_SPACE] == ColorSpaces.DEVICE_RGB ): # https://pillow.readthedocs.io/en/stable/handbook/concepts.html#modes mode: Literal["RGB", "P"] = "RGB" else: mode = "P" extension = None if SA.FILTER in x_object_obj: if x_object_obj[SA.FILTER] == FT.FLATE_DECODE: extension = ".png" # mime_type = "image/png" color_space = None if "/ColorSpace" in x_object_obj: color_space = x_object_obj["/ColorSpace"].get_object() if ( isinstance(color_space, ArrayObject) and color_space[0] == "/Indexed" ): color_space, base, hival, lookup = ( value.get_object() for value in color_space ) img = Image.frombytes(mode, size, data) if color_space == "/Indexed": from .generic import ByteStringObject if isinstance(lookup, ByteStringObject): if base == ColorSpaces.DEVICE_GRAY and len(lookup) == hival + 1: lookup = b"".join( [lookup[i : i + 1] * 3 for i in range(len(lookup))] ) img.putpalette(lookup) else: img.putpalette(lookup.get_data()) img = img.convert("L" if base == ColorSpaces.DEVICE_GRAY else "RGB") if G.S_MASK in x_object_obj: # add alpha channel alpha = Image.frombytes("L", size, x_object_obj[G.S_MASK].get_data()) img.putalpha(alpha) img_byte_arr = BytesIO() img.save(img_byte_arr, format="PNG") data = img_byte_arr.getvalue() elif x_object_obj[SA.FILTER] in ( [FT.LZW_DECODE], [FT.ASCII_85_DECODE], [FT.CCITT_FAX_DECODE], ): # I'm not sure if the following logic is correct. # There might not be any relationship between the filters and the # extension if x_object_obj[SA.FILTER] in [[FT.LZW_DECODE], [FT.CCITT_FAX_DECODE]]: extension = ".tiff" # mime_type = "image/tiff" else: extension = ".png" # mime_type = "image/png" data = b_(data) elif x_object_obj[SA.FILTER] == FT.DCT_DECODE: extension = ".jpg" # mime_type = "image/jpeg" elif x_object_obj[SA.FILTER] == "/JPXDecode": extension = ".jp2" # mime_type = "image/x-jp2" elif x_object_obj[SA.FILTER] == FT.CCITT_FAX_DECODE: extension = ".tiff" # mime_type = "image/tiff" else: extension = ".png" # mime_type = "image/png" img = Image.frombytes(mode, size, data) img_byte_arr = BytesIO() img.save(img_byte_arr, format="PNG") data = img_byte_arr.getvalue() return extension, data