# Copyright (c) 2024, pypdf contributors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
from io import BytesIO
from .._utils import (
WHITESPACES,
StreamType,
read_non_whitespace,
)
from ..errors import PdfReadError
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Number of bytes requested per ``stream.read`` call by the buffered
# extraction loops in this module.
BUFFER_SIZE = 8192
def extract_inline_AHx(stream: StreamType) -> bytes:
    """
    Extract HexEncoded Stream from Inline Image.
    the stream will be moved onto the EI

    The data is read in chunks of ``BUFFER_SIZE`` bytes until the ``>``
    delimiter is found. If an ``EI`` token shows up before any ``>``, it is
    used as a backup terminator.

    Args:
        stream: Stream positioned at the start of the inline image data.

    Returns:
        The data up to and including ``>`` when that delimiter was found;
        otherwise the data in front of ``EI`` without the whitespace that
        directly precedes it.

    Raises:
        PdfReadError: If the stream ends before a terminator is found, or
            if the data is not followed by an ``EI`` token.
    """
    data_out: bytes = b""
    # Read data until delimiter > and EI as backup
    while True:
        data_buffered = read_non_whitespace(stream) + stream.read(BUFFER_SIZE)
        if not data_buffered:
            raise PdfReadError("Unexpected end of stream")
        pos_tok = data_buffered.find(b">")
        if pos_tok >= 0:  # found >
            data_out += data_buffered[: pos_tok + 1]
            # Leave the stream right behind the delimiter.
            stream.seek(-len(data_buffered) + pos_tok + 1, 1)
            break
        pos_ei = data_buffered.find(b"EI")
        if pos_ei >= 0:  # found EI
            # Walk backwards from the byte in front of "EI" over any
            # whitespace, so that the stream ends up right behind the last
            # data byte.
            stream.seek(-len(data_buffered) + pos_ei - 1, 1)
            c = stream.read(1)
            while c in WHITESPACES:
                stream.seek(-2, 1)
                c = stream.read(1)
                pos_ei -= 1
            # When "EI" is the very first buffered byte, the whitespace walked
            # over lies in front of the buffer and pos_ei turns negative.
            # Nothing from this buffer belongs to the data in that case, so
            # clamp instead of slicing from the end of the buffer.
            data_out += data_buffered[: max(pos_ei, 0)]
            break
        if len(data_buffered) <= 2:
            # No terminator and at most the two re-read bytes left: the
            # stream is exhausted. A single byte is possible because a
            # leading whitespace byte is skipped on re-read; rewinding again
            # would fetch the same bytes forever.
            raise PdfReadError("Unexpected end of stream")
        # > nor EI found: keep the last two bytes for the next round in case
        # "EI" is split across two reads.
        data_out += data_buffered[:-2]
        stream.seek(-2, 1)

    # The next token has to be "EI", followed by whitespace or end of stream.
    ei_tok = read_non_whitespace(stream)
    ei_tok += stream.read(2)
    stream.seek(-3, 1)
    if ei_tok[0:2] != b"EI" or not (ei_tok[2:3] == b"" or ei_tok[2:3] in WHITESPACES):
        raise PdfReadError("EI stream not found")
    return data_out
