diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom')
9 files changed, 1095 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/__init__.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/__init__.py new file mode 100644 index 00000000..321a6b9c --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/__init__.py @@ -0,0 +1,5 @@ +from .clean_server_url_hook import CleanServerUrlSDKInitHook +from .logger_hook import LoggerHook +from .suggest_defining_url import SuggestDefiningUrlIf401AfterErrorHook +from .split_pdf_hook import SplitPdfHook +import logging diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/clean_server_url_hook.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/clean_server_url_hook.py new file mode 100644 index 00000000..5a2d6e3f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/clean_server_url_hook.py @@ -0,0 +1,34 @@ +from typing import Tuple +from urllib.parse import ParseResult, urlparse, urlunparse + +import requests + +from unstructured_client._hooks.types import SDKInitHook + + +class CleanServerUrlSDKInitHook(SDKInitHook): + """Hook fixing common mistakes by users in defining `server_url` in the unstructured-client""" + + def clean_server_url(self, base_url) -> str: + """Fix url scheme and remove the '/general/v0/general' path.""" + + # -- add a url scheme if not present (urllib.parse does not work reliably without it) + if "http" not in base_url: + base_url = "http://" + base_url + + parsed_url: ParseResult = urlparse(base_url) + + if "api.unstructuredapp.io" in base_url: + if parsed_url.scheme != "https": + parsed_url = parsed_url._replace(scheme="https") + + # -- path should always be empty + return urlunparse(parsed_url._replace(path="")) + + def sdk_init( + self, base_url: str, client: requests.Session + ) -> Tuple[str, requests.Session]: + """Concrete implementation for SDKInitHook.""" + cleaned_url = self.clean_server_url(base_url) + 
+ return cleaned_url, client diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/common.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/common.py new file mode 100644 index 00000000..ec759932 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/common.py @@ -0,0 +1 @@ +UNSTRUCTURED_CLIENT_LOGGER_NAME = "unstructured-client" diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py new file mode 100644 index 00000000..bf97faa3 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/form_utils.py @@ -0,0 +1,228 @@ +from __future__ import annotations + +import logging +from typing import Union + +from requests_toolbelt.multipart.decoder import MultipartDecoder + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client.models import shared + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) +FormData = dict[str, Union[str, shared.Files, list[str]]] + +PARTITION_FORM_FILES_KEY = "files" +PARTITION_FORM_SPLIT_PDF_PAGE_KEY = "split_pdf_page" +PARTITION_FORM_PAGE_RANGE_KEY = "split_pdf_page_range[]" +PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY = "split_pdf_allow_failed" +PARTITION_FORM_STARTING_PAGE_NUMBER_KEY = "starting_page_number" +PARTITION_FORM_CONCURRENCY_LEVEL_KEY = "split_pdf_concurrency_level" + + +def get_page_range(form_data: FormData, key: str, max_pages: int) -> tuple[int, int]: + """Retrieves the split page range from the given form data. + + If the range is invalid or outside the bounds of the page count, + returns (1, num_pages), i.e. the full range. + + Args: + form_data: The form data containing the page range + key: The key to look for in the form data. 
+ + Returns: + The range of pages to send in the request in the form (start, end) + """ + try: + _page_range = form_data.get(key) + + if _page_range is not None: + page_range = (int(_page_range[0]), int(_page_range[1])) + else: + page_range = (1, max_pages) + + except (ValueError, IndexError) as exc: + msg = f"{_page_range} is not a valid page range." + logger.error(msg) + raise ValueError(msg) from exc + + start, end = page_range + + if not 0 < start <= max_pages or not 0 < end <= max_pages or not start <= end: + msg = f"Page range {page_range} is out of bounds. Start and end values should be between 1 and {max_pages}." + logger.error(msg) + raise ValueError(msg) + + return page_range + + +def get_starting_page_number(form_data: FormData, key: str, fallback_value: int) -> int: + """Retrieves the starting page number from the given form data. + + In case given starting page number is not a valid integer or less than 1, it will + use the default value. + + Args: + form_data: The form data containing the starting page number. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + + Returns: + The starting page number. + """ + starting_page_number = fallback_value + try: + _starting_page_number = form_data.get(key) or fallback_value + starting_page_number = int(_starting_page_number) # type: ignore + except ValueError: + logger.warning( + "'%s' is not a valid integer. Using default value '%d'.", + key, + fallback_value, + ) + + if starting_page_number < 1: + logger.warning( + "'%s' is less than 1. Using default value '%d'.", + key, + fallback_value, + ) + starting_page_number = fallback_value + + return starting_page_number + +def get_split_pdf_allow_failed_param( + form_data: FormData, key: str, fallback_value: bool, +) -> bool: + """Retrieves the value for allow failed that should be used for splitting pdf. + + In case given the number is not a "false" or "true" literal, it will use the + default value. 
+ + Args: + form_data: The form data containing the desired concurrency level. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + + Returns: + The concurrency level after validation. + """ + allow_failed = form_data.get(key) + + if allow_failed is None: + return fallback_value + + if allow_failed.lower() not in ["true", "false"]: + logger.warning( + "'%s' is not a valid boolean. Using default value '%s'.", + key, + fallback_value, + ) + return fallback_value + + return allow_failed.lower() == "true" + +def get_split_pdf_concurrency_level_param( + form_data: FormData, key: str, fallback_value: int, max_allowed: int +) -> int: + """Retrieves the value for concurreny level that should be used for splitting pdf. + + In case given the number is not a valid integer or less than 1, it will use the + default value. + + Args: + form_data: The form data containing the desired concurrency level. + key: The key to look for in the form data. + fallback_value: The default value to use in case of an error. + max_allowed: The maximum allowed value for the concurrency level. + + Returns: + The concurrency level after validation. + """ + concurrency_level_str = form_data.get(key) + + if concurrency_level_str is None: + return fallback_value + + try: + concurrency_level = int(concurrency_level_str) + except ValueError: + logger.warning( + "'%s' is not a valid integer. Using default value '%s'.", + key, + fallback_value, + ) + return fallback_value + + if concurrency_level < 1: + logger.warning( + "'%s' is less than 1. Using the default value = %s.", + key, + fallback_value, + ) + return fallback_value + + if concurrency_level > max_allowed: + logger.warning( + "'%s' is greater than %s. 
Using the maximum allowed value = %s.", + key, + max_allowed, + max_allowed, + ) + return max_allowed + + return concurrency_level + + +def decode_content_disposition(content_disposition: bytes) -> dict[str, str]: + """Decode the `Content-Disposition` header and return the parameters as a dictionary. + + Args: + content_disposition: The `Content-Disposition` header as bytes. + + Returns: + A dictionary containing the parameters extracted from the + `Content-Disposition` header. + """ + data = content_disposition.decode().split("; ")[1:] + parameters = [d.split("=") for d in data] + parameters_dict = {p[0]: p[1].strip('"') for p in parameters} + return parameters_dict + + +def parse_form_data(decoded_data: MultipartDecoder) -> FormData: + """Parses the form data from the decoded multipart data. + + Args: + decoded_data: The decoded multipart data. + + Returns: + The parsed form data. + """ + form_data: FormData = {} + + for part in decoded_data.parts: + content_disposition = part.headers.get(b"Content-Disposition") + if content_disposition is None: + raise RuntimeError("Content-Disposition header not found. 
Can't split pdf file.") + part_params = decode_content_disposition(content_disposition) + name = part_params.get("name") + + if name is None: + continue + + if name == PARTITION_FORM_FILES_KEY: + filename = part_params.get("filename") + if filename is None or not filename.strip(): + raise ValueError("Filename can't be an empty string.") + form_data[PARTITION_FORM_FILES_KEY] = shared.Files(part.content, filename) + else: + content = part.content.decode() + if name in form_data: + if isinstance(form_data[name], list): + form_data[name].append(content) + else: + form_data[name] = [form_data[name], content] + else: + form_data[name] = content + + return form_data diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/logger_hook.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/logger_hook.py new file mode 100644 index 00000000..354f9ccd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/logger_hook.py @@ -0,0 +1,83 @@ +import logging +from typing import Optional, Tuple, Union, DefaultDict + +import requests + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client._hooks.types import ( + AfterSuccessContext, + AfterErrorContext, + AfterErrorHook, + SDKInitHook, + AfterSuccessHook, +) +from collections import defaultdict + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) + + +class LoggerHook(AfterErrorHook, AfterSuccessHook, SDKInitHook): + """Hook providing custom logging""" + + def __init__(self) -> None: + self.retries_counter: DefaultDict[str, int] = defaultdict(int) + + def log_retries(self, response: Optional[requests.Response], error: Optional[Exception], operation_id: str,): + """Log retries to give users visibility into requests.""" + + if response is not None and response.status_code // 100 == 5: + logger.info( + "Failed to process a request due to API server error with status code %d. 
" + "Attempting retry number %d after sleep.", + response.status_code, + self.retries_counter[operation_id], + ) + if response.text: + logger.info("Server message - %s", response.text) + + elif error is not None and isinstance(error, requests.exceptions.ConnectionError): + logger.info( + "Failed to process a request due to connection error - %s. " + "Attempting retry number %d after sleep.", + error, + self.retries_counter[operation_id], + ) + + + def sdk_init( + self, base_url: str, client: requests.Session + ) -> Tuple[str, requests.Session]: + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + return base_url, client + + def after_success( + self, hook_ctx: AfterSuccessContext, response: requests.Response + ) -> Union[requests.Response, Exception]: + self.retries_counter.pop(hook_ctx.operation_id, None) + # NOTE: In case of split page partition this means - at least one of the splits was partitioned successfully + logger.info("Successfully partitioned the document.") + return response + + def after_error( + self, + hook_ctx: AfterErrorContext, + response: Optional[requests.Response], + error: Optional[Exception], + ) -> Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]: + """Concrete implementation for AfterErrorHook.""" + self.retries_counter[hook_ctx.operation_id] += 1 + self.log_retries(response, error, hook_ctx.operation_id) + + if response and response.status_code == 200: + # NOTE: Even though this is an after_error method, due to split_pdf_hook logic we may get + # a success here when one of the split requests was partitioned successfully + logger.info("Successfully partitioned the document.") + + else: + logger.error("Failed to partition the document.") + if response: + logger.error("Server responded with %d - %s", response.status_code, response.text) + if error is not None: + logger.error("Following error occurred - %s", error) + + return response, error diff --git 
a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/pdf_utils.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/pdf_utils.py new file mode 100644 index 00000000..27dc5e03 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/pdf_utils.py @@ -0,0 +1,79 @@ +import io +import logging +from typing import Generator, Tuple, Optional + +from pypdf import PdfReader, PdfWriter +from pypdf.errors import PdfReadError + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client.models import shared + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) + +# Loading pdfs with strict=False can dump a lot of warnings +# We don't need to display these +pdf_logger = logging.getLogger("pypdf") +pdf_logger.setLevel(logging.ERROR) + + +def get_pdf_pages( + pdf: PdfReader, split_size: int = 1, page_start: int = 1, page_end: Optional[int] = None +) -> Generator[Tuple[io.BytesIO, int, int], None, None]: + """Reads given bytes of a pdf file and split it into n file-like objects, each + with `split_size` pages. + + Args: + file_content: Content of the PDF file. + split_size: Split size, e.g. if the given file has 10 pages + and this value is set to 2 it will yield 5 documents, each containing 2 pages + of the original document. By default it will split each page to a separate file. + page_start: Begin splitting at this page number + page_end: If provided, split up to and including this page number + + Yields: + The file contents with their page number and overall pages number of the original document. 
+ """ + + offset = page_start - 1 + offset_end = page_end or len(pdf.pages) + + while offset < offset_end: + new_pdf = PdfWriter() + pdf_buffer = io.BytesIO() + + end = min(offset + split_size, offset_end) + + for page in list(pdf.pages[offset:end]): + new_pdf.add_page(page) + + new_pdf.write(pdf_buffer) + pdf_buffer.seek(0) + + yield pdf_buffer, offset, offset_end + offset += split_size + + +def is_pdf(file: shared.Files) -> bool: + """Checks if the given file is a PDF. + + First it checks the file extension and if it is equal to `.pdf`, then + it tries to read that file. If there is no error then we assume it is a proper PDF. + + Args: + file: The file to be checked. + + Returns: + True if the file is a PDF, False otherwise. + """ + if not file.file_name.endswith(".pdf"): + logger.info("Given file doesn't have '.pdf' extension, so splitting is not enabled.") + return False + + try: + PdfReader(io.BytesIO(file.content), strict=True) + except (PdfReadError, UnicodeDecodeError) as exc: + logger.error(exc) + logger.warning("The file does not appear to be a valid PDF.") + return False + + return True diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/request_utils.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/request_utils.py new file mode 100644 index 00000000..1512e80b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/request_utils.py @@ -0,0 +1,190 @@ +from __future__ import annotations + +import asyncio +import copy +import io +import json +import logging +from typing import Optional, Tuple, Any + +import httpx +import requests +from requests.structures import CaseInsensitiveDict +from requests_toolbelt.multipart.encoder import MultipartEncoder + +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client._hooks.custom.form_utils import ( + PARTITION_FORM_FILES_KEY, + PARTITION_FORM_SPLIT_PDF_PAGE_KEY, + 
PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, + PARTITION_FORM_PAGE_RANGE_KEY, + PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, + FormData, +) + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) + + +def create_request_body( + form_data: FormData, page_content: io.BytesIO, filename: str, page_number: int +) -> MultipartEncoder: + payload = prepare_request_payload(form_data) + + payload_fields: list[tuple[str, Any]] = [] + for key, value in payload.items(): + if isinstance(value, list): + payload_fields.extend([(key, list_value) for list_value in value]) + else: + payload_fields.append((key, value)) + + payload_fields.append((PARTITION_FORM_FILES_KEY, ( + filename, + page_content, + "application/pdf", + ))) + + payload_fields.append((PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, str(page_number))) + + body = MultipartEncoder( + fields=payload_fields + ) + return body + + +def create_httpx_request( + original_request: requests.Request, body: MultipartEncoder +) -> httpx.Request: + headers = prepare_request_headers(original_request.headers) + return httpx.Request( + method="POST", + url=original_request.url or "", + content=body.to_string(), + headers={**headers, "Content-Type": body.content_type}, + ) + + +def create_request( + request: requests.PreparedRequest, + body: MultipartEncoder, +) -> requests.Request: + headers = prepare_request_headers(request.headers) + return requests.Request( + method="POST", + url=request.url or "", + data=body, + headers={**headers, "Content-Type": body.content_type}, + ) + + +async def call_api_async( + client: httpx.AsyncClient, + page: Tuple[io.BytesIO, int], + original_request: requests.Request, + form_data: FormData, + filename: str, + limiter: asyncio.Semaphore, +) -> tuple[int, dict]: + page_content, page_number = page + body = create_request_body(form_data, page_content, filename, page_number) + new_request = create_httpx_request(original_request, body) + async with limiter: + try: + response = await client.send(new_request) + 
return response.status_code, response.json() + except Exception: + logger.error("Failed to send request for page %d", page_number) + return 500, {} + + +def call_api( + client: Optional[requests.Session], + page: Tuple[io.BytesIO, int], + request: requests.PreparedRequest, + form_data: FormData, + filename: str, +) -> requests.Response: + if client is None: + raise RuntimeError("HTTP client not accessible!") + page_content, page_number = page + + body = create_request_body(form_data, page_content, filename, page_number) + new_request = create_request(request, body) + prepared_request = client.prepare_request(new_request) + + try: + return client.send(prepared_request) + except Exception: + logger.error("Failed to send request for page %d", page_number) + return requests.Response() + + +def prepare_request_headers( + headers: CaseInsensitiveDict[str], +) -> CaseInsensitiveDict[str]: + """Prepare the request headers by removing the 'Content-Type' and 'Content-Length' headers. + + Args: + headers: The original request headers. + + Returns: + The modified request headers. + """ + headers = copy.deepcopy(headers) + headers.pop("Content-Type", None) + headers.pop("Content-Length", None) + return headers + + +def prepare_request_payload(form_data: FormData) -> FormData: + """Prepares the request payload by removing unnecessary keys and updating the file. + + Args: + form_data: The original form data. + + Returns: + The updated request payload. 
+ """ + payload = copy.deepcopy(form_data) + payload.pop(PARTITION_FORM_SPLIT_PDF_PAGE_KEY, None) + payload.pop(PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, None) + payload.pop(PARTITION_FORM_FILES_KEY, None) + payload.pop(PARTITION_FORM_PAGE_RANGE_KEY, None) + payload.pop(PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, None) + updated_parameters = { + PARTITION_FORM_SPLIT_PDF_PAGE_KEY: "false", + } + payload.update(updated_parameters) + return payload + + +def create_response(response: requests.Response, elements: list) -> requests.Response: + """ + Creates a modified response object with updated content. + + Args: + response: The original response object. + elements: The list of elements to be serialized and added to + the response. + + Returns: + The modified response object with updated content. + """ + response_copy = copy.deepcopy(response) + content = json.dumps(elements).encode() + content_length = str(len(content)) + response_copy.headers.update({"Content-Length": content_length}) + setattr(response_copy, "_content", content) + return response_copy + + +def log_after_split_response(status_code: int, split_number: int): + if status_code == 200: + logger.info( + "Successfully partitioned set #%d, elements added to the final result.", + split_number, + ) + else: + logger.warning( + "Failed to partition set #%d, its elements will be omitted in the final result.", + split_number, + ) diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py new file mode 100644 index 00000000..54a3b89e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -0,0 +1,445 @@ +from __future__ import annotations + +import asyncio +import io +import json +import logging +import math +from collections.abc import Awaitable +from typing import Any, Coroutine, Optional, Tuple, Union + +import httpx +import 
nest_asyncio +import requests +from pypdf import PdfReader +from requests_toolbelt.multipart.decoder import MultipartDecoder + +from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils +from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME +from unstructured_client._hooks.custom.form_utils import ( + PARTITION_FORM_CONCURRENCY_LEVEL_KEY, + PARTITION_FORM_FILES_KEY, + PARTITION_FORM_PAGE_RANGE_KEY, + PARTITION_FORM_SPLIT_PDF_PAGE_KEY, + PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, + PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, +) +from unstructured_client._hooks.types import ( + AfterErrorContext, + AfterErrorHook, + AfterSuccessContext, + AfterSuccessHook, + BeforeRequestContext, + BeforeRequestHook, + SDKInitHook, +) +from unstructured_client.models import shared + +logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) + +DEFAULT_STARTING_PAGE_NUMBER = 1 +DEFAULT_ALLOW_FAILED = False +DEFAULT_CONCURRENCY_LEVEL = 8 +MAX_CONCURRENCY_LEVEL = 15 +MIN_PAGES_PER_SPLIT = 2 +MAX_PAGES_PER_SPLIT = 20 + + +async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, requests.Response]: + response = await coro + return index, response + + +async def run_tasks(coroutines: list[Awaitable], allow_failed: bool = False) -> list[tuple[int, requests.Response]]: + if allow_failed: + responses = await asyncio.gather(*coroutines, return_exceptions=False) + return list(enumerate(responses, 1)) + # TODO: replace with asyncio.TaskGroup for python >3.11 # pylint: disable=fixme + tasks = [asyncio.create_task(_order_keeper(index, coro)) for index, coro in enumerate(coroutines, 1)] + results = [] + remaining_tasks = dict(enumerate(tasks, 1)) + for future in asyncio.as_completed(tasks): + index, response = await future + if response.status_code != 200: + # cancel all remaining tasks + for remaining_task in remaining_tasks.values(): + remaining_task.cancel() + results.append((index, response)) + break + results.append((index, 
response)) + # remove task from remaining_tasks that should be cancelled in case of failure + del remaining_tasks[index] + # return results in the original order + return sorted(results, key=lambda x: x[0]) + + +def context_is_uvloop(): + """Return true if uvloop is installed and we're currently in a uvloop context. Our asyncio splitting code currently doesn't work under uvloop.""" + try: + import uvloop # pylint: disable=import-outside-toplevel + loop = asyncio.get_event_loop() + return isinstance(loop, uvloop.Loop) + except (ImportError, RuntimeError): + return False + +def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int: + """Distributes pages to workers evenly based on the number of pages and desired concurrency level.""" + if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level: + split_size = math.ceil(num_pages / concurrency_level) + else: + split_size = MAX_PAGES_PER_SPLIT + + return max(split_size, MIN_PAGES_PER_SPLIT) + + +class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorHook): + """ + A hook class that splits a PDF file into multiple pages and sends each page as + a separate request. This hook is designed to be used with an Speakeasy SDK. + + Usage: + 1. Create an instance of the `SplitPdfHook` class. + 2. Register SDK Init, Before Request, After Success and After Error hooks. + """ + + def __init__(self) -> None: + self.client: Optional[requests.Session] = None + self.coroutines_to_execute: dict[ + str, list[Coroutine[Any, Any, requests.Response]] + ] = {} + self.api_successful_responses: dict[str, list[requests.Response]] = {} + self.api_failed_responses: dict[str, list[requests.Response]] = {} + self.allow_failed: bool = DEFAULT_ALLOW_FAILED + + def sdk_init( + self, base_url: str, client: requests.Session + ) -> Tuple[str, requests.Session]: + """Initializes Split PDF Hook. + + Args: + base_url (str): URL of the API. + client (requests.Session): HTTP Client. 
+ + Returns: + Tuple[str, requests.Session]: The initialized SDK options. + """ + self.client = client + return base_url, client + + + # pylint: disable=too-many-return-statements + def before_request( + self, hook_ctx: BeforeRequestContext, request: requests.PreparedRequest + ) -> Union[requests.PreparedRequest, Exception]: + """If `splitPdfPage` is set to `true` in the request, the PDF file is split into + separate pages. Each page is sent as a separate request in parallel. The last + page request is returned by this method. It will return the original request + when: `splitPdfPage` is set to `false`, the file is not a PDF, or the HTTP + has not been initialized. + + Args: + hook_ctx (BeforeRequestContext): The hook context containing information about + the operation. + request (requests.PreparedRequest): The request object. + + Returns: + Union[requests.PreparedRequest, Exception]: If `splitPdfPage` is set to `true`, + the last page request; otherwise, the original request. + """ + if self.client is None: + logger.warning("HTTP client not accessible! Continuing without splitting.") + return request + + if context_is_uvloop(): + logger.warning("Splitting is currently incompatible with uvloop. 
Continuing without splitting.") + return request + + # This allows us to use an event loop in an env with an existing loop + # Temporary fix until we can improve the async splitting behavior + nest_asyncio.apply() + operation_id = hook_ctx.operation_id + content_type = request.headers.get("Content-Type") + body = request.body + if not isinstance(body, bytes) or content_type is None: + return request + + decoded_body = MultipartDecoder(body, content_type) + form_data = form_utils.parse_form_data(decoded_body) + split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY) + if split_pdf_page is None or split_pdf_page == "false": + logger.info("Partitioning without split.") + return request + + logger.info("Preparing to split document for partition.") + file = form_data.get(PARTITION_FORM_FILES_KEY) + if ( + file is None + or not isinstance(file, shared.Files) + or not pdf_utils.is_pdf(file) + ): + logger.info("Partitioning without split.") + return request + + starting_page_number = form_utils.get_starting_page_number( + form_data, + key=PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, + fallback_value=DEFAULT_STARTING_PAGE_NUMBER, + ) + if starting_page_number > 1: + logger.info("Starting page number set to %d", starting_page_number) + logger.info("Starting page number set to %d", starting_page_number) + + self.allow_failed = form_utils.get_split_pdf_allow_failed_param( + form_data, + key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY, + fallback_value=DEFAULT_ALLOW_FAILED, + ) + logger.info("Allow failed set to %d", self.allow_failed) + + concurrency_level = form_utils.get_split_pdf_concurrency_level_param( + form_data, + key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY, + fallback_value=DEFAULT_CONCURRENCY_LEVEL, + max_allowed=MAX_CONCURRENCY_LEVEL, + ) + logger.info("Concurrency level set to %d", concurrency_level) + limiter = asyncio.Semaphore(concurrency_level) + + pdf = PdfReader(io.BytesIO(file.content)) + + page_range_start, page_range_end = form_utils.get_page_range( + 
form_data, + key=PARTITION_FORM_PAGE_RANGE_KEY, + max_pages=len(pdf.pages), + ) + + page_count = page_range_end - page_range_start + 1 + logger.info( + "Splitting pages %d to %d (%d total)", + page_range_start, + page_range_end, + page_count, + ) + + split_size = get_optimal_split_size( + num_pages=page_count, concurrency_level=concurrency_level + ) + logger.info("Determined optimal split size of %d pages.", split_size) + + # If the doc is small enough, and we aren't slicing it with a page range: + # do not split, just continue with the original request + if split_size >= page_count and page_count == len(pdf.pages): + logger.info( + "Document has too few pages (%d) to be split efficiently. Partitioning without split.", + page_count, + ) + return request + + pages = pdf_utils.get_pdf_pages(pdf, split_size=split_size, page_start=page_range_start, page_end=page_range_end) + logger.info( + "Partitioning %d files with %d page(s) each.", + math.floor(page_count / split_size), + split_size, + ) + + # Log the remainder pages if there are any + if page_count % split_size > 0: + logger.info( + "Partitioning 1 file with %d page(s).", + page_count % split_size, + ) + + async def call_api_partial(page): + async with httpx.AsyncClient() as client: + status_code, json_response = await request_utils.call_api_async( + client=client, + original_request=request, + form_data=form_data, + filename=file.file_name, + page=page, + limiter=limiter, + ) + + # convert httpx response to requests.Response to preserve + # compatibility with the synchronous SDK generated by speakeasy + response = requests.Response() + response.status_code = status_code + response._content = json.dumps( # pylint: disable=W0212 + json_response + ).encode() + response.headers["Content-Type"] = "application/json" + return response + + self.coroutines_to_execute[operation_id] = [] + last_page_content = io.BytesIO() + last_page_number = 0 + set_index = 1 + for page_content, page_index, all_pages_number in pages: + 
page_number = page_index + starting_page_number + logger.info( + "Partitioning set #%d (pages %d-%d).", + set_index, + page_number, + min(page_number + split_size - 1, all_pages_number), + ) + # Check if this set of pages is the last one + if page_index + split_size >= all_pages_number: + last_page_content = page_content + last_page_number = page_number + break + coroutine = call_api_partial((page_content, page_number)) + self.coroutines_to_execute[operation_id].append(coroutine) + set_index += 1 + # `before_request` method needs to return a request so we skip sending the last page in parallel + # and return that last page at the end of this method + + body = request_utils.create_request_body( + form_data, last_page_content, file.file_name, last_page_number + ) + last_page_request = request_utils.create_request(request, body) + last_page_prepared_request = self.client.prepare_request(last_page_request) + return last_page_prepared_request + + def _await_elements( + self, operation_id: str, response: requests.Response + ) -> Optional[list]: + """ + Waits for the partition requests to complete and returns the flattened + elements. + + Args: + operation_id (str): The ID of the operation. + response (requests.Response): The response object. + + Returns: + Optional[list]: The flattened elements if the partition requests are + completed, otherwise None. 
+ """
+ # No queued coroutines means the PDF was never split for this operation.
+ tasks = self.coroutines_to_execute.get(operation_id)
+ if tasks is None:
+ return None
+
+ # NOTE(review): asyncio.get_event_loop() outside a running loop is deprecated
+ # since Python 3.10; consider asyncio.run() or an explicitly created loop.
+ ioloop = asyncio.get_event_loop()
+ task_responses: list[tuple[int, requests.Response]] = ioloop.run_until_complete(
+ run_tasks(tasks, allow_failed=self.allow_failed)
+ )
+
+ if task_responses is None:
+ return None
+
+ successful_responses = []
+ failed_responses = []
+ elements = []
+ for response_number, res in task_responses:
+ request_utils.log_after_split_response(res.status_code, response_number)
+ if res.status_code == 200:
+ successful_responses.append(res)
+ elements.append(res.json())
+ else:
+ failed_responses.append(res)
+
+ if self.allow_failed or not failed_responses:
+ # The synchronously-sent last page counts as one more response for logging.
+ last_response_number = len(task_responses) + 1
+ request_utils.log_after_split_response(
+ response.status_code, last_response_number
+ )
+ if response.status_code == 200:
+ elements.append(response.json())
+ successful_responses.append(response)
+ else:
+ failed_responses.append(response)
+
+ self.api_successful_responses[operation_id] = successful_responses
+ self.api_failed_responses[operation_id] = failed_responses
+ flattened_elements = [element for sublist in elements for element in sublist]
+ return flattened_elements
+
+ def after_success(
+ self, hook_ctx: AfterSuccessContext, response: requests.Response
+ ) -> Union[requests.Response, Exception]:
+ """Executes after a successful API request. Awaits all parallel requests and
+ combines the responses into a single response object.
+
+ Args:
+ hook_ctx (AfterSuccessContext): The context object containing information
+ about the hook execution.
+ response (requests.Response): The response object returned from the API
+ request.
+
+ Returns:
+ Union[requests.Response, Exception]: If requests were run in parallel, a
+ combined response object; otherwise, the original response. Can return
+ exception if it occurred during the execution.
+ """
+ operation_id = hook_ctx.operation_id
+ # Because in `before_request` method we skipped sending last page in parallel
+ # we need to pass response, which contains last page, to `_await_elements` method
+ elements = self._await_elements(operation_id, response)
+
+ # if fails are disallowed, return the first failed response
+ if not self.allow_failed and self.api_failed_responses.get(operation_id):
+ return self.api_failed_responses[operation_id][0]
+
+ # elements is None when no split was performed for this operation id,
+ # so the original response is passed through untouched.
+ if elements is None:
+ return response
+
+ updated_response = request_utils.create_response(response, elements)
+ self._clear_operation(operation_id)
+ return updated_response
+
+ def after_error(
+ self,
+ hook_ctx: AfterErrorContext,
+ response: Optional[requests.Response],
+ error: Optional[Exception],
+ ) -> Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]:
+ """Executes after an unsuccessful API request. Awaits all parallel requests,
+ if at least one request was successful, combines the responses into a single
+ response object and doesn't throw an error. It will return an error only if
+ all requests failed, or there was no PDF split.
+
+ Args:
+ hook_ctx (AfterErrorContext): The AfterErrorContext object containing
+ information about the hook context.
+ response (Optional[requests.Response]): The Response object representing
+ the response received before the exception occurred.
+ error (Optional[Exception]): The exception object that was thrown.
+
+ Returns:
+ Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]:
+ If requests were run in parallel, and at least one was successful, a combined
+ response object; otherwise, the original response and exception.
+ """
+
+ # if fails are disallowed - return response and error objects immediately
+ if not self.allow_failed:
+ return (response, error)
+
+ operation_id = hook_ctx.operation_id
+ # We know that this request failed so we pass a failed or empty response to `_await_elements` method
+ # where it checks if at least one of the other requests succeeded
+ # NOTE(review): a bare requests.Response() has status_code None -- confirm
+ # log_after_split_response handles that case downstream.
+ elements = self._await_elements(operation_id, response or requests.Response())
+ successful_responses = self.api_successful_responses.get(operation_id)
+
+ if elements is None or successful_responses is None:
+ return (response, error)
+
+ # All split requests failed: clean up and surface the original error.
+ if len(successful_responses) == 0:
+ self._clear_operation(operation_id)
+ return (response, error)
+
+ # At least one split request succeeded: build a combined response from the
+ # first successful one and suppress the error.
+ updated_response = request_utils.create_response(
+ successful_responses[0], elements
+ )
+ self._clear_operation(operation_id)
+ return (updated_response, None)
+
+ def _clear_operation(self, operation_id: str) -> None:
+ """
+ Clears the operation data associated with the given operation ID.
+
+ Args:
+ operation_id (str): The ID of the operation to clear.
+ """
+ self.coroutines_to_execute.pop(operation_id, None)
+ self.api_successful_responses.pop(operation_id, None)
+ # NOTE(review): api_failed_responses[operation_id] is assigned in
+ # `_await_elements` but is never popped here -- entries appear to
+ # accumulate across operations; confirm whether this leak is intended.
diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/suggest_defining_url.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/suggest_defining_url.py
new file mode 100644
index 00000000..d9268159
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/suggest_defining_url.py
@@ -0,0 +1,30 @@
+from typing import Optional, Tuple, Union
+
+import logging
+import requests
+
+from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME
+from unstructured_client._hooks.types import AfterErrorContext, AfterErrorHook
+
+logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
+
+
+class SuggestDefiningUrlIf401AfterErrorHook(AfterErrorHook):
+ """Hook advising users to check that 'server_url' is defined if a 401 error is encountered."""
+
+ def warn_if_401(self, response: Optional[requests.Response]):
+ """If the paid API returns 401, warn the user in case they meant to use the free api."""
+ # Only warn on an actual 401; any other status (or no response) is silent.
+ if response is not None and response.status_code == 401:
+ logger.warning(
+ "This API key is invalid against the paid API. If intending to use the free API, please initialize UnstructuredClient with `server='free-api'`."
+ )
+
+ def after_error(
+ self,
+ hook_ctx: AfterErrorContext,
+ response: Optional[requests.Response],
+ error: Optional[Exception],
+ ) -> Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]:
+ """Concrete implementation for AfterErrorHook."""
+ # Purely advisory: logs a hint on 401 and passes the response/error through.
+ self.warn_if_401(response)
+ return response, error |