diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/utils')
5 files changed, 184 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/aio_utils.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/aio_utils.py new file mode 100644 index 00000000..459205f1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/aio_utils.py @@ -0,0 +1,137 @@ +import asyncio +import inspect +from concurrent.futures import Executor +from functools import partial, wraps +from threading import Thread +from typing import Any + + +## TODO: Stricter typing here +def sync_to_async(func: Any) -> Any: + """ + A decorator to run a synchronous function or coroutine in an asynchronous context with added + asyncio loop safety. + + This decorator allows you to safely call synchronous functions or coroutines from an + asynchronous function by running them in an executor. + + Args: + func (callable): The synchronous function or coroutine to be run asynchronously. + + Returns: + callable: An asynchronous wrapper function that runs the given function in an executor. + + Example: + @sync_to_async + def sync_function(x, y): + return x + y + + @sync_to_async + async def async_function(x, y): + return x + y + + + def undecorated_function(x, y): + return x + y + + async def main(): + result1 = await sync_function(1, 2) + result2 = await async_function(3, 4) + result3 = await sync_to_async(undecorated_function)(5, 6) + print(result1, result2, result3) + + asyncio.run(main()) + """ + + ## TODO: Stricter typing here + @wraps(func) + async def run( + *args: Any, + loop: asyncio.AbstractEventLoop | None = None, + executor: Executor | None = None, + **kwargs: Any + ) -> Any: + """ + The asynchronous wrapper function that runs the given function in an executor. + + Args: + *args: Positional arguments to pass to the function. + loop (asyncio.AbstractEventLoop, optional): The event loop to use. If None, the current running loop is used. + executor (concurrent.futures.Executor, optional): The executor to use. If None, the default executor is used. + **kwargs: Keyword arguments to pass to the function. + + Returns: + The result of the function call. + """ + if loop is None: + loop = asyncio.get_running_loop() + + if inspect.iscoroutinefunction(func): + # Wrap the coroutine to run it in an executor + async def wrapper() -> Any: + return await func(*args, **kwargs) + + pfunc = partial(asyncio.run, wrapper()) + return await loop.run_in_executor(executor, pfunc) + else: + # Run the synchronous function in an executor + pfunc = partial(func, *args, **kwargs) + return await loop.run_in_executor(executor, pfunc) + + return run + + +class EventLoopThread: + """A class that manages an asyncio event loop running in a separate thread.""" + + def __init__(self) -> None: + """ + Initializes the EventLoopThread by creating an event loop + and setting up a thread to run the loop. + """ + self.loop = asyncio.new_event_loop() + self.thread = Thread(target=self.run_loop_in_thread, args=(self.loop,)) + + def __enter__(self, *a, **kw) -> asyncio.AbstractEventLoop: + """ + Starts the thread running the event loop when entering the context. + + Returns: + asyncio.AbstractEventLoop: The event loop running in the separate thread. + """ + self.thread.start() + return self.loop + + def __exit__(self, *a, **kw) -> None: + """ + Stops the event loop and joins the thread when exiting the context. + """ + self.loop.call_soon_threadsafe(self.loop.stop) + self.thread.join() + + def run_loop_in_thread(self, loop: asyncio.AbstractEventLoop) -> None: + """ + Sets the event loop for the current thread and runs it forever. + + Args: + loop (asyncio.AbstractEventLoop): The event loop to run. + """ + asyncio.set_event_loop(loop) + loop.run_forever() + + +def get_active_event_loop() -> asyncio.AbstractEventLoop | None: + """ + Get the active event loop. + + Returns: + asyncio.AbstractEventLoop: The active event loop, or None if there is no active + event loop in the current thread. + """ + try: + return asyncio.get_event_loop() + except RuntimeError as e: + if str(e).startswith("There is no current event loop in thread"): + return None + else: + raise e diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/backoff.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/backoff.py new file mode 100644 index 00000000..34ddac7f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/backoff.py @@ -0,0 +1,9 @@ +import asyncio +import random + + +async def exp_backoff_sleep(attempt: int, max_sleep_time: float = 5) -> None: + base_time = 0.1 # starting sleep time in seconds (100 milliseconds) + jitter = random.uniform(0, base_time) # add random jitter + sleep_time = min(base_time * (2**attempt) + jitter, max_sleep_time) + await asyncio.sleep(sleep_time) diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/serialization.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/serialization.py new file mode 100644 index 00000000..7eb1d13a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/serialization.py @@ -0,0 +1,18 @@ +from typing import Any + + +def flatten(xs: dict[str, Any], parent_key: str, separator: str) -> dict[str, Any]: + if not xs: + return {} + + items: list[tuple[str, Any]] = [] + + for k, v in xs.items(): + new_key = parent_key + separator + k if parent_key else k + + if isinstance(v, dict): + items.extend(flatten(v, new_key, separator).items()) + else: + items.append((new_key, v)) + + return dict(items) diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/types.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/types.py new file mode 100644 index 00000000..30e469f7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/types.py @@ -0,0 +1,8 @@ +from typing import Type + +from pydantic import BaseModel + + +class WorkflowValidator(BaseModel): + workflow_input: Type[BaseModel] | None = None + step_output: Type[BaseModel] | None = None diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/typing.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/typing.py new file mode 100644 index 00000000..db111db5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/utils/typing.py @@ -0,0 +1,12 @@ +from typing import Any, Type, TypeGuard, TypeVar + +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +def is_basemodel_subclass(model: Any) -> bool: + try: + return issubclass(model, BaseModel) + except TypeError: + return False |