aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/realtime/_async/presence.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/realtime/_async/presence.py')
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/presence.py238
1 files changed, 238 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/presence.py b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py
new file mode 100644
index 00000000..4063db4a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py
@@ -0,0 +1,238 @@
+"""
+ Defines the RealtimePresence class and its dependencies.
+"""
+
+import logging
+from typing import Any, Callable, Dict, List, Optional, Union
+
+from ..types import (
+ PresenceDiff,
+ PresenceEvents,
+ PresenceOnJoinCallback,
+ PresenceOnLeaveCallback,
+ PresenceOpts,
+ RawPresenceDiff,
+ RawPresenceState,
+ RealtimePresenceState,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncRealtimePresence:
+ def __init__(self, channel, opts: Optional[PresenceOpts] = None):
+ self.channel = channel
+ self.state: RealtimePresenceState = {}
+ self.pending_diffs: List[RawPresenceDiff] = []
+ self.join_ref: Optional[str] = None
+ self.caller = {
+ "onJoin": lambda *args: None,
+ "onLeave": lambda *args: None,
+ "onSync": lambda: None,
+ "onAuthSuccess": lambda: None,
+ "onAuthFailure": lambda: None,
+ }
+ # Initialize with default events if not provided
+ events = (
+ opts.events
+ if opts
+ else PresenceEvents(state="presence_state", diff="presence_diff")
+ )
+ # Set up event listeners for presence state and diff
+ self.channel._on(events.state, callback=self._on_state_event)
+ self.channel._on(events.diff, callback=self._on_diff_event)
+ self.channel._on("phx_auth", callback=self._on_auth_event)
+
+ def on_join(self, callback: PresenceOnJoinCallback):
+ self.caller["onJoin"] = callback
+
+ def on_leave(self, callback: PresenceOnLeaveCallback):
+ self.caller["onLeave"] = callback
+
+ def on_sync(self, callback: Callable[[], None]):
+ self.caller["onSync"] = callback
+
+ def on_auth_success(self, callback: Callable[[], None]):
+ self.caller["onAuthSuccess"] = callback
+
+ def on_auth_failure(self, callback: Callable[[], None]):
+ self.caller["onAuthFailure"] = callback
+
+ def _on_state_event(self, payload: RawPresenceState, *args):
+ onJoin = self.caller["onJoin"]
+ onLeave = self.caller["onLeave"]
+ onSync = self.caller["onSync"]
+
+ self.join_ref = self.channel.join_ref
+ self.state = self._sync_state(self.state, payload, onJoin, onLeave)
+
+ for diff in self.pending_diffs:
+ self.state = self._sync_diff(self.state, diff, onJoin, onLeave)
+ self.pending_diffs = []
+ onSync()
+
+ def _on_diff_event(self, payload: Dict[str, Any], *args):
+ onJoin = self.caller["onJoin"]
+ onLeave = self.caller["onLeave"]
+ onSync = self.caller["onSync"]
+
+ if self.in_pending_sync_state():
+ self.pending_diffs.append(payload)
+ else:
+ self.state = self._sync_diff(self.state, payload, onJoin, onLeave)
+ onSync()
+
+ def _on_auth_event(self, payload: Dict[str, Any], *args):
+ if payload.get("status") == "ok":
+ self.caller["onAuthSuccess"]()
+ else:
+ self.caller["onAuthFailure"]()
+
+ def _sync_state(
+ self,
+ current_state: RealtimePresenceState,
+ new_state: Union[RawPresenceState, RealtimePresenceState],
+ onJoin: PresenceOnJoinCallback,
+ onLeave: PresenceOnLeaveCallback,
+ ) -> RealtimePresenceState:
+ state = {key: list(value) for key, value in current_state.items()}
+ transformed_state = AsyncRealtimePresence._transform_state(new_state)
+
+ joins: Dict[str, Any] = {}
+ leaves: Dict[str, Any] = {
+ k: v for k, v in state.items() if k not in transformed_state
+ }
+
+ for key, value in transformed_state.items():
+ current_presences = state.get(key, [])
+
+ if len(current_presences) > 0:
+ new_presence_refs = {presence.get("presence_ref") for presence in value}
+ cur_presence_refs = {
+ presence.get("presence_ref") for presence in current_presences
+ }
+
+ joined_presences = [
+ p for p in value if p.get("presence_ref") not in cur_presence_refs
+ ]
+ left_presences = [
+ p
+ for p in current_presences
+ if p.get("presence_ref") not in new_presence_refs
+ ]
+
+ if joined_presences:
+ joins[key] = joined_presences
+ if left_presences:
+ leaves[key] = left_presences
+ else:
+ joins[key] = value
+
+ return self._sync_diff(
+ state, {"joins": joins, "leaves": leaves}, onJoin, onLeave
+ )
+
+ def _sync_diff(
+ self,
+ state: RealtimePresenceState,
+ diff: Union[RawPresenceDiff, PresenceDiff],
+ onJoin: PresenceOnJoinCallback,
+ onLeave: PresenceOnLeaveCallback,
+ ) -> RealtimePresenceState:
+ joins = AsyncRealtimePresence._transform_state(diff.get("joins", {}))
+ leaves = AsyncRealtimePresence._transform_state(diff.get("leaves", {}))
+
+ for key, new_presences in joins.items():
+ current_presences = state.get(key, [])
+ state[key] = new_presences
+
+ if len(current_presences) > 0:
+ joined_presence_refs = {
+ presence.get("presence_ref") for presence in state.get(key)
+ }
+ cur_presences = list(
+ presence
+ for presence in current_presences
+ if presence.get("presence_ref") not in joined_presence_refs
+ )
+ state[key] = cur_presences + state[key]
+
+ onJoin(key, current_presences, new_presences)
+
+ for key, left_presences in leaves.items():
+ current_presences = state.get(key, [])
+
+ if len(current_presences) == 0:
+ break
+
+ presence_refs_to_remove = {
+ presence.get("presence_ref") for presence in left_presences
+ }
+ current_presences = [
+ presence
+ for presence in current_presences
+ if presence.get("presence_ref") not in presence_refs_to_remove
+ ]
+ state[key] = current_presences
+
+ onLeave(key, current_presences, left_presences)
+
+ if len(current_presences) == 0:
+ del state[key]
+
+ return state
+
+ def in_pending_sync_state(self) -> bool:
+ return self.join_ref is None or self.join_ref != self.channel.join_ref
+
+ @staticmethod
+ def _transform_state(
+ state: Union[RawPresenceState, RealtimePresenceState]
+ ) -> RealtimePresenceState:
+ """
+ Transform the raw presence state into a standardized RealtimePresenceState format.
+
+ This method processes the input state, which can be either a RawPresenceState or
+ an already transformed RealtimePresenceState. It handles the conversion of the
+ Phoenix channel's presence format to our internal representation.
+
+ Args:
+ state (Union[RawPresenceState, RealtimePresenceState[T]]): The presence state to transform.
+
+ Returns:
+ RealtimePresenceState[T]: The transformed presence state.
+
+ Example:
+ Input (RawPresenceState):
+ {
+ "user1": {
+ "metas": [
+ {"phx_ref": "ABC123", "user_id": "user1", "status": "online"},
+ {"phx_ref": "DEF456", "phx_ref_prev": "ABC123", "user_id": "user1", "status": "away"}
+ ]
+ },
+ "user2": [{"user_id": "user2", "status": "offline"}]
+ }
+
+ Output (RealtimePresenceState):
+ {
+ "user1": [
+ {"presence_ref": "ABC123", "user_id": "user1", "status": "online"},
+ {"presence_ref": "DEF456", "user_id": "user1", "status": "away"}
+ ],
+ "user2": [{"user_id": "user2", "status": "offline"}]
+ }
+ """
+ new_state: RealtimePresenceState = {}
+ for key, presences in state.items():
+ if isinstance(presences, dict) and "metas" in presences:
+ new_state[key] = []
+
+ for presence in presences["metas"]:
+ presence["presence_ref"] = presence.pop("phx_ref", None)
+ presence.pop("phx_ref_prev", None)
+ new_state[key].append(presence)
+
+ else:
+ new_state[key] = presences
+ return new_state