diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py | 228 |
1 files changed, 228 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py new file mode 100644 index 00000000..bf97faa3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import logging +from typing import Union + +from requests_toolbelt.multipart.decoder import MultipartDecoder + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client.models import shared + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +FormData = dict[str, Union[str, shared.Files, list[str]]] + +PARTITION_FORM_FILES_KEY = "files" +PARTITION_FORM_SPLIT_PDF_PAGE_KEY = "split_pdf_page" +PARTITION_FORM_PAGE_RANGE_KEY = "split_pdf_page_range[]" +PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY = "split_pdf_allow_failed" +PARTITION_FORM_STARTING_PAGE_NUMBER_KEY = "starting_page_number" +PARTITION_FORM_CONCURRENCY_LEVEL_KEY = "split_pdf_concurrency_level" + + +def get_page_range(form_data: FormData, key: str, max_pages: int) -> tuple[int, int]: + """Retrieves the split page range from the given form data. + + If the range is invalid or outside the bounds of the page count, + returns (1, num_pages), i.e. the full range. + + Args: + form_data: The form data containing the page range + key: The key to look for in the form data. + + Returns: + The range of pages to send in the request in the form (start, end) + """ + try: + _page_range = form_data.get(key) + + if _page_range is not None: + page_range = (int(_page_range[0]), int(_page_range[1])) + else: + page_range = (1, max_pages) + + except (ValueError, IndexError) as exc: + msg = f"{_page_range} is not a valid page range." + logger.error(msg) + raise ValueError(msg) from exc + + start, end = page_range + + if not 0 < start <= max_pages or not 0 < end <= max_pages or not start <= end: + msg = f"Page range {page_range} is out of bounds. Start and end values should be between 1 and {max_pages}." + logger.error(msg) + raise ValueError(msg) + + return page_range + + +def get_starting_page_number(form_data: FormData, key: str, fallback_value: int) -> int: + """Retrieves the starting page number from the given form data. + + In case given starting page number is not a valid integer or less than 1, it will + use the default value. + + Args: + form_data: The form data containing the starting page number. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + + Returns: + The starting page number. + """ + starting_page_number = fallback_value + try: + _starting_page_number = form_data.get(key) or fallback_value + starting_page_number = int(_starting_page_number) # type: ignore + except ValueError: + logger.warning( + "'%s' is not a valid integer. Using default value '%d'.", + key, + fallback_value, + ) + + if starting_page_number < 1: + logger.warning( + "'%s' is less than 1. Using default value '%d'.", + key, + fallback_value, + ) + starting_page_number = fallback_value + + return starting_page_number + +def get_split_pdf_allow_failed_param( + form_data: FormData, key: str, fallback_value: bool, +) -> bool: + """Retrieves the value for allow failed that should be used for splitting pdf. + + In case given the number is not a "false" or "true" literal, it will use the + default value. + + Args: + form_data: The form data containing the desired concurrency level. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + + Returns: + The concurrency level after validation. + """ + allow_failed = form_data.get(key) + + if allow_failed is None: + return fallback_value + + if allow_failed.lower() not in ["true", "false"]: + logger.warning( + "'%s' is not a valid boolean. Using default value '%s'.", + key, + fallback_value, + ) + return fallback_value + + return allow_failed.lower() == "true" + +def get_split_pdf_concurrency_level_param( + form_data: FormData, key: str, fallback_value: int, max_allowed: int +) -> int: + """Retrieves the value for concurreny level that should be used for splitting pdf. + + In case given the number is not a valid integer or less than 1, it will use the + default value. + + Args: + form_data: The form data containing the desired concurrency level. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + max_allowed: The maximum allowed value for the concurrency level. + + Returns: + The concurrency level after validation. + """ + concurrency_level_str = form_data.get(key) + + if concurrency_level_str is None: + return fallback_value + + try: + concurrency_level = int(concurrency_level_str) + except ValueError: + logger.warning( + "'%s' is not a valid integer. Using default value '%s'.", + key, + fallback_value, + ) + return fallback_value + + if concurrency_level < 1: + logger.warning( + "'%s' is less than 1. Using the default value = %s.", + key, + fallback_value, + ) + return fallback_value + + if concurrency_level > max_allowed: + logger.warning( + "'%s' is greater than %s. Using the maximum allowed value = %s.", + key, + max_allowed, + max_allowed, + ) + return max_allowed + + return concurrency_level + + +def decode_content_disposition(content_disposition: bytes) -> dict[str, str]: + """Decode the `Content-Disposition` header and return the parameters as a dictionary. + + Args: + content_disposition: The `Content-Disposition` header as bytes. + + Returns: + A dictionary containing the parameters extracted from the + `Content-Disposition` header. + """ + data = content_disposition.decode().split("; ")[1:] + parameters = [d.split("=") for d in data] + parameters_dict = {p[0]: p[1].strip('"') for p in parameters} + return parameters_dict + + +def parse_form_data(decoded_data: MultipartDecoder) -> FormData: + """Parses the form data from the decoded multipart data. + + Args: + decoded_data: The decoded multipart data. + + Returns: + The parsed form data. + """ + form_data: FormData = {} + + for part in decoded_data.parts: + content_disposition = part.headers.get(b"Content-Disposition") + if content_disposition is None: + raise RuntimeError("Content-Disposition header not found. Can't split pdf file.") + part_params = decode_content_disposition(content_disposition) + name = part_params.get("name") + + if name is None: + continue + + if name == PARTITION_FORM_FILES_KEY: + filename = part_params.get("filename") + if filename is None or not filename.strip(): + raise ValueError("Filename can't be an empty string.") + form_data[PARTITION_FORM_FILES_KEY] = shared.Files(part.content, filename) + else: + content = part.content.decode() + if name in form_data: + if isinstance(form_data[name], list): + form_data[name].append(content) + else: + form_data[name] = [form_data[name], content] + else: + form_data[name] = content + + return form_data |