def extract_inline_A85(stream: StreamType) -> bytes:
    """
    Extract A85 Stream from Inline Image.
    the stream will be moved onto the EI

    The data is read in chunks of ``BUFFER_SIZE`` bytes until the ``~>``
    delimiter is found.

    Args:
        stream: Stream positioned at the start of the inline image data.

    Returns:
        The data up to and including the ``~>`` delimiter.

    Raises:
        PdfReadError: If the stream ends before ``~>`` is found, or if the
            data is not followed by an ``EI`` token.
    """
    data_out: bytes = b""
    # Read data up to delimiter ~>
    # see §3.3.2 from PDF ref 1.7
    while True:
        data_buffered = read_non_whitespace(stream) + stream.read(BUFFER_SIZE)
        if not data_buffered:
            raise PdfReadError("Unexpected end of stream")
        pos_tok = data_buffered.find(b"~>")
        if pos_tok >= 0:  # found!
            data_out += data_buffered[: pos_tok + 2]
            # Leave the stream right behind the delimiter.
            stream.seek(-len(data_buffered) + pos_tok + 2, 1)
            break
        if len(data_buffered) <= 2:  # end of buffer
            # No delimiter and at most the two re-read bytes left: the stream
            # is exhausted. A single byte is possible because a leading
            # whitespace byte is skipped on re-read; rewinding again would
            # fetch the same bytes forever.
            raise PdfReadError("Unexpected end of stream")
        # Keep the last two bytes for the next round in case the buffer ends
        # in the middle of ~>
        data_out += data_buffered[:-2]
        stream.seek(-2, 1)

    # The next token has to be "EI", followed by whitespace or end of stream.
    ei_tok = read_non_whitespace(stream)
    ei_tok += stream.read(2)
    stream.seek(-3, 1)
    if ei_tok[0:2] != b"EI" or not (ei_tok[2:3] == b"" or ei_tok[2:3] in WHITESPACES):
        raise PdfReadError("EI stream not found")
    return data_out
def extract_inline_RL(stream: StreamType) -> bytes:
    """
    Extract RL Stream from Inline Image.
    the stream will be moved onto the EI

    Args:
        stream: Stream positioned at the start of the inline image data.

    Returns:
        Everything up to and including the first ``0x80`` byte.

    Raises:
        PdfReadError: If the stream ends before a ``0x80`` byte is found, or
            if the data is not followed by an ``EI`` token.
    """
    pieces = []
    # Collect chunks up to the first 0x80 byte (the RunLengthDecode
    # end-of-data marker, see §3.3.4 from PDF ref 1.7).
    while True:
        chunk = stream.read(BUFFER_SIZE)
        if not chunk:
            raise PdfReadError("Unexpected end of stream")
        eod_index = chunk.find(b"\x80")
        if eod_index < 0:
            pieces.append(chunk)
            continue
        consumed = eod_index + 1
        pieces.append(chunk[:consumed])
        # Hand back the bytes that were read beyond the marker.
        stream.seek(consumed - len(chunk), 1)
        break
    data_out = b"".join(pieces)

    # The next token has to be "EI", followed by whitespace or end of stream.
    ei_tok = read_non_whitespace(stream) + stream.read(2)
    stream.seek(-3, 1)
    after_ei = ei_tok[2:3]
    if ei_tok[0:2] != b"EI" or (after_ei != b"" and after_ei not in WHITESPACES):
        raise PdfReadError("EI stream not found")
    return data_out
