aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py228
1 files changed, 228 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py
new file mode 100644
index 00000000..bf97faa3
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py
@@ -0,0 +1,228 @@
+from __future__ import annotations
+
+import logging
+from typing import Union
+
+from requests_toolbelt.multipart.decoder import MultipartDecoder
+
+from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME
+from unstructured_client.models import shared
+
+logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
+FormData = dict[str, Union[str, shared.Files, list[str]]]
+
+PARTITION_FORM_FILES_KEY = "files"
+PARTITION_FORM_SPLIT_PDF_PAGE_KEY = "split_pdf_page"
+PARTITION_FORM_PAGE_RANGE_KEY = "split_pdf_page_range[]"
+PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY = "split_pdf_allow_failed"
+PARTITION_FORM_STARTING_PAGE_NUMBER_KEY = "starting_page_number"
+PARTITION_FORM_CONCURRENCY_LEVEL_KEY = "split_pdf_concurrency_level"
+
+
+def get_page_range(form_data: FormData, key: str, max_pages: int) -> tuple[int, int]:
+ """Retrieves the split page range from the given form data.
+
+ If the range is invalid or outside the bounds of the page count,
+ returns (1, num_pages), i.e. the full range.
+
+ Args:
+ form_data: The form data containing the page range
+ key: The key to look for in the form data.
+
+ Returns:
+ The range of pages to send in the request in the form (start, end)
+ """
+ try:
+ _page_range = form_data.get(key)
+
+ if _page_range is not None:
+ page_range = (int(_page_range[0]), int(_page_range[1]))
+ else:
+ page_range = (1, max_pages)
+
+ except (ValueError, IndexError) as exc:
+ msg = f"{_page_range} is not a valid page range."
+ logger.error(msg)
+ raise ValueError(msg) from exc
+
+ start, end = page_range
+
+ if not 0 < start <= max_pages or not 0 < end <= max_pages or not start <= end:
+ msg = f"Page range {page_range} is out of bounds. Start and end values should be between 1 and {max_pages}."
+ logger.error(msg)
+ raise ValueError(msg)
+
+ return page_range
+
+
+def get_starting_page_number(form_data: FormData, key: str, fallback_value: int) -> int:
+ """Retrieves the starting page number from the given form data.
+
+ In case given starting page number is not a valid integer or less than 1, it will
+ use the default value.
+
+ Args:
+ form_data: The form data containing the starting page number.
+ key: The key to look for in the form data.
+ fallback_value: The default value to use in case of an error.
+
+ Returns:
+ The starting page number.
+ """
+ starting_page_number = fallback_value
+ try:
+ _starting_page_number = form_data.get(key) or fallback_value
+ starting_page_number = int(_starting_page_number) # type: ignore
+ except ValueError:
+ logger.warning(
+ "'%s' is not a valid integer. Using default value '%d'.",
+ key,
+ fallback_value,
+ )
+
+ if starting_page_number < 1:
+ logger.warning(
+ "'%s' is less than 1. Using default value '%d'.",
+ key,
+ fallback_value,
+ )
+ starting_page_number = fallback_value
+
+ return starting_page_number
+
+def get_split_pdf_allow_failed_param(
+ form_data: FormData, key: str, fallback_value: bool,
+) -> bool:
+ """Retrieves the value for allow failed that should be used for splitting pdf.
+
+ In case given the number is not a "false" or "true" literal, it will use the
+ default value.
+
+ Args:
+ form_data: The form data containing the desired concurrency level.
+ key: The key to look for in the form data.
+ fallback_value: The default value to use in case of an error.
+
+ Returns:
+ The concurrency level after validation.
+ """
+ allow_failed = form_data.get(key)
+
+ if allow_failed is None:
+ return fallback_value
+
+ if allow_failed.lower() not in ["true", "false"]:
+ logger.warning(
+ "'%s' is not a valid boolean. Using default value '%s'.",
+ key,
+ fallback_value,
+ )
+ return fallback_value
+
+ return allow_failed.lower() == "true"
+
+def get_split_pdf_concurrency_level_param(
+ form_data: FormData, key: str, fallback_value: int, max_allowed: int
+) -> int:
+ """Retrieves the value for concurreny level that should be used for splitting pdf.
+
+ In case given the number is not a valid integer or less than 1, it will use the
+ default value.
+
+ Args:
+ form_data: The form data containing the desired concurrency level.
+ key: The key to look for in the form data.
+ fallback_value: The default value to use in case of an error.
+ max_allowed: The maximum allowed value for the concurrency level.
+
+ Returns:
+ The concurrency level after validation.
+ """
+ concurrency_level_str = form_data.get(key)
+
+ if concurrency_level_str is None:
+ return fallback_value
+
+ try:
+ concurrency_level = int(concurrency_level_str)
+ except ValueError:
+ logger.warning(
+ "'%s' is not a valid integer. Using default value '%s'.",
+ key,
+ fallback_value,
+ )
+ return fallback_value
+
+ if concurrency_level < 1:
+ logger.warning(
+ "'%s' is less than 1. Using the default value = %s.",
+ key,
+ fallback_value,
+ )
+ return fallback_value
+
+ if concurrency_level > max_allowed:
+ logger.warning(
+ "'%s' is greater than %s. Using the maximum allowed value = %s.",
+ key,
+ max_allowed,
+ max_allowed,
+ )
+ return max_allowed
+
+ return concurrency_level
+
+
+def decode_content_disposition(content_disposition: bytes) -> dict[str, str]:
+ """Decode the `Content-Disposition` header and return the parameters as a dictionary.
+
+ Args:
+ content_disposition: The `Content-Disposition` header as bytes.
+
+ Returns:
+ A dictionary containing the parameters extracted from the
+ `Content-Disposition` header.
+ """
+ data = content_disposition.decode().split("; ")[1:]
+ parameters = [d.split("=") for d in data]
+ parameters_dict = {p[0]: p[1].strip('"') for p in parameters}
+ return parameters_dict
+
+
+def parse_form_data(decoded_data: MultipartDecoder) -> FormData:
+ """Parses the form data from the decoded multipart data.
+
+ Args:
+ decoded_data: The decoded multipart data.
+
+ Returns:
+ The parsed form data.
+ """
+ form_data: FormData = {}
+
+ for part in decoded_data.parts:
+ content_disposition = part.headers.get(b"Content-Disposition")
+ if content_disposition is None:
+ raise RuntimeError("Content-Disposition header not found. Can't split pdf file.")
+ part_params = decode_content_disposition(content_disposition)
+ name = part_params.get("name")
+
+ if name is None:
+ continue
+
+ if name == PARTITION_FORM_FILES_KEY:
+ filename = part_params.get("filename")
+ if filename is None or not filename.strip():
+ raise ValueError("Filename can't be an empty string.")
+ form_data[PARTITION_FORM_FILES_KEY] = shared.Files(part.content, filename)
+ else:
+ content = part.content.decode()
+ if name in form_data:
+ if isinstance(form_data[name], list):
+ form_data[name].append(content)
+ else:
+ form_data[name] = [form_data[name], content]
+ else:
+ form_data[name] = content
+
+ return form_data