about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/realtime/_async/push.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/realtime/_async/push.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/realtime/_async/push.py')
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/push.py122
1 files changed, 122 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/push.py b/.venv/lib/python3.12/site-packages/realtime/_async/push.py
new file mode 100644
index 00000000..8e7a68f0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/push.py
@@ -0,0 +1,122 @@
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+from ..types import DEFAULT_TIMEOUT, Callback, _Hook
+
+if TYPE_CHECKING:
+    from .channel import AsyncRealtimeChannel
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncPush:
+    def __init__(
+        self,
+        channel: "AsyncRealtimeChannel",
+        event: str,
+        payload: Optional[Dict[str, Any]] = None,
+        timeout: int = DEFAULT_TIMEOUT,
+    ):
+        self.channel = channel
+        self.event = event
+        self.payload = payload or {}
+        self.timeout = timeout
+        self.rec_hooks: List[_Hook] = []
+        self.ref: Optional[str] = None
+        self.ref_event: Optional[str] = None
+        self.received_resp: Optional[Dict[str, Any]] = None
+        self.sent = False
+        self.timeout_task: Optional[asyncio.Task] = None
+
+    async def resend(self):
+        self._cancel_ref_event()
+        self.ref = ""
+        self.ref_event = None
+        self.received_resp = None
+        self.sent = False
+        await self.send()
+
+    async def send(self):
+        if self._has_received("timeout"):
+            return
+
+        self.start_timeout()
+        self.sent = True
+
+        try:
+            await self.channel.socket.send(
+                {
+                    "topic": self.channel.topic,
+                    "event": self.event,
+                    "payload": self.payload,
+                    "ref": self.ref,
+                    "join_ref": self.channel.join_push.ref,
+                }
+            )
+        except Exception as e:
+            logger.error(f"send push failed: {e}")
+
+    def update_payload(self, payload: Dict[str, Any]):
+        self.payload = {**self.payload, **payload}
+
+    def receive(self, status: str, callback: Callback) -> "AsyncPush":
+        if self._has_received(status):
+            callback(self.received_resp.get("response", {}))
+
+        self.rec_hooks.append(_Hook(status, callback))
+        return self
+
+    def start_timeout(self):
+        if self.timeout_task:
+            return
+
+        self.ref = self.channel.socket._make_ref()
+        self.ref_event = self.channel._reply_event_name(self.ref)
+
+        def on_reply(payload, *args):
+            self._cancel_ref_event()
+            self._cancel_timeout()
+            self.received_resp = payload
+            self._match_receive(**self.received_resp)
+
+        self.channel._on(self.ref_event, on_reply)
+
+        async def timeout(self):
+            await asyncio.sleep(self.timeout)
+            self.trigger("timeout", {})
+
+        self.timeout_task = asyncio.create_task(timeout(self))
+
+    def trigger(self, status: str, response: Any):
+        if self.ref_event:
+            payload = {
+                "status": status,
+                "response": response,
+            }
+            self.channel._trigger(self.ref_event, payload)
+
+    def destroy(self):
+        self._cancel_ref_event()
+        self._cancel_timeout()
+
+    def _cancel_ref_event(self):
+        if not self.ref_event:
+            return
+
+        self.channel._off(self.ref_event, {})
+
+    def _cancel_timeout(self):
+        if not self.timeout_task:
+            return
+
+        self.timeout_task.cancel()
+        self.timeout_task = None
+
+    def _match_receive(self, status: str, response: Any):
+        for hook in self.rec_hooks:
+            if hook.status == status:
+                hook.callback(response)
+
+    def _has_received(self, status: str):
+        return self.received_resp and self.received_resp.get("status") == status