diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py new file mode 100644 index 00000000..064f6741 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py @@ -0,0 +1,59 @@ +import asyncio +from typing import Any, Coroutine, Generic, Optional, TypedDict, TypeVar + +from hatchet_sdk.clients.run_event_listener import ( + RunEventListener, + RunEventListenerClient, +) +from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener +from hatchet_sdk.utils.aio_utils import EventLoopThread, get_active_event_loop + + +class WorkflowRunRef: + workflow_run_id: str + + def __init__( + self, + workflow_run_id: str, + workflow_listener: PooledWorkflowRunListener, + workflow_run_event_listener: RunEventListenerClient, + ): + self.workflow_run_id = workflow_run_id + self.workflow_listener = workflow_listener + self.workflow_run_event_listener = workflow_run_event_listener + + def __str__(self): + return self.workflow_run_id + + def stream(self) -> RunEventListener: + return self.workflow_run_event_listener.stream(self.workflow_run_id) + + def result(self) -> Coroutine: + return self.workflow_listener.result(self.workflow_run_id) + + def sync_result(self) -> dict: + coro = self.workflow_listener.result(self.workflow_run_id) + loop = get_active_event_loop() + + if loop is None: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete(coro) + finally: + asyncio.set_event_loop(None) + else: + return loop.run_until_complete(coro) + + +T = TypeVar("T") + + +class RunRef(WorkflowRunRef, Generic[T]): + async def result(self) -> T: + res = await self.workflow_listener.result(self.workflow_run_id) + + if len(res) == 1: + return list(res.values())[0] + + return res |