From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../asgiref/current_thread_executor.py | 115 +++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py (limited to '.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py') diff --git a/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py b/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py new file mode 100644 index 00000000..67a7926f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/asgiref/current_thread_executor.py @@ -0,0 +1,115 @@ +import queue +import sys +import threading +from concurrent.futures import Executor, Future +from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union + +if sys.version_info >= (3, 10): + from typing import ParamSpec +else: + from typing_extensions import ParamSpec + +_T = TypeVar("_T") +_P = ParamSpec("_P") +_R = TypeVar("_R") + + +class _WorkItem: + """ + Represents an item needing to be run in the executor. + Copied from ThreadPoolExecutor (but it's private, so we're not going to rely on importing it) + """ + + def __init__( + self, + future: "Future[_R]", + fn: Callable[_P, _R], + *args: _P.args, + **kwargs: _P.kwargs, + ): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self) -> None: + __traceback_hide__ = True # noqa: F841 + if not self.future.set_running_or_notify_cancel(): + return + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None # type: ignore[assignment] + else: + self.future.set_result(result) + + +class CurrentThreadExecutor(Executor): + """ + An Executor that actually runs code in the thread it is instantiated in. + Passed to other threads running async code, so they can run sync code in + the thread they came from. + """ + + def __init__(self) -> None: + self._work_thread = threading.current_thread() + self._work_queue: queue.Queue[Union[_WorkItem, "Future[Any]"]] = queue.Queue() + self._broken = False + + def run_until_future(self, future: "Future[Any]") -> None: + """ + Runs the code in the work queue until a result is available from the future. + Should be run from the thread the executor is initialised in. + """ + # Check we're in the right thread + if threading.current_thread() != self._work_thread: + raise RuntimeError( + "You cannot run CurrentThreadExecutor from a different thread" + ) + future.add_done_callback(self._work_queue.put) + # Keep getting and running work items until we get the future we're waiting for + # back via the future's done callback. + try: + while True: + # Get a work item and run it + work_item = self._work_queue.get() + if work_item is future: + return + assert isinstance(work_item, _WorkItem) + work_item.run() + del work_item + finally: + self._broken = True + + def _submit( + self, + fn: Callable[_P, _R], + *args: _P.args, + **kwargs: _P.kwargs, + ) -> "Future[_R]": + # Check they're not submitting from the same thread + if threading.current_thread() == self._work_thread: + raise RuntimeError( + "You cannot submit onto CurrentThreadExecutor from its own thread" + ) + # Check they're not too late or the executor errored + if self._broken: + raise RuntimeError("CurrentThreadExecutor already quit or is broken") + # Add to work queue + f: "Future[_R]" = Future() + work_item = _WorkItem(f, fn, *args, **kwargs) + self._work_queue.put(work_item) + # Return the future + return f + + # Python 3.9+ has a new signature for submit with a "/" after `fn`, to enforce + # it to be a positional argument. If we ignore[override] mypy on 3.9+ will be + # happy but 3.8 will say that the ignore comment is unused, even when + # defining them differently based on sys.version_info. + # We should be able to remove this when we drop support for 3.8. + if not TYPE_CHECKING: + + def submit(self, fn, *args, **kwargs): + return self._submit(fn, *args, **kwargs) -- cgit v1.2.3