1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
|
from __future__ import annotations
import logging
from typing import Union
from requests_toolbelt.multipart.decoder import MultipartDecoder
from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME
from unstructured_client.models import shared
# Module-level logger, obtained under the client's shared logger name so that
# every helper below reports through the same logger.
logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)

# Shape of a parsed multipart form: field name -> text value, uploaded file,
# or a list of text values (built when the same field name occurs repeatedly).
FormData = dict[str, Union[str, shared.Files, list[str]]]

# Form field name under which the uploaded document is stored.
PARTITION_FORM_FILES_KEY = "files"
# Remaining form field names; presumably passed as the `key` argument of the
# helpers in this module by their callers (not visible here).
PARTITION_FORM_SPLIT_PDF_PAGE_KEY = "split_pdf_page"
PARTITION_FORM_PAGE_RANGE_KEY = "split_pdf_page_range[]"
PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY = "split_pdf_allow_failed"
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY = "starting_page_number"
PARTITION_FORM_CONCURRENCY_LEVEL_KEY = "split_pdf_concurrency_level"
def get_page_range(form_data: FormData, key: str, max_pages: int) -> tuple[int, int]:
    """Retrieves the split page range from the given form data.

    If the key is absent, the full range (1, max_pages) is used. A range that
    is present but malformed or outside the bounds of the page count is
    rejected with a ValueError rather than silently replaced.

    Args:
        form_data: The form data containing the page range
        key: The key to look for in the form data.
        max_pages: The number of pages in the document; the upper bound for
            both ends of the range.

    Returns:
        The range of pages to send in the request in the form (start, end)

    Raises:
        ValueError: If the value is not a sequence of two integers, or if the
            range does not satisfy 1 <= start <= end <= max_pages.
    """
    _page_range = form_data.get(key)
    if _page_range is None:
        page_range = (1, max_pages)
    else:
        try:
            # A bare string would be indexed character by character
            # ("12" -> (1, 2)), so reject it instead of mis-parsing it.
            if isinstance(_page_range, (str, bytes)):
                raise TypeError("page range must be a sequence of two values")
            page_range = (int(_page_range[0]), int(_page_range[1]))
        except (ValueError, IndexError, TypeError) as exc:
            # TypeError covers non-subscriptable values and None elements.
            msg = f"{_page_range} is not a valid page range."
            logger.error(msg)
            raise ValueError(msg) from exc

    start, end = page_range
    if not 0 < start <= max_pages or not 0 < end <= max_pages or not start <= end:
        msg = f"Page range {page_range} is out of bounds. Start and end values should be between 1 and {max_pages}."
        logger.error(msg)
        raise ValueError(msg)
    return page_range
def get_starting_page_number(form_data: FormData, key: str, fallback_value: int) -> int:
    """Retrieves the starting page number from the given form data.

    In case given starting page number is missing, empty, not convertible to
    an integer, or less than 1, it will use the default value.

    Args:
        form_data: The form data containing the starting page number.
        key: The key to look for in the form data.
        fallback_value: The default value to use in case of an error.

    Returns:
        The starting page number.
    """
    starting_page_number = fallback_value
    try:
        # `or` maps a missing or empty value to the fallback before conversion.
        _starting_page_number = form_data.get(key) or fallback_value
        starting_page_number = int(_starting_page_number)  # type: ignore
    except (ValueError, TypeError):
        # TypeError covers non-scalar values, e.g. the list produced when the
        # same form field is submitted more than once.
        logger.warning(
            "'%s' is not a valid integer. Using default value '%d'.",
            key,
            fallback_value,
        )

    if starting_page_number < 1:
        logger.warning(
            "'%s' is less than 1. Using default value '%d'.",
            key,
            fallback_value,
        )
        starting_page_number = fallback_value

    return starting_page_number
def get_split_pdf_allow_failed_param(
    form_data: FormData, key: str, fallback_value: bool,
) -> bool:
    """Retrieves the value for allow failed that should be used for splitting pdf.

    In case the given value is not a "false" or "true" literal (compared
    case-insensitively), it will use the default value. A value that is
    already a bool is returned unchanged.

    Args:
        form_data: The form data containing the allow-failed flag.
        key: The key to look for in the form data.
        fallback_value: The default value to use when the flag is missing or invalid.

    Returns:
        The allow-failed flag after validation.
    """
    allow_failed = form_data.get(key)

    if allow_failed is None:
        return fallback_value

    if isinstance(allow_failed, bool):
        return allow_failed

    # Non-string values (e.g. a list from a repeated form field) have no
    # `.lower()`; treat them like any other invalid literal.
    normalized = allow_failed.lower() if isinstance(allow_failed, str) else None
    if normalized not in ("true", "false"):
        logger.warning(
            "'%s' is not a valid boolean. Using default value '%s'.",
            key,
            fallback_value,
        )
        return fallback_value

    return normalized == "true"
