From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../site-packages/hatchet_sdk/workflow_run.py | 59 ++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py') diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py new file mode 100644 index 00000000..064f6741 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py @@ -0,0 +1,59 @@ +import asyncio +from typing import Any, Coroutine, Generic, Optional, TypedDict, TypeVar + +from hatchet_sdk.clients.run_event_listener import ( + RunEventListener, + RunEventListenerClient, +) +from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener +from hatchet_sdk.utils.aio_utils import EventLoopThread, get_active_event_loop + + +class WorkflowRunRef: + workflow_run_id: str + + def __init__( + self, + workflow_run_id: str, + workflow_listener: PooledWorkflowRunListener, + workflow_run_event_listener: RunEventListenerClient, + ): + self.workflow_run_id = workflow_run_id + self.workflow_listener = workflow_listener + self.workflow_run_event_listener = workflow_run_event_listener + + def __str__(self): + return self.workflow_run_id + + def stream(self) -> RunEventListener: + return self.workflow_run_event_listener.stream(self.workflow_run_id) + + def result(self) -> Coroutine: + return self.workflow_listener.result(self.workflow_run_id) + + def sync_result(self) -> dict: + coro = self.workflow_listener.result(self.workflow_run_id) + loop = get_active_event_loop() + + if loop is None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + asyncio.set_event_loop(None) + else: + return loop.run_until_complete(coro) + + +T = TypeVar("T") + + +class RunRef(WorkflowRunRef, Generic[T]): + async def result(self) -> T: + res = await self.workflow_listener.result(self.workflow_run_id) + + if len(res) == 1: + return list(res.values())[0] + + return res -- cgit v1.2.3