diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py b/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py new file mode 100644 index 00000000..8d56a1bb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py @@ -0,0 +1,116 @@ +import asyncio +import functools +from typing import Awaitable, Callable, Optional + +import anyio +import anyio.to_thread +from typing_extensions import ParamSpec, TypeVar + +T_ParamSpec = ParamSpec("T_ParamSpec") +T_Retval = TypeVar("T_Retval") + + +def function_has_argument(function: Callable, arg_name: str) -> bool: + """Helper function to check if a function has a specific argument.""" + import inspect + + signature = inspect.signature(function) + return arg_name in signature.parameters + + +def asyncify( + function: Callable[T_ParamSpec, T_Retval], + *, + cancellable: bool = False, + limiter: Optional[anyio.CapacityLimiter] = None, +) -> Callable[T_ParamSpec, Awaitable[T_Retval]]: + """ + Take a blocking function and create an async one that receives the same + positional and keyword arguments, and that when called, calls the original function + in a worker thread using `anyio.to_thread.run_sync()`. + + If the `cancellable` option is enabled and the task waiting for its completion is + cancelled, the thread will still run its course but its return value (or any raised + exception) will be ignored. + + ## Arguments + - `function`: a blocking regular callable (e.g. a function) + - `cancellable`: `True` to allow cancellation of the operation + - `limiter`: capacity limiter to use to limit the total amount of threads running + (if omitted, the default limiter is used) + + ## Return + An async function that takes the same positional and keyword arguments as the + original one, that when called runs the same original function in a thread worker + and returns the result. + """ + + async def wrapper( + *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs + ) -> T_Retval: + partial_f = functools.partial(function, *args, **kwargs) + + # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old + # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid + # surfacing deprecation warnings. + if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel"): + return await anyio.to_thread.run_sync( + partial_f, + abandon_on_cancel=cancellable, + limiter=limiter, + ) + + return await anyio.to_thread.run_sync( + partial_f, + cancellable=cancellable, + limiter=limiter, + ) + + return wrapper + + +def run_async_function(async_function, *args, **kwargs): + """ + Helper utility to run an async function in a sync context. + Handles the case where there is an existing event loop running. + + Args: + async_function (Callable): The async function to run + *args: Positional arguments to pass to the async function + **kwargs: Keyword arguments to pass to the async function + + Returns: + The result of the async function execution + + Example: + ```python + async def my_async_func(x, y): + return x + y + + result = run_async_function(my_async_func, 1, 2) + ``` + """ + from concurrent.futures import ThreadPoolExecutor + + def run_in_new_loop(): + """Run the coroutine in a new event loop within this thread.""" + new_loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(new_loop) + return new_loop.run_until_complete(async_function(*args, **kwargs)) + finally: + new_loop.close() + asyncio.set_event_loop(None) + + try: + # First, try to get the current event loop + _ = asyncio.get_running_loop() + # If we're already in an event loop, run in a separate thread + # to avoid nested event loop issues + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit(run_in_new_loop) + return future.result() + + except RuntimeError: + # No running event loop, we can safely run in this thread + return run_in_new_loop() |