def get_split_pdf_concurrency_level_param(
    form_data: FormData, key: str, fallback_value: int, max_allowed: int
) -> int:
    """Retrieves the value for concurrency level that should be used for splitting pdf.

    In case the given value is missing, not a valid integer or less than 1, it
    will use the default value. A value above `max_allowed` is clamped to
    `max_allowed`.

    Args:
        form_data: The form data containing the desired concurrency level.
        key: The key to look for in the form data.
        fallback_value: The default value to use in case of an error.
        max_allowed: The maximum allowed value for the concurrency level.

    Returns:
        The concurrency level after validation.
    """
    concurrency_level_str = form_data.get(key)

    if concurrency_level_str is None:
        return fallback_value

    try:
        concurrency_level = int(concurrency_level_str)  # type: ignore
    except (ValueError, TypeError):
        # TypeError covers non-scalar values, e.g. the list produced when the
        # same form field is submitted more than once.
        logger.warning(
            "'%s' is not a valid integer. Using default value '%s'.",
            key,
            fallback_value,
        )
        return fallback_value

    if concurrency_level < 1:
        logger.warning(
            "'%s' is less than 1. Using the default value = %s.",
            key,
            fallback_value,
        )
        return fallback_value

    if concurrency_level > max_allowed:
        logger.warning(
            "'%s' is greater than %s. Using the maximum allowed value = %s.",
            key,
            max_allowed,
            max_allowed,
        )
        return max_allowed

    return concurrency_level
def decode_content_disposition(content_disposition: bytes) -> dict[str, str]:
    """Decode the `Content-Disposition` header and return the parameters as a dictionary.

    The header is split on "; "; the first token (the disposition type, e.g.
    `form-data`) is dropped and every following `key=value` token becomes one
    dictionary entry with surrounding double quotes stripped from the value.
    Tokens without an `=` are ignored.

    Args:
        content_disposition: The `Content-Disposition` header as bytes.

    Returns:
        A dictionary containing the parameters extracted from the
        `Content-Disposition` header.
    """
    parameters_dict: dict[str, str] = {}
    for token in content_disposition.decode().split("; ")[1:]:
        # Split on the first "=" only, so values that themselves contain "="
        # (e.g. filename="a=b.pdf") are kept intact.
        name, separator, value = token.partition("=")
        if not separator:
            # Valueless token; there is nothing to record for it.
            continue
        parameters_dict[name] = value.strip('"')
    return parameters_dict
def parse_form_data(decoded_data: MultipartDecoder) -> FormData:
    """Builds the form data dictionary from the decoded multipart data.

    Every part is identified through its `Content-Disposition` header. The
    part named by `PARTITION_FORM_FILES_KEY` is stored as a `shared.Files`
    object; any other part is stored as decoded text, and repeated field names
    are collected into a list in order of appearance. Parts without a `name`
    parameter are skipped.

    Args:
        decoded_data: The decoded multipart data.

    Returns:
        The parsed form data.

    Raises:
        RuntimeError: If a part has no `Content-Disposition` header.
        ValueError: If the files part has a missing or blank filename.
    """
    parsed: FormData = {}

    for part in decoded_data.parts:
        disposition = part.headers.get(b"Content-Disposition")
        if disposition is None:
            raise RuntimeError("Content-Disposition header not found. Can't split pdf file.")

        params = decode_content_disposition(disposition)
        field_name = params.get("name")
        if field_name is None:
            continue

        # The uploaded document: keep the raw bytes together with its filename.
        if field_name == PARTITION_FORM_FILES_KEY:
            filename = params.get("filename")
            if filename is None or not filename.strip():
                raise ValueError("Filename can't be an empty string.")
            parsed[PARTITION_FORM_FILES_KEY] = shared.Files(part.content, filename)
            continue

        text = part.content.decode()

        # First occurrence of this field: store the plain text value.
        if field_name not in parsed:
            parsed[field_name] = text
            continue

        # Repeated field: grow the existing list, or promote the single value
        # seen so far to a two-element list.
        existing = parsed[field_name]
        if isinstance(existing, list):
            existing.append(text)
        else:
            parsed[field_name] = [existing, text]

    return parsed
|