1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
from enum import Enum
from typing import Any, Callable, Dict, List, Literal, Optional, TypedDict, TypeVar
from typing_extensions import ParamSpec
# Constants
# Protocol identifiers: the channel name and the version string.
PHOENIX_CHANNEL = "phoenix"
VSN = "1.0.0"
# Numeric defaults.
# NOTE(review): units are not stated in this file — presumably seconds;
# confirm against the code that consumes these values.
DEFAULT_TIMEOUT = 10
DEFAULT_HEARTBEAT_INTERVAL = 25
# Type variables and custom types
T_ParamSpec = ParamSpec("T_ParamSpec")
T_Retval = TypeVar("T_Retval")
Callback = Callable[T_ParamSpec, T_Retval]
# Enums
class ChannelEvents(str, Enum):
    """Event-name strings, spelled the way the Phoenix realtime server expects.

    Every member is also a ``str``, so it compares equal to its plain-string
    value (for example ``ChannelEvents.join == "phx_join"``).
    """

    # Names carrying the "phx_" prefix.
    close = "phx_close"
    error = "phx_error"
    join = "phx_join"
    reply = "phx_reply"
    leave = "phx_leave"

    # Names whose value is identical to the member name.
    heartbeat = "heartbeat"
    access_token = "access_token"
    broadcast = "broadcast"
    presence = "presence"
class ChannelStates(str, Enum):
    """String-valued enumeration of channel state names.

    Members are ``str`` instances; each value is the lowercase form of the
    member name.
    """

    JOINED = "joined"
    CLOSED = "closed"
    ERRORED = "errored"
    JOINING = "joining"
    LEAVING = "leaving"
class RealtimeSubscribeStates(str, Enum):
    """String-valued enumeration of subscribe status names.

    Members are ``str`` instances; each value is spelled exactly like the
    member name.
    """

    SUBSCRIBED = "SUBSCRIBED"
    TIMED_OUT = "TIMED_OUT"
    CLOSED = "CLOSED"
    CHANNEL_ERROR = "CHANNEL_ERROR"
class RealtimePresenceListenEvents(str, Enum):
    """String-valued enumeration of presence listen event names.

    Members are ``str`` instances; each value is spelled exactly like the
    member name.
    """

    SYNC = "SYNC"
    JOIN = "JOIN"
    LEAVE = "LEAVE"
# Literals
# The four strings accepted wherever this alias is used as an annotation.
# NOTE(review): "*" presumably selects every event — confirm with the consumer.
RealtimePostgresChangesListenEvent = Literal[
    "*",
    "INSERT",
    "UPDATE",
    "DELETE",
]
# Classes
class Binding:
    """Plain record grouping a type string, a filter mapping and a callback.

    The constructor stores its arguments unchanged; nothing is validated
    or copied.
    """

    def __init__(
        self,
        type: str,
        filter: Dict[str, Any],
        callback: Callback,
        id: Optional[str] = None,
    ):
        """Keep the given values as same-named instance attributes.

        Args:
            type: String stored as ``self.type``.
            filter: Mapping stored as ``self.filter`` (the same object, not
                a copy).
            callback: Callable stored as ``self.callback``.
            id: Optional string stored as ``self.id``; ``None`` when omitted.
        """
        # Paired assignments; attribute creation order stays
        # type, filter, callback, id.
        self.type, self.filter = type, filter
        self.callback, self.id = callback, id
class _Hook:
def __init__(self, status: str, callback: Callback):
self.status = status
self.callback = callback
class Presence(Dict[str, Any]):
    """Dictionary subclass with string keys and arbitrary values.

    ``presence_ref`` is declared as an annotation only: the class neither
    creates the attribute nor inserts a dictionary key for it.
    """

    presence_ref: str
class PresenceEvents:
    """Plain record holding two strings, ``state`` and ``diff``."""

    def __init__(self, state: str, diff: str):
        """Keep both arguments as same-named instance attributes.

        Args:
            state: String stored as ``self.state``.
            diff: String stored as ``self.diff``.
        """
        self.state, self.diff = state, diff
class PresenceOpts:
    """Plain record wrapping a single ``PresenceEvents`` object."""

    def __init__(self, events: PresenceEvents):
        """Keep the argument as ``self.events``.

        Args:
            events: Object stored unchanged on the instance.
        """
        self.events = events
# TypedDicts
class RealtimeChannelBroadcastConfig(TypedDict):
    """Typed mapping with two required boolean keys, ``ack`` and ``self``."""

    # NOTE(review): the meaning of each flag is not shown in this file —
    # confirm against the code that reads this mapping.
    ack: bool
    self: bool
class RealtimeChannelPresenceConfig(TypedDict):
    """Typed mapping with a single required string key, ``key``."""

    key: str
class RealtimeChannelConfig(TypedDict):
    """Typed mapping with three required keys.

    ``broadcast`` and ``presence`` hold the nested configuration mappings;
    ``private`` is a boolean.
    """

    broadcast: RealtimeChannelBroadcastConfig
    presence: RealtimeChannelPresenceConfig
    private: bool
class RealtimeChannelOptions(TypedDict):
    """Typed mapping with one required key, ``config``."""

    config: RealtimeChannelConfig
class PresenceMeta(TypedDict, total=False):
    """Typed mapping whose keys are all optional (``total=False``).

    Both declared keys, ``phx_ref`` and ``phx_ref_prev``, hold strings.
    """

    phx_ref: str
    phx_ref_prev: str
class RawPresenceStateEntry(TypedDict):
    """Typed mapping with one required key, ``metas``: a list of PresenceMeta."""

    metas: List[PresenceMeta]
# Custom types
# Both presence callbacks share one signature: a string followed by two
# lists, returning nothing.
# NOTE(review): the argument meanings are not shown in this file (presumably
# key, current presences, then joined/left presences) — confirm with callers.
PresenceOnLeaveCallback = Callable[[str, List[Any], List[Any]], None]
PresenceOnJoinCallback = Callable[[str, List[Any], List[Any]], None]
# Two string-keyed mappings: the raw form maps each key to one
# RawPresenceStateEntry, the other maps each key to a list of Presence.
RawPresenceState = Dict[str, RawPresenceStateEntry]
RealtimePresenceState = Dict[str, List[Presence]]
class RawPresenceDiff(TypedDict):
    """Typed mapping with two required keys, ``joins`` and ``leaves``.

    Each key holds a ``RawPresenceState`` mapping.
    """

    joins: RawPresenceState
    leaves: RawPresenceState
class PresenceDiff(TypedDict):
    """Typed mapping with two required keys, ``joins`` and ``leaves``.

    Each key holds a ``RealtimePresenceState`` mapping.
    """

    joins: RealtimePresenceState
    leaves: RealtimePresenceState
# Specific payload types
class RealtimePresenceJoinPayload(Dict[str, Any]):
    """Dictionary subclass annotated with the fields of a presence JOIN payload.

    The annotations below are declarations only; this class does not
    populate or validate them.
    """

    # Annotated as the JOIN member of RealtimePresenceListenEvents.
    event: Literal[RealtimePresenceListenEvents.JOIN]
    key: str
    current_presences: List[Presence]
    new_presences: List[Presence]
class RealtimePresenceLeavePayload(Dict[str, Any]):
    """Dictionary subclass annotated with the fields of a presence LEAVE payload.

    The annotations below are declarations only; this class does not
    populate or validate them.
    """

    # Annotated as the LEAVE member of RealtimePresenceListenEvents.
    event: Literal[RealtimePresenceListenEvents.LEAVE]
    key: str
    current_presences: List[Presence]
    left_presences: List[Presence]
|