about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py')
-rw-r--r--.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py
new file mode 100644
index 00000000..064f6741
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/hatchet_sdk/workflow_run.py
@@ -0,0 +1,59 @@
+import asyncio
+from typing import Any, Coroutine, Generic, Optional, TypedDict, TypeVar
+
+from hatchet_sdk.clients.run_event_listener import (
+    RunEventListener,
+    RunEventListenerClient,
+)
+from hatchet_sdk.clients.workflow_listener import PooledWorkflowRunListener
+from hatchet_sdk.utils.aio_utils import EventLoopThread, get_active_event_loop
+
+
+class WorkflowRunRef:
+    workflow_run_id: str
+
+    def __init__(
+        self,
+        workflow_run_id: str,
+        workflow_listener: PooledWorkflowRunListener,
+        workflow_run_event_listener: RunEventListenerClient,
+    ):
+        self.workflow_run_id = workflow_run_id
+        self.workflow_listener = workflow_listener
+        self.workflow_run_event_listener = workflow_run_event_listener
+
+    def __str__(self):
+        return self.workflow_run_id
+
+    def stream(self) -> RunEventListener:
+        return self.workflow_run_event_listener.stream(self.workflow_run_id)
+
+    def result(self) -> Coroutine:
+        return self.workflow_listener.result(self.workflow_run_id)
+
+    def sync_result(self) -> dict:
+        coro = self.workflow_listener.result(self.workflow_run_id)
+        loop = get_active_event_loop()
+
+        if loop is None:
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            try:
+                return loop.run_until_complete(coro)
+            finally:
+                asyncio.set_event_loop(None)
+        else:
+            return loop.run_until_complete(coro)
+
+
+T = TypeVar("T")
+
+
+class RunRef(WorkflowRunRef, Generic[T]):
+    async def result(self) -> T:
+        res = await self.workflow_listener.result(self.workflow_run_id)
+
+        if len(res) == 1:
+            return list(res.values())[0]
+
+        return res