aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/realtime/_async/push.py
blob: 8e7a68f06f282d0ec5fb352310a8014ef9dea1a9 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from ..types import DEFAULT_TIMEOUT, Callback, _Hook

if TYPE_CHECKING:
    from .channel import AsyncRealtimeChannel

# Module-level logger named after this module; used below to report failed sends.
logger = logging.getLogger(__name__)


class AsyncPush:
    """One outbound channel message plus the bookkeeping for its reply.

    Sending a push obtains a ref from ``channel.socket._make_ref()`` and
    registers a handler on the channel for the event name returned by
    ``channel._reply_event_name(ref)``. When that handler fires, callbacks
    registered through :meth:`receive` whose status matches the reply's
    ``"status"`` are invoked with the reply's ``"response"``. If nothing
    arrives within ``timeout`` (handed to ``asyncio.sleep``, i.e. seconds),
    a synthetic reply with status ``"timeout"`` is triggered instead.
    """

    def __init__(
        self,
        channel: "AsyncRealtimeChannel",
        event: str,
        payload: Optional[Dict[str, Any]] = None,
        timeout: int = DEFAULT_TIMEOUT,
    ):
        """Create an unsent push.

        Args:
            channel: Channel the message belongs to; its ``socket`` is used
                for sending and for generating refs.
            event: Event name placed in the outgoing message.
            payload: Message payload; ``None`` (or any falsy value) becomes
                an empty dict.
            timeout: Delay passed to ``asyncio.sleep`` before the
                ``"timeout"`` reply is triggered.
        """
        self.channel = channel
        self.event = event
        self.payload = payload or {}
        self.timeout = timeout
        # Status-specific callbacks registered through receive().
        self.rec_hooks: List[_Hook] = []
        # Ref of the current attempt and the channel event its reply uses.
        self.ref: Optional[str] = None
        self.ref_event: Optional[str] = None
        # Last reply payload seen by the reply handler.
        self.received_resp: Optional[Dict[str, Any]] = None
        self.sent = False
        # Pending timeout task, or None when no timeout is running.
        self.timeout_task: Optional[asyncio.Task] = None

    async def resend(self) -> None:
        """Discard the state of the previous attempt and send again."""
        self._cancel_ref_event()
        # Drop a still-pending timeout from the previous attempt. Otherwise
        # start_timeout() returns early, and the message would be sent with
        # the blank ref assigned below and no reply handler registered.
        self._cancel_timeout()
        self.ref = ""
        self.ref_event = None
        self.received_resp = None
        self.sent = False
        await self.send()

    async def send(self) -> None:
        """Send the message over the channel's socket.

        Does nothing when a ``"timeout"`` reply has already been recorded.
        The timeout is started before the socket write, so it keeps running
        even if the write fails. Exceptions raised by the socket are logged
        and not re-raised (best-effort send).
        """
        if self._has_received("timeout"):
            return

        self.start_timeout()
        self.sent = True

        try:
            await self.channel.socket.send(
                {
                    "topic": self.channel.topic,
                    "event": self.event,
                    "payload": self.payload,
                    "ref": self.ref,
                    "join_ref": self.channel.join_push.ref,
                }
            )
        except Exception as e:
            logger.error(f"send push failed: {e}")

    def update_payload(self, payload: Dict[str, Any]) -> None:
        """Merge ``payload`` into the stored payload; new keys win on conflict."""
        self.payload = {**self.payload, **payload}

    def receive(self, status: str, callback: Callback) -> "AsyncPush":
        """Register ``callback`` for replies whose status equals ``status``.

        If a reply with that status has already been recorded, the callback
        is invoked immediately with the reply's ``"response"`` (``{}`` when
        absent). The hook is stored in either case.

        Returns:
            This push, so calls can be chained.
        """
        if self._has_received(status):
            callback(self.received_resp.get("response", {}))

        self.rec_hooks.append(_Hook(status, callback))
        return self

    def start_timeout(self) -> None:
        """Allocate a ref, register the reply handler and start the timeout.

        No-op while a timeout task is already pending. Must be called with a
        running event loop because it uses ``asyncio.create_task``.
        """
        if self.timeout_task:
            return

        self.ref = self.channel.socket._make_ref()
        self.ref_event = self.channel._reply_event_name(self.ref)

        def on_reply(payload, *args):
            self._cancel_ref_event()
            self._cancel_timeout()
            self.received_resp = payload
            # Read the two fields explicitly instead of unpacking with **,
            # so a missing "response" or an unexpected extra key does not
            # raise TypeError; the default matches receive().
            self._match_receive(payload.get("status"), payload.get("response", {}))

        self.channel._on(self.ref_event, on_reply)

        async def expire():
            await asyncio.sleep(self.timeout)
            self.trigger("timeout", {})

        self.timeout_task = asyncio.create_task(expire())

    def trigger(self, status: str, response: Any) -> None:
        """Fire the reply event on the channel with a synthetic payload.

        Does nothing when no reply event is currently set.
        """
        if self.ref_event:
            payload = {
                "status": status,
                "response": response,
            }
            self.channel._trigger(self.ref_event, payload)

    def destroy(self) -> None:
        """Cancel both the reply-event registration and the pending timeout."""
        self._cancel_ref_event()
        self._cancel_timeout()

    def _cancel_ref_event(self) -> None:
        """Call ``channel._off`` for the current reply event, if one is set."""
        if not self.ref_event:
            return

        # NOTE(review): presumably this unregisters on_reply; the second
        # argument is an empty filter dict - confirm against channel._off.
        self.channel._off(self.ref_event, {})

    def _cancel_timeout(self) -> None:
        """Cancel the pending timeout task, if any, and forget it."""
        if not self.timeout_task:
            return

        self.timeout_task.cancel()
        self.timeout_task = None

    def _match_receive(self, status: str, response: Any) -> None:
        """Invoke every registered hook whose status equals ``status``."""
        # Iterate over a snapshot: a callback may call receive(), which has
        # already run the new callback once and then appends it. Walking the
        # live list would reach that hook and run it a second time.
        for hook in list(self.rec_hooks):
            if hook.status == status:
                hook.callback(response)

    def _has_received(self, status: str) -> bool:
        """Return True when a reply with the given status has been recorded."""
        resp = self.received_resp
        return bool(resp) and resp.get("status") == status