about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py')
-rw-r--r--.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py116
1 files changed, 116 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py b/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py
new file mode 100644
index 00000000..8d56a1bb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/litellm/litellm_core_utils/asyncify.py
@@ -0,0 +1,116 @@
+import asyncio
+import functools
+from typing import Awaitable, Callable, Optional
+
+import anyio
+import anyio.to_thread
+from typing_extensions import ParamSpec, TypeVar
+
+T_ParamSpec = ParamSpec("T_ParamSpec")
+T_Retval = TypeVar("T_Retval")
+
+
+def function_has_argument(function: Callable, arg_name: str) -> bool:
+    """Helper function to check if a function has a specific argument."""
+    import inspect
+
+    signature = inspect.signature(function)
+    return arg_name in signature.parameters
+
+
+def asyncify(
+    function: Callable[T_ParamSpec, T_Retval],
+    *,
+    cancellable: bool = False,
+    limiter: Optional[anyio.CapacityLimiter] = None,
+) -> Callable[T_ParamSpec, Awaitable[T_Retval]]:
+    """
+    Take a blocking function and create an async one that receives the same
+    positional and keyword arguments, and that when called, calls the original function
+    in a worker thread using `anyio.to_thread.run_sync()`.
+
+    If the `cancellable` option is enabled and the task waiting for its completion is
+    cancelled, the thread will still run its course but its return value (or any raised
+    exception) will be ignored.
+
+    ## Arguments
+    - `function`: a blocking regular callable (e.g. a function)
+    - `cancellable`: `True` to allow cancellation of the operation
+    - `limiter`: capacity limiter to use to limit the total amount of threads running
+        (if omitted, the default limiter is used)
+
+    ## Return
+    An async function that takes the same positional and keyword arguments as the
+    original one, that when called runs the same original function in a thread worker
+    and returns the result.
+    """
+
+    async def wrapper(
+        *args: T_ParamSpec.args, **kwargs: T_ParamSpec.kwargs
+    ) -> T_Retval:
+        partial_f = functools.partial(function, *args, **kwargs)
+
+        # In `v4.1.0` anyio added the `abandon_on_cancel` argument and deprecated the old
+        # `cancellable` argument, so we need to use the new `abandon_on_cancel` to avoid
+        # surfacing deprecation warnings.
+        if function_has_argument(anyio.to_thread.run_sync, "abandon_on_cancel"):
+            return await anyio.to_thread.run_sync(
+                partial_f,
+                abandon_on_cancel=cancellable,
+                limiter=limiter,
+            )
+
+        return await anyio.to_thread.run_sync(
+            partial_f,
+            cancellable=cancellable,
+            limiter=limiter,
+        )
+
+    return wrapper
+
+
+def run_async_function(async_function, *args, **kwargs):
+    """
+    Helper utility to run an async function in a sync context.
+    Handles the case where there is an existing event loop running.
+
+    Args:
+        async_function (Callable): The async function to run
+        *args: Positional arguments to pass to the async function
+        **kwargs: Keyword arguments to pass to the async function
+
+    Returns:
+        The result of the async function execution
+
+    Example:
+        ```python
+        async def my_async_func(x, y):
+            return x + y
+
+        result = run_async_function(my_async_func, 1, 2)
+        ```
+    """
+    from concurrent.futures import ThreadPoolExecutor
+
+    def run_in_new_loop():
+        """Run the coroutine in a new event loop within this thread."""
+        new_loop = asyncio.new_event_loop()
+        try:
+            asyncio.set_event_loop(new_loop)
+            return new_loop.run_until_complete(async_function(*args, **kwargs))
+        finally:
+            new_loop.close()
+            asyncio.set_event_loop(None)
+
+    try:
+        # First, try to get the current event loop
+        _ = asyncio.get_running_loop()
+        # If we're already in an event loop, run in a separate thread
+        # to avoid nested event loop issues
+        with ThreadPoolExecutor(max_workers=1) as executor:
+            future = executor.submit(run_in_new_loop)
+            return future.result()
+
+    except RuntimeError:
+        # No running event loop, we can safely run in this thread
+        return run_in_new_loop()