1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
import asyncio
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional
from ..types import DEFAULT_TIMEOUT, Callback, _Hook
if TYPE_CHECKING:
from .channel import AsyncRealtimeChannel
logger = logging.getLogger(__name__)
class AsyncPush:
def __init__(
self,
channel: "AsyncRealtimeChannel",
event: str,
payload: Optional[Dict[str, Any]] = None,
timeout: int = DEFAULT_TIMEOUT,
):
self.channel = channel
self.event = event
self.payload = payload or {}
self.timeout = timeout
self.rec_hooks: List[_Hook] = []
self.ref: Optional[str] = None
self.ref_event: Optional[str] = None
self.received_resp: Optional[Dict[str, Any]] = None
self.sent = False
self.timeout_task: Optional[asyncio.Task] = None
async def resend(self):
self._cancel_ref_event()
self.ref = ""
self.ref_event = None
self.received_resp = None
self.sent = False
await self.send()
async def send(self):
if self._has_received("timeout"):
return
self.start_timeout()
self.sent = True
try:
await self.channel.socket.send(
{
"topic": self.channel.topic,
"event": self.event,
"payload": self.payload,
"ref": self.ref,
"join_ref": self.channel.join_push.ref,
}
)
except Exception as e:
logger.error(f"send push failed: {e}")
def update_payload(self, payload: Dict[str, Any]):
self.payload = {**self.payload, **payload}
def receive(self, status: str, callback: Callback) -> "AsyncPush":
if self._has_received(status):
callback(self.received_resp.get("response", {}))
self.rec_hooks.append(_Hook(status, callback))
return self
def start_timeout(self):
if self.timeout_task:
return
self.ref = self.channel.socket._make_ref()
self.ref_event = self.channel._reply_event_name(self.ref)
def on_reply(payload, *args):
self._cancel_ref_event()
self._cancel_timeout()
self.received_resp = payload
self._match_receive(**self.received_resp)
self.channel._on(self.ref_event, on_reply)
async def timeout(self):
await asyncio.sleep(self.timeout)
self.trigger("timeout", {})
self.timeout_task = asyncio.create_task(timeout(self))
def trigger(self, status: str, response: Any):
if self.ref_event:
payload = {
"status": status,
"response": response,
}
self.channel._trigger(self.ref_event, payload)
def destroy(self):
self._cancel_ref_event()
self._cancel_timeout()
def _cancel_ref_event(self):
if not self.ref_event:
return
self.channel._off(self.ref_event, {})
def _cancel_timeout(self):
if not self.timeout_task:
return
self.timeout_task.cancel()
self.timeout_task = None
def _match_receive(self, status: str, response: Any):
for hook in self.rec_hooks:
if hook.status == status:
hook.callback(response)
def _has_received(self, status: str):
return self.received_resp and self.received_resp.get("status") == status
|