def extract_inline_DCT(stream: StreamType) -> bytes:
    """
    Extract DCT (JPEG) Stream from Inline Image.
    the stream will be moved onto the EI

    Bytes in front of the first ``0xFF`` are skipped. From there on the
    bytes are copied while following the marker segments until the
    ``FF D9`` end marker has been copied.

    Args:
        stream: Stream positioned at the start of the inline image data.

    Returns:
        The bytes from the first ``0xFF`` up to and including ``FF D9``.

    Raises:
        PdfReadError: If the stream ends before ``FF D9`` is reached, if a
            segment declares a size below 2, or if the data is not followed
            by an ``EI`` token.
    """

    def read_exact(length: int) -> bytes:
        # A short (or missing) read means the stream is exhausted. Failing
        # here keeps the marker loop below from spinning forever, or from
        # indexing into an empty result, on truncated data.
        chunk = stream.read(length)
        if chunk is None or len(chunk) != length:
            raise PdfReadError("Unexpected end of stream")
        return chunk

    data_out: bytes = b""
    # Read Blocks of data (ID/Size/data) up to ID=FF/D9
    # see https://www.digicamsoft.com/itu/itu-t81-36.html
    notfirst = False
    while True:
        c = read_exact(1)
        if notfirst or (c == b"\xff"):
            data_out += c
        if c != b"\xff":
            continue
        notfirst = True
        c = read_exact(1)  # marker code following the 0xFF
        data_out += c
        if c == b"\xff":
            # Step back so that this 0xFF is examined as the start of the
            # next marker.
            # NOTE(review): the byte was appended above and is appended
            # again when it is re-read.
            stream.seek(-1, 1)  # pragma: no cover
        elif c == b"\x00":  # stuffing
            pass
        elif c == b"\xd9":  # end
            break
        elif c in (
            b"\xc0\xc1\xc2\xc3\xc4\xc5\xc6\xc7\xc9\xca\xcb\xcc\xcd\xce\xcf"
            b"\xda\xdb\xdc\xdd\xde\xdf"
            b"\xe0\xe1\xe2\xe3\xe4\xe5\xe6\xe7\xe8\xe9\xea\xeb\xec\xed\xee\xef\xfe"
        ):
            # Markers followed by a two-byte big-endian size; the size
            # counts its own two bytes.
            c = read_exact(2)
            data_out += c
            sz = c[0] * 256 + c[1]
            if sz < 2:
                # A negative read length would swallow the rest of the
                # stream instead of the segment payload.
                raise PdfReadError("Invalid segment size in DCT stream")
            data_out += read_exact(sz - 2)
        # else: pass

    # The next token has to be "EI", followed by whitespace or end of stream.
    ei_tok = read_non_whitespace(stream)
    ei_tok += stream.read(2)
    stream.seek(-3, 1)
    if ei_tok[0:2] != b"EI" or not (ei_tok[2:3] == b"" or ei_tok[2:3] in WHITESPACES):
        raise PdfReadError("EI stream not found")
    return data_out
def extract_inline_default(stream: StreamType) -> bytes:
    """
    Legacy method
    used by default

    Copies bytes from ``stream`` until an ``EI`` (End Image) operator is
    recognised. An ``E`` counts as the start of that operator when it is
    followed by ``I`` and at least one whitespace byte, and either the byte
    in front of the ``E`` is whitespace or the first non-whitespace byte
    after ``EI`` is ``Q`` or ``E`` (for Q or EMC).

    Args:
        stream: Stream positioned at the start of the inline image data.

    Returns:
        The bytes in front of the recognised ``EI``.

    Raises:
        PdfReadError: If the stream ends before an ``EI`` is recognised.
    """
    collected = BytesIO()
    while True:
        chunk = stream.read(BUFFER_SIZE)
        if not chunk:
            raise PdfReadError("Unexpected end of stream")
        # Only "E" is searched for: the "I" of "EI" may not have been loaded
        # into this chunk.
        e_index = chunk.find(b"E")
        if e_index == -1:
            collected.write(chunk)
            continue

        # Copy everything up to and including the E; it is cut off again
        # below if it turns out to start the EI operator.
        collected.write(chunk[: e_index + 1])
        e_offset_out = collected.tell() - 1
        # Rewind the input to the byte right behind the E.
        stream.seek(e_index + 1 - len(chunk), 1)
        resume_pos = stream.tell()

        is_end_image = False
        if stream.read(1) == b"I":  # I of "EI"
            following = stream.read(1)  # possible space after "EI"
            if following in WHITESPACES:
                while following in WHITESPACES:
                    following = stream.read(1)
                # NOTE(review): when the E is the first byte of the chunk,
                # this slice is empty, so the byte in front of it is not
                # inspected.
                is_end_image = (
                    chunk[e_index - 1 : e_index] in WHITESPACES
                    or following in {b"Q", b"E"}
                )
        if not is_end_image:
            # Not the end of the image: carry on right behind the E.
            stream.seek(resume_pos, 0)
            continue

        # Data contains [\s]EI[\s](Q|EMC): 4 chars are sufficient
        # remove the E wrongly inserted earlier
        collected.truncate(e_offset_out)
        return collected.getvalue()