Diffstat (limited to '.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py  445
1 file changed, 445 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py
new file mode 100644
index 00000000..54a3b89e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/unstructured_client/_hooks/custom/split_pdf_hook.py
@@ -0,0 +1,445 @@
+from __future__ import annotations
+
+import asyncio
+import io
+import json
+import logging
+import math
+from collections.abc import Awaitable
+from typing import Any, Coroutine, Optional, Tuple, Union
+
+import httpx
+import nest_asyncio
+import requests
+from pypdf import PdfReader
+from requests_toolbelt.multipart.decoder import MultipartDecoder
+
+from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils
+from unstructured_client._hooks.custom.common import UNSTRUCTURED_CLIENT_LOGGER_NAME
+from unstructured_client._hooks.custom.form_utils import (
+ PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
+ PARTITION_FORM_FILES_KEY,
+ PARTITION_FORM_PAGE_RANGE_KEY,
+ PARTITION_FORM_SPLIT_PDF_PAGE_KEY,
+ PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY,
+ PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
+)
+from unstructured_client._hooks.types import (
+ AfterErrorContext,
+ AfterErrorHook,
+ AfterSuccessContext,
+ AfterSuccessHook,
+ BeforeRequestContext,
+ BeforeRequestHook,
+ SDKInitHook,
+)
+from unstructured_client.models import shared
+
+logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)
+
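+# Tuning constants for the splitter: each split holds between
+# MIN_PAGES_PER_SPLIT and MAX_PAGES_PER_SPLIT pages, and at most
+# MAX_CONCURRENCY_LEVEL page requests are in flight at once.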
+DEFAULT_STARTING_PAGE_NUMBER = 1
+DEFAULT_ALLOW_FAILED = False
+DEFAULT_CONCURRENCY_LEVEL = 8
+MAX_CONCURRENCY_LEVEL = 15
+MIN_PAGES_PER_SPLIT = 2
+MAX_PAGES_PER_SPLIT = 20
+
+
+async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, requests.Response]:
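+    """Await `coro` and return its response tagged with its submission index."""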
+ response = await coro
+ return index, response
+
+
+async def run_tasks(coroutines: list[Awaitable], allow_failed: bool = False) -> list[tuple[int, requests.Response]]:
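+    """Run the split-page coroutines and return their responses with indices.
+
+    With `allow_failed=True` every request runs to completion. Otherwise the
+    first non-200 response cancels all still-pending tasks (fail-fast). In
+    both cases the results are sorted by original submission order.
+    """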
+ if allow_failed:
+ responses = await asyncio.gather(*coroutines, return_exceptions=False)
+ return list(enumerate(responses, 1))
+    # TODO: replace with asyncio.TaskGroup for python >= 3.11  # pylint: disable=fixme
+ tasks = [asyncio.create_task(_order_keeper(index, coro)) for index, coro in enumerate(coroutines, 1)]
+ results = []
+ remaining_tasks = dict(enumerate(tasks, 1))
+ for future in asyncio.as_completed(tasks):
+ index, response = await future
+ if response.status_code != 200:
+ # cancel all remaining tasks
+ for remaining_task in remaining_tasks.values():
+ remaining_task.cancel()
+ results.append((index, response))
+ break
+ results.append((index, response))
+ # remove task from remaining_tasks that should be cancelled in case of failure
+ del remaining_tasks[index]
+ # return results in the original order
+ return sorted(results, key=lambda x: x[0])
+
+
+def context_is_uvloop() -> bool:
+    """Return True if uvloop is installed and we're currently in a uvloop context.
+
+    Our asyncio splitting code currently doesn't work under uvloop.
+    """
+ try:
+ import uvloop # pylint: disable=import-outside-toplevel
+ loop = asyncio.get_event_loop()
+ return isinstance(loop, uvloop.Loop)
+ except (ImportError, RuntimeError):
+ return False
+
+
+def get_optimal_split_size(num_pages: int, concurrency_level: int) -> int:
+ """Distributes pages to workers evenly based on the number of pages and desired concurrency level."""
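+    # e.g. 100 pages at concurrency 8 -> ceil(100 / 8) = 13 pages per split;
+    # 500 pages at concurrency 8 -> capped at MAX_PAGES_PER_SPLIT (20);
+    # 3 pages at concurrency 8 -> raised to MIN_PAGES_PER_SPLIT (2).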
+ if num_pages < MAX_PAGES_PER_SPLIT * concurrency_level:
+ split_size = math.ceil(num_pages / concurrency_level)
+ else:
+ split_size = MAX_PAGES_PER_SPLIT
+
+ return max(split_size, MIN_PAGES_PER_SPLIT)
+
+
+class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorHook):
+ """
+    A hook class that splits a PDF file into batches of pages and sends each
+    batch as a separate request in parallel. This hook is designed to be used
+    with a Speakeasy SDK.
+
+ Usage:
+ 1. Create an instance of the `SplitPdfHook` class.
+ 2. Register SDK Init, Before Request, After Success and After Error hooks.
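+
+    A minimal registration sketch (the exact hook wiring is generated by
+    Speakeasy and may differ between SDK versions):
+
+        hook = SplitPdfHook()
+        base_url, client = hook.sdk_init(base_url, client)
+        # register hook for before_request, after_success and after_error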
+ """
+
+ def __init__(self) -> None:
+ self.client: Optional[requests.Session] = None
+ self.coroutines_to_execute: dict[
+ str, list[Coroutine[Any, Any, requests.Response]]
+ ] = {}
+ self.api_successful_responses: dict[str, list[requests.Response]] = {}
+ self.api_failed_responses: dict[str, list[requests.Response]] = {}
+ self.allow_failed: bool = DEFAULT_ALLOW_FAILED
+
+ def sdk_init(
+ self, base_url: str, client: requests.Session
+ ) -> Tuple[str, requests.Session]:
+ """Initializes Split PDF Hook.
+
+ Args:
+ base_url (str): URL of the API.
+ client (requests.Session): HTTP Client.
+
+ Returns:
+ Tuple[str, requests.Session]: The initialized SDK options.
+ """
+ self.client = client
+ return base_url, client
+
+ # pylint: disable=too-many-return-statements
+ def before_request(
+ self, hook_ctx: BeforeRequestContext, request: requests.PreparedRequest
+ ) -> Union[requests.PreparedRequest, Exception]:
+ """If `splitPdfPage` is set to `true` in the request, the PDF file is split into
+ separate pages. Each page is sent as a separate request in parallel. The last
+ page request is returned by this method. It will return the original request
+        when: `splitPdfPage` is set to `false`, the file is not a PDF, or the
+        HTTP client has not been initialized.
+
+ Args:
+ hook_ctx (BeforeRequestContext): The hook context containing information about
+ the operation.
+ request (requests.PreparedRequest): The request object.
+
+ Returns:
+ Union[requests.PreparedRequest, Exception]: If `splitPdfPage` is set to `true`,
+ the last page request; otherwise, the original request.
+ """
+ if self.client is None:
+ logger.warning("HTTP client not accessible! Continuing without splitting.")
+ return request
+
+ if context_is_uvloop():
+ logger.warning("Splitting is currently incompatible with uvloop. Continuing without splitting.")
+ return request
+
+ # This allows us to use an event loop in an env with an existing loop
+ # Temporary fix until we can improve the async splitting behavior
+ nest_asyncio.apply()
+ operation_id = hook_ctx.operation_id
+ content_type = request.headers.get("Content-Type")
+ body = request.body
+ if not isinstance(body, bytes) or content_type is None:
+ return request
+
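+        # Re-parse the multipart body so the form fields (split_pdf_page,
+        # files, page_range, ...) can be inspected before deciding to split.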
+ decoded_body = MultipartDecoder(body, content_type)
+ form_data = form_utils.parse_form_data(decoded_body)
+ split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY)
+ if split_pdf_page is None or split_pdf_page == "false":
+ logger.info("Partitioning without split.")
+ return request
+
+ logger.info("Preparing to split document for partition.")
+ file = form_data.get(PARTITION_FORM_FILES_KEY)
+ if (
+ file is None
+ or not isinstance(file, shared.Files)
+ or not pdf_utils.is_pdf(file)
+ ):
+ logger.info("Partitioning without split.")
+ return request
+
+ starting_page_number = form_utils.get_starting_page_number(
+ form_data,
+ key=PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
+ fallback_value=DEFAULT_STARTING_PAGE_NUMBER,
+ )
+ if starting_page_number > 1:
+ logger.info("Starting page number set to %d", starting_page_number)
+
+ self.allow_failed = form_utils.get_split_pdf_allow_failed_param(
+ form_data,
+ key=PARTITION_FORM_SPLIT_PDF_ALLOW_FAILED_KEY,
+ fallback_value=DEFAULT_ALLOW_FAILED,
+ )
+ logger.info("Allow failed set to %d", self.allow_failed)
+
+ concurrency_level = form_utils.get_split_pdf_concurrency_level_param(
+ form_data,
+ key=PARTITION_FORM_CONCURRENCY_LEVEL_KEY,
+ fallback_value=DEFAULT_CONCURRENCY_LEVEL,
+ max_allowed=MAX_CONCURRENCY_LEVEL,
+ )
+ logger.info("Concurrency level set to %d", concurrency_level)
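+        # Shared semaphore bounding the number of in-flight page requests
+        # to the requested concurrency level.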
+ limiter = asyncio.Semaphore(concurrency_level)
+
+ pdf = PdfReader(io.BytesIO(file.content))
+
+ page_range_start, page_range_end = form_utils.get_page_range(
+ form_data,
+ key=PARTITION_FORM_PAGE_RANGE_KEY,
+ max_pages=len(pdf.pages),
+ )
+
+ page_count = page_range_end - page_range_start + 1
+ logger.info(
+ "Splitting pages %d to %d (%d total)",
+ page_range_start,
+ page_range_end,
+ page_count,
+ )
+
+ split_size = get_optimal_split_size(
+ num_pages=page_count, concurrency_level=concurrency_level
+ )
+ logger.info("Determined optimal split size of %d pages.", split_size)
+
+ # If the doc is small enough, and we aren't slicing it with a page range:
+ # do not split, just continue with the original request
+ if split_size >= page_count and page_count == len(pdf.pages):
+ logger.info(
+ "Document has too few pages (%d) to be split efficiently. Partitioning without split.",
+ page_count,
+ )
+ return request
+
+        pages = pdf_utils.get_pdf_pages(
+            pdf,
+            split_size=split_size,
+            page_start=page_range_start,
+            page_end=page_range_end,
+        )
+ logger.info(
+ "Partitioning %d files with %d page(s) each.",
+ math.floor(page_count / split_size),
+ split_size,
+ )
+
+ # Log the remainder pages if there are any
+ if page_count % split_size > 0:
+ logger.info(
+ "Partitioning 1 file with %d page(s).",
+ page_count % split_size,
+ )
+
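+        # Each split is sent as its own partition request. A fresh
+        # httpx.AsyncClient is opened per call, and the shared semaphore
+        # above keeps the number of concurrent requests bounded.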
+ async def call_api_partial(page):
+ async with httpx.AsyncClient() as client:
+ status_code, json_response = await request_utils.call_api_async(
+ client=client,
+ original_request=request,
+ form_data=form_data,
+ filename=file.file_name,
+ page=page,
+ limiter=limiter,
+ )
+
+ # convert httpx response to requests.Response to preserve
+ # compatibility with the synchronous SDK generated by speakeasy
+ response = requests.Response()
+ response.status_code = status_code
+ response._content = json.dumps( # pylint: disable=W0212
+ json_response
+ ).encode()
+ response.headers["Content-Type"] = "application/json"
+ return response
+
+ self.coroutines_to_execute[operation_id] = []
+ last_page_content = io.BytesIO()
+ last_page_number = 0
+ set_index = 1
+ for page_content, page_index, all_pages_number in pages:
+ page_number = page_index + starting_page_number
+ logger.info(
+ "Partitioning set #%d (pages %d-%d).",
+ set_index,
+ page_number,
+ min(page_number + split_size - 1, all_pages_number),
+ )
+ # Check if this set of pages is the last one
+ if page_index + split_size >= all_pages_number:
+ last_page_content = page_content
+ last_page_number = page_number
+ break
+ coroutine = call_api_partial((page_content, page_number))
+ self.coroutines_to_execute[operation_id].append(coroutine)
+ set_index += 1
+ # `before_request` method needs to return a request so we skip sending the last page in parallel
+ # and return that last page at the end of this method
+
+ body = request_utils.create_request_body(
+ form_data, last_page_content, file.file_name, last_page_number
+ )
+ last_page_request = request_utils.create_request(request, body)
+ last_page_prepared_request = self.client.prepare_request(last_page_request)
+ return last_page_prepared_request
+
+ def _await_elements(
+ self, operation_id: str, response: requests.Response
+ ) -> Optional[list]:
+ """
+ Waits for the partition requests to complete and returns the flattened
+ elements.
+
+ Args:
+ operation_id (str): The ID of the operation.
+ response (requests.Response): The response object.
+
+ Returns:
+ Optional[list]: The flattened elements if the partition requests are
+ completed, otherwise None.
+ """
+ tasks = self.coroutines_to_execute.get(operation_id)
+ if tasks is None:
+ return None
+
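+        # nest_asyncio (applied in `before_request`) allows calling
+        # run_until_complete even when an event loop is already running.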
+ ioloop = asyncio.get_event_loop()
+ task_responses: list[tuple[int, requests.Response]] = ioloop.run_until_complete(
+ run_tasks(tasks, allow_failed=self.allow_failed)
+ )
+
+ if task_responses is None:
+ return None
+
+ successful_responses = []
+ failed_responses = []
+ elements = []
+ for response_number, res in task_responses:
+ request_utils.log_after_split_response(res.status_code, response_number)
+ if res.status_code == 200:
+ successful_responses.append(res)
+ elements.append(res.json())
+ else:
+ failed_responses.append(res)
+
+ if self.allow_failed or not failed_responses:
+ last_response_number = len(task_responses) + 1
+ request_utils.log_after_split_response(
+ response.status_code, last_response_number
+ )
+ if response.status_code == 200:
+ elements.append(response.json())
+ successful_responses.append(response)
+ else:
+ failed_responses.append(response)
+
+ self.api_successful_responses[operation_id] = successful_responses
+ self.api_failed_responses[operation_id] = failed_responses
+ flattened_elements = [element for sublist in elements for element in sublist]
+ return flattened_elements
+
+ def after_success(
+ self, hook_ctx: AfterSuccessContext, response: requests.Response
+ ) -> Union[requests.Response, Exception]:
+ """Executes after a successful API request. Awaits all parallel requests and
+ combines the responses into a single response object.
+
+ Args:
+ hook_ctx (AfterSuccessContext): The context object containing information
+ about the hook execution.
+ response (requests.Response): The response object returned from the API
+ request.
+
+ Returns:
+ Union[requests.Response, Exception]: If requests were run in parallel, a
+ combined response object; otherwise, the original response. Can return
+            an exception if one occurred during execution.
+ """
+ operation_id = hook_ctx.operation_id
+ # Because in `before_request` method we skipped sending last page in parallel
+ # we need to pass response, which contains last page, to `_await_elements` method
+ elements = self._await_elements(operation_id, response)
+
+ # if fails are disallowed, return the first failed response
+ if not self.allow_failed and self.api_failed_responses.get(operation_id):
+ return self.api_failed_responses[operation_id][0]
+
+ if elements is None:
+ return response
+
+ updated_response = request_utils.create_response(response, elements)
+ self._clear_operation(operation_id)
+ return updated_response
+
+ def after_error(
+ self,
+ hook_ctx: AfterErrorContext,
+ response: Optional[requests.Response],
+ error: Optional[Exception],
+ ) -> Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]:
+        """Executes after an unsuccessful API request. Awaits all parallel requests;
+        if at least one request was successful, combines the responses into a single
+        response object and doesn't raise an error. It returns an error only if
+        all requests failed, or there was no PDF split.
+
+ Args:
+ hook_ctx (AfterErrorContext): The AfterErrorContext object containing
+ information about the hook context.
+ response (Optional[requests.Response]): The Response object representing
+ the response received before the exception occurred.
+ error (Optional[Exception]): The exception object that was thrown.
+
+ Returns:
+ Union[Tuple[Optional[requests.Response], Optional[Exception]], Exception]:
+ If requests were run in parallel, and at least one was successful, a combined
+ response object; otherwise, the original response and exception.
+ """
+
+ # if fails are disallowed - return response and error objects immediately
+ if not self.allow_failed:
+ return (response, error)
+
+ operation_id = hook_ctx.operation_id
+ # We know that this request failed so we pass a failed or empty response to `_await_elements` method
+        # where it checks if at least one of the other requests succeeded
+ elements = self._await_elements(operation_id, response or requests.Response())
+ successful_responses = self.api_successful_responses.get(operation_id)
+
+ if elements is None or successful_responses is None:
+ return (response, error)
+
+ if len(successful_responses) == 0:
+ self._clear_operation(operation_id)
+ return (response, error)
+
+ updated_response = request_utils.create_response(
+ successful_responses[0], elements
+ )
+ self._clear_operation(operation_id)
+ return (updated_response, None)
+
+ def _clear_operation(self, operation_id: str) -> None:
+ """
+ Clears the operation data associated with the given operation ID.
+
+ Args:
+ operation_id (str): The ID of the operation to clear.
+ """
+        self.coroutines_to_execute.pop(operation_id, None)
+        self.api_successful_responses.pop(operation_id, None)
+        self.api_failed_responses.pop(operation_id, None)