diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/realtime')
17 files changed, 1711 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/realtime/__init__.py b/.venv/lib/python3.12/site-packages/realtime/__init__.py new file mode 100644 index 00000000..a7749e74 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/__init__.py @@ -0,0 +1,17 @@ +import logging + +# Configure the root logger for the module +logging.getLogger(__name__).addHandler(logging.NullHandler()) + +from realtime.version import __version__ + +from ._async.channel import AsyncRealtimeChannel +from ._async.client import AsyncRealtimeClient +from ._async.presence import AsyncRealtimePresence +from ._sync.channel import SyncRealtimeChannel +from ._sync.client import SyncRealtimeClient +from ._sync.presence import SyncRealtimePresence +from .exceptions import * +from .message import * +from .transformers import * +from .types import * diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py b/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/channel.py b/.venv/lib/python3.12/site-packages/realtime/_async/channel.py new file mode 100644 index 00000000..0050e5e2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/channel.py @@ -0,0 +1,565 @@ +from __future__ import annotations + +import asyncio +import json +import logging +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional + +from realtime.types import ( + Binding, + Callback, + ChannelEvents, + ChannelStates, + RealtimeChannelOptions, + RealtimePostgresChangesListenEvent, + RealtimePresenceState, + RealtimeSubscribeStates, +) + +from ..transformers import http_endpoint_url +from .presence import ( + AsyncRealtimePresence, + PresenceOnJoinCallback, + PresenceOnLeaveCallback, +) +from .push import AsyncPush +from .timer import AsyncTimer + +if TYPE_CHECKING: + from .client import AsyncRealtimeClient + +logger = logging.getLogger(__name__) + + +class AsyncRealtimeChannel: + """ + Channel is an abstraction for a topic subscription on an existing socket connection. + Each Channel has its own topic and a list of event-callbacks that respond to messages. + Should only be instantiated through `AsyncRealtimeClient.channel(topic)`. + """ + + def __init__( + self, + socket: AsyncRealtimeClient, + topic: str, + params: Optional[RealtimeChannelOptions] = None, + ) -> None: + """ + Initialize the Channel object. + + :param socket: RealtimeClient object + :param topic: Topic that it subscribes to on the realtime server + :param params: Optional parameters for connection. + """ + self.socket = socket + self.params = params or {} + if self.params.get("config") is None: + self.params["config"] = { + "broadcast": {"ack": False, "self": False}, + "presence": {"key": ""}, + "private": False, + } + + self.topic = topic + self._joined_once = False + self.bindings: Dict[str, List[Binding]] = {} + self.presence = AsyncRealtimePresence(self) + self.state = ChannelStates.CLOSED + self._push_buffer: List[AsyncPush] = [] + self.timeout = self.socket.timeout + + self.join_push = AsyncPush(self, ChannelEvents.join, self.params) + self.rejoin_timer = AsyncTimer( + self._rejoin_until_connected, lambda tries: 2**tries + ) + + self.broadcast_endpoint_url = self._broadcast_endpoint_url() + + def on_join_push_ok(payload: Dict[str, Any], *args): + self.state = ChannelStates.JOINED + self.rejoin_timer.reset() + for push in self._push_buffer: + asyncio.create_task(push.send()) + self._push_buffer = [] + + def on_join_push_timeout(*args): + if not self.is_joining: + return + + logger.error(f"join push timeout for channel {self.topic}") + self.state = ChannelStates.ERRORED + self.rejoin_timer.schedule_timeout() + + self.join_push.receive("ok", on_join_push_ok).receive( + "timeout", on_join_push_timeout + ) + + def on_close(*args): + logger.info(f"channel {self.topic} closed") + self.rejoin_timer.reset() + self.state = ChannelStates.CLOSED + self.socket._remove_channel(self) + + def on_error(payload, *args): + if self.is_leaving or self.is_closed: + return + + logger.info(f"channel {self.topic} error: {payload}") + self.state = ChannelStates.ERRORED + self.rejoin_timer.schedule_timeout() + + self._on("close", on_close) + self._on("error", on_error) + + def on_reply(payload, ref): + self._trigger(self._reply_event_name(ref), payload) + + self._on(ChannelEvents.reply, on_reply) + + # Properties + @property + def is_closed(self): + return self.state == ChannelStates.CLOSED + + @property + def is_joining(self): + return self.state == ChannelStates.JOINING + + @property + def is_leaving(self): + return self.state == ChannelStates.LEAVING + + @property + def is_errored(self): + return self.state == ChannelStates.ERRORED + + @property + def is_joined(self): + return self.state == ChannelStates.JOINED + + @property + def join_ref(self): + return self.join_push.ref + + # Core channel methods + async def subscribe( + self, + callback: Optional[ + Callable[[RealtimeSubscribeStates, Optional[Exception]], None] + ] = None, + ) -> AsyncRealtimeChannel: + """ + Subscribe to the channel. Can only be called once per channel instance. + + :param callback: Optional callback function that receives subscription state updates + and any errors that occur during subscription + :return: The Channel instance for method chaining + :raises: Exception if called multiple times on the same channel instance + """ + if not self.socket.is_connected: + await self.socket.connect() + + if self._joined_once: + raise Exception( + "Tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance" + ) + else: + config = self.params.get("config", {}) + broadcast = config.get("broadcast", {}) + presence = config.get("presence", {}) + private = config.get("private", False) + + access_token_payload = {} + config = { + "broadcast": broadcast, + "presence": presence, + "private": private, + "postgres_changes": list( + map(lambda x: x.filter, self.bindings.get("postgres_changes", [])) + ), + } + + if self.socket.access_token: + access_token_payload["access_token"] = self.socket.access_token + + self.join_push.update_payload( + {**{"config": config}, **access_token_payload} + ) + self._joined_once = True + + def on_join_push_ok(payload: Dict[str, Any]): + server_postgres_changes: List[Dict[str, Any]] = payload.get( + "postgres_changes", [] + ) + + if len(server_postgres_changes) == 0: + callback and callback(RealtimeSubscribeStates.SUBSCRIBED, None) + return + + client_postgres_changes = self.bindings.get("postgres_changes", []) + new_postgres_bindings = [] + + bindings_len = len(client_postgres_changes) + + for i in range(bindings_len): + client_binding = client_postgres_changes[i] + event = client_binding.filter.get("event") + schema = client_binding.filter.get("schema") + table = client_binding.filter.get("table") + filter = client_binding.filter.get("filter") + + server_binding = ( + server_postgres_changes[i] + if i < len(server_postgres_changes) + else None + ) + + if ( + server_binding + and server_binding.get("event") == event + and server_binding.get("schema") == schema + and server_binding.get("table") == table + and server_binding.get("filter") == filter + ): + client_binding.id = server_binding.get("id") + new_postgres_bindings.append(client_binding) + else: + asyncio.create_task(self.unsubscribe()) + callback and callback( + RealtimeSubscribeStates.CHANNEL_ERROR, + Exception( + "mismatch between server and client bindings for postgres changes" + ), + ) + return + + self.bindings["postgres_changes"] = new_postgres_bindings + callback and callback(RealtimeSubscribeStates.SUBSCRIBED, None) + + def on_join_push_error(payload: Dict[str, Any]): + callback and callback( + RealtimeSubscribeStates.CHANNEL_ERROR, + Exception(json.dumps(payload)), + ) + + def on_join_push_timeout(*args): + callback and callback(RealtimeSubscribeStates.TIMED_OUT, None) + + self.join_push.receive("ok", on_join_push_ok).receive( + "error", on_join_push_error + ).receive("timeout", on_join_push_timeout) + + await self._rejoin() + + return self + + async def unsubscribe(self): + """ + Unsubscribe from the channel and leave the topic. + Sets channel state to LEAVING and cleans up timers and pushes. + """ + self.state = ChannelStates.LEAVING + + self.rejoin_timer.reset() + self.join_push.destroy() + + def _close(*args): + logger.info(f"channel {self.topic} leave") + self._trigger(ChannelEvents.close, "leave") + + leave_push = AsyncPush(self, ChannelEvents.leave, {}) + leave_push.receive("ok", _close).receive("timeout", _close) + + await leave_push.send() + + if not self._can_push(): + leave_push.trigger("ok", {}) + + async def push( + self, event: str, payload: Dict[str, Any], timeout: Optional[int] = None + ) -> AsyncPush: + """ + Push a message to the channel. + + :param event: The event name to push + :param payload: The payload to send + :param timeout: Optional timeout in milliseconds + :return: AsyncPush instance representing the push operation + :raises: Exception if called before subscribing to the channel + """ + if not self._joined_once: + raise Exception( + f"tried to push '{event}' to '{self.topic}' before joining. Use channel.subscribe() before pushing events" + ) + + timeout = timeout or self.timeout + + push = AsyncPush(self, event, payload, timeout) + if self._can_push(): + await push.send() + else: + push.start_timeout() + self._push_buffer.append(push) + + return push + + async def join(self) -> AsyncRealtimeChannel: + """ + Coroutine that attempts to join Phoenix Realtime server via a certain topic. + + :return: Channel + """ + try: + await self.socket.send( + { + "topic": self.topic, + "event": "phx_join", + "payload": {"config": self.params}, + "ref": None, + } + ) + except Exception as e: + print(e) + return self + + # Event handling methods + def _on( + self, type: str, callback: Callback, filter: Optional[Dict[str, Any]] = None + ) -> AsyncRealtimeChannel: + """ + Set up a listener for a specific event. + + :param type: The type of the event to listen for. + :param filter: Additional parameters for the event. + :param callback: The callback function to execute when the event is received. + :return: The Channel instance for method chaining. + """ + + type_lowercase = type.lower() + binding = Binding(type=type_lowercase, filter=filter or {}, callback=callback) + self.bindings.setdefault(type_lowercase, []).append(binding) + + return self + + def _off(self, type: str, filter: Dict[str, Any]) -> AsyncRealtimeChannel: + """ + Remove a listener for a specific event type and filter. + + :param type: The type of the event to remove the listener for. + :param filter: The filter associated with the listener to remove. + :return: The Channel instance for method chaining. + + This method removes all bindings for the specified event type that match + the given filter. If no matching bindings are found, the method does nothing. + """ + type_lowercase = type.lower() + + if type_lowercase in self.bindings: + self.bindings[type_lowercase] = [ + binding + for binding in self.bindings[type_lowercase] + if binding.filter != filter + ] + return self + + def on_broadcast( + self, event: str, callback: Callable[[Dict[str, Any]], None] + ) -> AsyncRealtimeChannel: + """ + Set up a listener for a specific broadcast event. + + :param event: The name of the broadcast event to listen for + :param callback: Function called with the payload when a matching broadcast is received + :return: The Channel instance for method chaining + """ + return self._on( + "broadcast", + filter={"event": event}, + callback=lambda payload, _: callback(payload), + ) + + def on_postgres_changes( + self, + event: RealtimePostgresChangesListenEvent, + callback: Callable[[Dict[str, Any]], None], + table: str = "*", + schema: str = "public", + filter: Optional[str] = None, + ) -> AsyncRealtimeChannel: + """ + Set up a listener for Postgres database changes. + + :param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *) + :param callback: Function called with the payload when a matching change is detected + :param table: The table name to monitor. Defaults to "*" for all tables + :param schema: The database schema to monitor. Defaults to "public" + :param filter: Optional filter string to apply + :return: The Channel instance for method chaining + """ + + binding_filter = {"event": event, "schema": schema, "table": table} + if filter: + binding_filter["filter"] = filter + + return self._on( + "postgres_changes", + filter=binding_filter, + callback=lambda payload, _: callback(payload), + ) + + def on_system( + self, callback: Callable[[Dict[str, Any], None]] + ) -> AsyncRealtimeChannel: + """ + Set up a listener for system events. + + :param callback: The callback function to execute when a system event is received. + :return: The Channel instance for method chaining. + """ + return self._on("system", callback=lambda payload, _: callback(payload)) + + # Presence methods + async def track(self, user_status: Dict[str, Any]) -> None: + """ + Track presence status for the current user. + + :param user_status: Dictionary containing the user's presence information + """ + await self.send_presence("track", user_status) + + async def untrack(self) -> None: + """ + Stop tracking presence for the current user. + """ + await self.send_presence("untrack", {}) + + def presence_state(self) -> RealtimePresenceState: + """ + Get the current state of presence on this channel. + + :return: Dictionary mapping presence keys to lists of presence payloads + """ + return self.presence.state + + def on_presence_sync(self, callback: Callable[[], None]) -> AsyncRealtimeChannel: + """ + Register a callback for presence sync events. + + :param callback: The callback function to execute when a presence sync event occurs. + :return: The Channel instance for method chaining. + """ + self.presence.on_sync(callback) + return self + + def on_presence_join( + self, callback: PresenceOnJoinCallback + ) -> AsyncRealtimeChannel: + """ + Register a callback for presence join events. + + :param callback: The callback function to execute when a presence join event occurs. + :return: The Channel instance for method chaining. + """ + self.presence.on_join(callback) + return self + + def on_presence_leave( + self, callback: PresenceOnLeaveCallback + ) -> AsyncRealtimeChannel: + """ + Register a callback for presence leave events. + + :param callback: The callback function to execute when a presence leave event occurs. + :return: The Channel instance for method chaining. + """ + self.presence.on_leave(callback) + return self + + # Broadcast methods + async def send_broadcast(self, event: str, data: Any) -> None: + """ + Send a broadcast message through this channel. + + :param event: The name of the broadcast event + :param data: The payload to broadcast + """ + await self.push( + ChannelEvents.broadcast, + {"type": "broadcast", "event": event, "payload": data}, + ) + + # Internal methods + def _broadcast_endpoint_url(self): + return f"{http_endpoint_url(self.socket.http_endpoint)}/api/broadcast" + + async def _rejoin(self) -> None: + if self.is_leaving: + return + await self.socket._leave_open_topic(self.topic) + self.state = ChannelStates.JOINING + await self.join_push.resend() + + def _can_push(self): + return self.socket.is_connected and self._joined_once + + async def send_presence(self, event: str, data: Any) -> None: + await self.push(ChannelEvents.presence, {"event": event, "payload": data}) + + def _trigger(self, type: str, payload: Optional[Any], ref: Optional[str] = None): + type_lowercase = type.lower() + events = [ + ChannelEvents.close, + ChannelEvents.error, + ChannelEvents.leave, + ChannelEvents.join, + ] + + if ref is not None and type_lowercase in events and ref != self.join_push.ref: + return + + if type_lowercase in ["insert", "update", "delete"]: + postgres_changes = filter( + lambda binding: binding.filter.get("event", "").lower() + in [type_lowercase, "*"], + self.bindings.get("postgres_changes", []), + ) + for binding in postgres_changes: + binding.callback(payload, ref) + else: + bindings = self.bindings.get(type_lowercase, []) + for binding in bindings: + if type_lowercase in ["broadcast", "postgres_changes", "presence"]: + bind_id = binding.id + bind_event = ( + binding.filter.get("event", "").lower() + if binding.filter.get("event") + else None + ) + payload_event = ( + payload.get("event", "").lower() + if payload.get("event") + else None + ) + data_type = ( + payload.get("data", {}).get("type", "").lower() + if payload.get("data", {}).get("type") + else None + ) + if ( + bind_id + and bind_id in payload.get("ids", []) + and (bind_event == data_type or bind_event == "*") + ): + binding.callback(payload, ref) + elif bind_event in [payload_event, "*"]: + binding.callback(payload, ref) + elif binding.type == type_lowercase: + binding.callback(payload, ref) + + def _reply_event_name(self, ref: str): + return f"chan_reply_{ref}" + + async def _rejoin_until_connected(self): + self.rejoin_timer.schedule_timeout() + if self.socket.is_connected: + await self._rejoin() diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/client.py b/.venv/lib/python3.12/site-packages/realtime/_async/client.py new file mode 100644 index 00000000..9e8b669d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/client.py @@ -0,0 +1,398 @@ +import asyncio +import json +import logging +import re +from base64 import b64decode +from datetime import datetime +from functools import wraps +from math import floor +from typing import Any, Callable, Dict, List, Optional +from urllib.parse import urlencode, urlparse, urlunparse + +import websockets +from websockets import connect +from websockets.client import ClientProtocol + +from ..message import Message +from ..transformers import http_endpoint_url +from ..types import ( + DEFAULT_HEARTBEAT_INTERVAL, + DEFAULT_TIMEOUT, + PHOENIX_CHANNEL, + VSN, + ChannelEvents, +) +from ..utils import is_ws_url +from .channel import AsyncRealtimeChannel, RealtimeChannelOptions + + +def deprecated(func: Callable) -> Callable: + @wraps(func) + def wrapper(*args, **kwargs): + logger.warning(f"Warning: {func.__name__} is deprecated.") + return func(*args, **kwargs) + + return wrapper + + +logger = logging.getLogger(__name__) + + +class AsyncRealtimeClient: + def __init__( + self, + url: str, + token: Optional[str] = None, + auto_reconnect: bool = True, + params: Optional[Dict[str, Any]] = None, + hb_interval: int = DEFAULT_HEARTBEAT_INTERVAL, + max_retries: int = 5, + initial_backoff: float = 1.0, + timeout: int = DEFAULT_TIMEOUT, + ) -> None: + """ + Initialize a RealtimeClient instance for WebSocket communication. + + :param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`. + Also accepts default Supabase URL: `http://` or `https://`. + :param token: Authentication token for the WebSocket connection. + :param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to True. + :param params: Optional parameters for the connection. Defaults to None. + :param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 25. + :param max_retries: Maximum number of reconnection attempts. Defaults to 5. + :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0. + :param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT. + """ + if not is_ws_url(url): + ValueError("url must be a valid WebSocket URL or HTTP URL string") + self.url = f"{re.sub(r'https://', 'wss://', re.sub(r'http://', 'ws://', url, flags=re.IGNORECASE), flags=re.IGNORECASE)}/websocket" + if token: + self.url += f"?apikey={token}" + self.http_endpoint = http_endpoint_url(url) + self.params = params or {} + self.apikey = token + self.access_token = token + self.send_buffer: List[Callable] = [] + self.hb_interval = hb_interval + self.ws_connection: Optional[ClientProtocol] = None + self.ref = 0 + self.auto_reconnect = auto_reconnect + self.channels: Dict[str, AsyncRealtimeChannel] = {} + self.max_retries = max_retries + self.initial_backoff = initial_backoff + self.timeout = timeout + self._listen_task: Optional[asyncio.Task] = None + self._heartbeat_task: Optional[asyncio.Task] = None + + @property + def is_connected(self) -> bool: + return self.ws_connection is not None + + async def _listen(self) -> None: + """ + An infinite loop that keeps listening. + :return: None + """ + + if not self.ws_connection: + raise Exception("WebSocket connection not established") + + try: + async for msg in self.ws_connection: + logger.info(f"receive: {msg}") + + msg = Message(**json.loads(msg)) + channel = self.channels.get(msg.topic) + + if channel: + channel._trigger(msg.event, msg.payload, msg.ref) + except websockets.exceptions.ConnectionClosedError as e: + logger.error( + f"WebSocket connection closed with code: {e.code}, reason: {e.reason}" + ) + if self.auto_reconnect: + logger.info("Initiating auto-reconnect sequence...") + + await self._reconnect() + else: + logger.error("Auto-reconnect disabled, terminating connection") + + async def _reconnect(self) -> None: + self.ws_connection = None + await self.connect() + + if self.is_connected: + for topic, channel in self.channels.items(): + logger.info(f"Rejoining channel after reconnection: {topic}") + await channel._rejoin() + + async def connect(self) -> None: + """ + Establishes a WebSocket connection with exponential backoff retry mechanism. + + This method attempts to connect to the WebSocket server. If the connection fails, + it will retry with an exponential backoff strategy up to a maximum number of retries. + + Returns: + None + + Raises: + Exception: If unable to establish a connection after max_retries attempts. + + Note: + - The initial backoff time and maximum retries are set during RealtimeClient initialization. + - The backoff time doubles after each failed attempt, up to a maximum of 60 seconds. + """ + + if self.is_connected: + logger.info("WebSocket connection already established") + return + + retries = 0 + backoff = self.initial_backoff + + logger.info(f"Attempting to connect to WebSocket at {self.url}") + + while retries < self.max_retries: + try: + ws = await connect(self.url) + self.ws_connection = ws + logger.info("WebSocket connection established successfully") + return await self._on_connect() + except Exception as e: + retries += 1 + logger.error(f"Connection attempt failed: {str(e)}") + + if retries >= self.max_retries or not self.auto_reconnect: + logger.error( + f"Connection failed permanently after {retries} attempts. Error: {str(e)}" + ) + raise + else: + wait_time = backoff * (2 ** (retries - 1)) + logger.info( + f"Retry {retries}/{self.max_retries}: Next attempt in {wait_time:.2f}s (backoff={backoff}s)" + ) + await asyncio.sleep(wait_time) + backoff = min(backoff * 2, 60) + + raise Exception( + f"Failed to establish WebSocket connection after {self.max_retries} attempts" + ) + + @deprecated + async def listen(self): + pass + + async def _on_connect(self) -> None: + self._listen_task = asyncio.create_task(self._listen()) + self._heartbeat_task = asyncio.create_task(self._heartbeat()) + + await self._flush_send_buffer() + + async def _flush_send_buffer(self): + if self.is_connected and len(self.send_buffer) > 0: + for callback in self.send_buffer: + await callback() + self.send_buffer = [] + + async def close(self) -> None: + """ + Close the WebSocket connection. + + Returns: + None + + Raises: + NotConnectedError: If the connection is not established when this method is called. + """ + + if self.ws_connection: + await self.ws_connection.close() + + self.ws_connection = None + + if self._listen_task: + self._listen_task.cancel() + self._listen_task = None + + if self._heartbeat_task: + self._heartbeat_task.cancel() + self._heartbeat_task = None + + async def _heartbeat(self) -> None: + if not self.ws_connection: + raise Exception("WebSocket connection not established") + + while self.is_connected: + try: + data = dict( + topic=PHOENIX_CHANNEL, + event=ChannelEvents.heartbeat, + payload={}, + ref=None, + ) + await self.send(data) + await asyncio.sleep(max(self.hb_interval, 15)) + + except websockets.exceptions.ConnectionClosed as e: + logger.error( + f"Connection closed during heartbeat. Code: {e.code}, reason: {e.reason}" + ) + + if self.auto_reconnect: + logger.info("Heartbeat failed - initiating reconnection sequence") + await self._reconnect() + else: + logger.error("Heartbeat failed - auto-reconnect disabled") + break + + def channel( + self, topic: str, params: Optional[RealtimeChannelOptions] = None + ) -> AsyncRealtimeChannel: + """ + Initialize a channel and create a two-way association with the socket. + + :param topic: The topic to subscribe to + :param params: Optional channel parameters + :return: AsyncRealtimeChannel instance + """ + topic = f"realtime:{topic}" + chan = AsyncRealtimeChannel(self, topic, params) + self.channels[topic] = chan + + return chan + + def get_channels(self) -> List[AsyncRealtimeChannel]: + return list(self.channels.values()) + + def _remove_channel(self, channel: AsyncRealtimeChannel) -> None: + del self.channels[channel.topic] + + async def remove_channel(self, channel: AsyncRealtimeChannel) -> None: + """ + Unsubscribes and removes a channel from the socket + :param channel: Channel to remove + :return: None + """ + if channel.topic in self.channels: + await self.channels[channel.topic].unsubscribe() + + if len(self.channels) == 0: + await self.close() + + async def remove_all_channels(self) -> None: + """ + Unsubscribes and removes all channels from the socket + :return: None + """ + for _, channel in self.channels.items(): + await channel.unsubscribe() + + await self.close() + + def summary(self) -> None: + """ + Prints a list of topics and event the socket is listening to + :return: None + """ + for topic, channel in self.channels.items(): + print(f"Topic: {topic} | Events: {[e for e, _ in channel.listeners]}]") + + async def set_auth(self, token: Optional[str]) -> None: + """ + Set the authentication token for the connection and update all joined channels. + + This method updates the access token for the current connection and sends the new token + to all joined channels. This is useful for refreshing authentication or changing users. + + Args: + token (Optional[str]): The new authentication token. Can be None to remove authentication. + + Returns: + None + """ + # No empty string tokens. + if isinstance(token, str) and len(token.strip()) == 0: + raise ValueError("Provide a valid jwt token") + + if token: + parsed = None + try: + payload = token.split(".")[1] + "==" + parsed = json.loads(b64decode(payload).decode("utf-8")) + except Exception: + raise ValueError("InvalidJWTToken") + + if parsed: + # Handle expired token if any. + if "exp" in parsed: + now = floor(datetime.now().timestamp()) + valid = now - parsed["exp"] < 0 + if not valid: + raise ValueError( + f"InvalidJWTToken: Invalid value for JWT claim 'exp' with value { parsed['exp'] }" + ) + else: + raise ValueError("InvalidJWTToken: expected claim 'exp'") + + self.access_token = token + + for _, channel in self.channels.items(): + if channel._joined_once and channel.is_joined: + await channel.push(ChannelEvents.access_token, {"access_token": token}) + + def _make_ref(self) -> str: + self.ref += 1 + return f"{self.ref}" + + async def send(self, message: Dict[str, Any]) -> None: + """ + Send a message through the WebSocket connection. + + This method serializes the given message dictionary to JSON, + and sends it through the WebSocket connection. If the connection + is not currently established, the message will be buffered and sent + once the connection is re-established. + + Args: + message (Dict[str, Any]): The message to be sent, as a dictionary. + + Returns: + None + """ + + message = json.dumps(message) + logger.info(f"send: {message}") + + async def send_message(): + await self.ws_connection.send(message) + + if self.is_connected: + await send_message() + else: + self.send_buffer.append(send_message) + + async def _leave_open_topic(self, topic: str): + dup_channels = [ + ch + for ch in self.channels.values() + if ch.topic == topic and (ch.is_joined or ch.is_joining) + ] + + for ch in dup_channels: + await ch.unsubscribe() + + def endpoint_url(self) -> str: + parsed_url = urlparse(self.url) + query = urlencode({**self.params, "vsn": VSN}, doseq=True) + return urlunparse( + ( + parsed_url.scheme, + parsed_url.netloc, + parsed_url.path, + parsed_url.params, + query, + parsed_url.fragment, + ) + ) diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/presence.py b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py new file mode 100644 index 00000000..4063db4a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py @@ -0,0 +1,238 @@ +""" + Defines the RealtimePresence class and its dependencies. +""" + +import logging +from typing import Any, Callable, Dict, List, Optional, Union + +from ..types import ( + PresenceDiff, + PresenceEvents, + PresenceOnJoinCallback, + PresenceOnLeaveCallback, + PresenceOpts, + RawPresenceDiff, + RawPresenceState, + RealtimePresenceState, +) + +logger = logging.getLogger(__name__) + + +class AsyncRealtimePresence: + def __init__(self, channel, opts: Optional[PresenceOpts] = None): + self.channel = channel + self.state: RealtimePresenceState = {} + self.pending_diffs: List[RawPresenceDiff] = [] + self.join_ref: Optional[str] = None + self.caller = { + "onJoin": lambda *args: None, + "onLeave": lambda *args: None, + "onSync": lambda: None, + "onAuthSuccess": lambda: None, + "onAuthFailure": lambda: None, + } + # Initialize with default events if not provided + events = ( + opts.events + if opts + else PresenceEvents(state="presence_state", diff="presence_diff") + ) + # Set up event listeners for presence state and diff + self.channel._on(events.state, callback=self._on_state_event) + self.channel._on(events.diff, callback=self._on_diff_event) + self.channel._on("phx_auth", callback=self._on_auth_event) + + def on_join(self, callback: PresenceOnJoinCallback): + self.caller["onJoin"] = callback + + def on_leave(self, callback: PresenceOnLeaveCallback): + self.caller["onLeave"] = callback + + def on_sync(self, callback: Callable[[], None]): + self.caller["onSync"] = callback + + def on_auth_success(self, callback: Callable[[], None]): + self.caller["onAuthSuccess"] = callback + + def on_auth_failure(self, callback: Callable[[], None]): + self.caller["onAuthFailure"] = callback + + def _on_state_event(self, payload: RawPresenceState, *args): + onJoin = self.caller["onJoin"] + onLeave = self.caller["onLeave"] + onSync = self.caller["onSync"] + + self.join_ref = self.channel.join_ref + self.state = self._sync_state(self.state, payload, onJoin, onLeave) + + for diff in self.pending_diffs: + self.state = self._sync_diff(self.state, diff, onJoin, onLeave) + self.pending_diffs = [] + onSync() + + def _on_diff_event(self, payload: Dict[str, Any], *args): + onJoin = self.caller["onJoin"] + onLeave = self.caller["onLeave"] + onSync = self.caller["onSync"] + + if self.in_pending_sync_state(): + self.pending_diffs.append(payload) + else: + self.state = self._sync_diff(self.state, payload, onJoin, onLeave) + onSync() + + def _on_auth_event(self, payload: Dict[str, Any], *args): + if payload.get("status") == "ok": + self.caller["onAuthSuccess"]() + else: + self.caller["onAuthFailure"]() + + def _sync_state( + self, + current_state: RealtimePresenceState, + new_state: Union[RawPresenceState, RealtimePresenceState], + onJoin: PresenceOnJoinCallback, + onLeave: PresenceOnLeaveCallback, + ) -> RealtimePresenceState: + state = {key: list(value) for key, value in current_state.items()} + transformed_state = AsyncRealtimePresence._transform_state(new_state) + + joins: Dict[str, Any] = {} + leaves: Dict[str, Any] = { + k: v for k, v in state.items() if k not in transformed_state + } + + for key, value in transformed_state.items(): + current_presences = state.get(key, []) + + if len(current_presences) > 0: + new_presence_refs = {presence.get("presence_ref") for presence in value} + cur_presence_refs = { + presence.get("presence_ref") for presence in current_presences + } + + joined_presences = [ + p for p in value if p.get("presence_ref") not in cur_presence_refs + ] + left_presences = [ + p + for p in current_presences + if p.get("presence_ref") not in new_presence_refs + ] + + if joined_presences: + joins[key] = joined_presences + if left_presences: + leaves[key] = left_presences + else: + joins[key] = value + + return self._sync_diff( + state, {"joins": joins, "leaves": leaves}, onJoin, onLeave + ) + + def _sync_diff( + self, + state: RealtimePresenceState, + diff: Union[RawPresenceDiff, PresenceDiff], + onJoin: PresenceOnJoinCallback, + onLeave: PresenceOnLeaveCallback, + ) -> RealtimePresenceState: + joins = AsyncRealtimePresence._transform_state(diff.get("joins", {})) + leaves = AsyncRealtimePresence._transform_state(diff.get("leaves", {})) + + for key, new_presences in joins.items(): + current_presences = state.get(key, []) + state[key] = new_presences + + if len(current_presences) > 0: + joined_presence_refs = { + presence.get("presence_ref") for presence in state.get(key) + } + cur_presences = list( + presence + for presence in current_presences + if presence.get("presence_ref") not in joined_presence_refs + ) + state[key] = cur_presences + state[key] + + onJoin(key, current_presences, new_presences) + + for key, left_presences in leaves.items(): + current_presences = state.get(key, []) + + if len(current_presences) == 0: + break + + presence_refs_to_remove = { + presence.get("presence_ref") for presence in left_presences + } + current_presences = [ + presence + for presence in current_presences + if presence.get("presence_ref") not in presence_refs_to_remove + ] + state[key] = current_presences + + onLeave(key, current_presences, left_presences) + + if len(current_presences) == 0: + del state[key] + + return state + + def in_pending_sync_state(self) -> bool: + return self.join_ref is None or self.join_ref != self.channel.join_ref + + @staticmethod + def _transform_state( + state: Union[RawPresenceState, RealtimePresenceState] + ) -> RealtimePresenceState: + """ + Transform the raw presence state into a standardized RealtimePresenceState format. + + This method processes the input state, which can be either a RawPresenceState or + an already transformed RealtimePresenceState. It handles the conversion of the + Phoenix channel's presence format to our internal representation. + + Args: + state (Union[RawPresenceState, RealtimePresenceState[T]]): The presence state to transform. + + Returns: + RealtimePresenceState[T]: The transformed presence state. + + Example: + Input (RawPresenceState): + { + "user1": { + "metas": [ + {"phx_ref": "ABC123", "user_id": "user1", "status": "online"}, + {"phx_ref": "DEF456", "phx_ref_prev": "ABC123", "user_id": "user1", "status": "away"} + ] + }, + "user2": [{"user_id": "user2", "status": "offline"}] + } + + Output (RealtimePresenceState): + { + "user1": [ + {"presence_ref": "ABC123", "user_id": "user1", "status": "online"}, + {"presence_ref": "DEF456", "user_id": "user1", "status": "away"} + ], + "user2": [{"user_id": "user2", "status": "offline"}] + } + """ + new_state: RealtimePresenceState = {} + for key, presences in state.items(): + if isinstance(presences, dict) and "metas" in presences: + new_state[key] = [] + + for presence in presences["metas"]: + presence["presence_ref"] = presence.pop("phx_ref", None) + presence.pop("phx_ref_prev", None) + new_state[key].append(presence) + + else: + new_state[key] = presences + return new_state diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/push.py b/.venv/lib/python3.12/site-packages/realtime/_async/push.py new file mode 100644 index 00000000..8e7a68f0 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/push.py @@ -0,0 +1,122 @@ +import asyncio +import logging +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from ..types import DEFAULT_TIMEOUT, Callback, _Hook + +if TYPE_CHECKING: + from .channel import AsyncRealtimeChannel + +logger = logging.getLogger(__name__) + + +class AsyncPush: + def __init__( + self, + channel: "AsyncRealtimeChannel", + event: str, + payload: Optional[Dict[str, Any]] = None, + timeout: int = DEFAULT_TIMEOUT, + ): + self.channel = channel + self.event = event + self.payload = payload or {} + self.timeout = timeout + self.rec_hooks: List[_Hook] = [] + self.ref: Optional[str] = None + self.ref_event: Optional[str] = None + self.received_resp: Optional[Dict[str, Any]] = None + self.sent = False + self.timeout_task: Optional[asyncio.Task] = None + + async def resend(self): + self._cancel_ref_event() + self.ref = "" + self.ref_event = None + self.received_resp = None + self.sent = False + await self.send() + + async def send(self): + if self._has_received("timeout"): + return + + self.start_timeout() + self.sent = True + + try: + await self.channel.socket.send( + { + "topic": self.channel.topic, + "event": self.event, + "payload": self.payload, + "ref": self.ref, + "join_ref": self.channel.join_push.ref, + } + ) + except Exception as e: + logger.error(f"send push failed: {e}") + + def update_payload(self, payload: Dict[str, Any]): + self.payload = {**self.payload, **payload} + + def receive(self, status: str, callback: Callback) -> "AsyncPush": + if self._has_received(status): + callback(self.received_resp.get("response", {})) + + self.rec_hooks.append(_Hook(status, callback)) + return self + + def start_timeout(self): + if self.timeout_task: + return + + self.ref = self.channel.socket._make_ref() + self.ref_event = self.channel._reply_event_name(self.ref) + + def on_reply(payload, *args): + self._cancel_ref_event() + self._cancel_timeout() + self.received_resp = payload + self._match_receive(**self.received_resp) + + self.channel._on(self.ref_event, on_reply) + + async def timeout(self): + await asyncio.sleep(self.timeout) + self.trigger("timeout", {}) + + self.timeout_task = asyncio.create_task(timeout(self)) + + def trigger(self, status: str, response: Any): + if self.ref_event: + payload = { + "status": status, + "response": response, + } + self.channel._trigger(self.ref_event, payload) + + def destroy(self): + self._cancel_ref_event() + self._cancel_timeout() + + def _cancel_ref_event(self): + if not self.ref_event: + return + + self.channel._off(self.ref_event, {}) + + def _cancel_timeout(self): + if not self.timeout_task: + return + + self.timeout_task.cancel() + self.timeout_task = None + + def _match_receive(self, status: str, response: Any): + for hook in self.rec_hooks: + if hook.status == status: + hook.callback(response) + + def _has_received(self, status: str): + return self.received_resp and self.received_resp.get("status") == status diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/timer.py b/.venv/lib/python3.12/site-packages/realtime/_async/timer.py new file mode 100644 index 00000000..bd18ca0b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_async/timer.py @@ -0,0 +1,40 @@ +import asyncio +import logging +from typing import Callable, Optional + +logger = logging.getLogger(__name__) + + +class AsyncTimer: + def __init__(self, callback: Callable, timer_calc: Callable[[int], int]): + self.callback = callback + self.timer_calc = timer_calc + self.timer: Optional[asyncio.Task] = None + self.tries: int = 0 + + def reset(self): + self.tries = 0 + if self.timer and not self.timer.done(): + self.timer.cancel() + self.timer = None + logger.debug( + "AsyncTimer has been reset and any scheduler tasks have been cancelled" + ) + + def schedule_timeout(self): + if self.timer: + self.timer.cancel() + + self.tries += 1 + delay = self.timer_calc(self.tries + 1) + logger.debug(f"Scheduling callback to run after {delay} seconds.") + self.timer = asyncio.create_task(self._run_timer(delay)) + + async def _run_timer(self, delay: float): + try: + await asyncio.sleep(delay) + await self.callback() + except asyncio.CancelledError: + logger.debug("AsyncTimer task was cancelled.") + except Exception as e: + logger.exception(f"Error in AsyncTimer callback: {e}") diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py b/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py b/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py new file mode 100644 index 00000000..3512f9b7 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py @@ -0,0 +1,30 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, Optional + +from realtime.types import RealtimeChannelOptions + +if TYPE_CHECKING: + from .client import SyncRealtimeClient + + +class SyncRealtimeChannel: + """ + `Channel` is an abstraction for a topic listener for an existing socket connection. + Each Channel has its own topic and a list of event-callbacks that responds to messages. + Should only be instantiated through `connection.RealtimeClient().channel(topic)`. + """ + + def __init__( + self, + socket: SyncRealtimeClient, + topic: str, + params: Optional[RealtimeChannelOptions] = None, + ) -> None: + """ + Initialize the Channel object. + + :param socket: RealtimeClient object + :param topic: Topic that it subscribes to on the realtime server + :param params: Optional parameters for connection. + """ diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/client.py b/.venv/lib/python3.12/site-packages/realtime/_sync/client.py new file mode 100644 index 00000000..634a5343 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_sync/client.py @@ -0,0 +1,71 @@ +from typing import Any, Dict, List, Optional + +from .channel import RealtimeChannelOptions, SyncRealtimeChannel + +NOT_IMPLEMENTED_MESSAGE = "This feature isn't available in the sync client. You can use the realtime feature in the async client only." + + +class SyncRealtimeClient: + def __init__( + self, + url: str, + token: str, + auto_reconnect: bool = True, + params: Optional[Dict[str, Any]] = None, + hb_interval: int = 30, + max_retries: int = 5, + initial_backoff: float = 1.0, + ) -> None: + """ + Initialize a RealtimeClient instance for WebSocket communication. + + :param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`. + Also accepts default Supabase URL: `http://` or `https://`. + :param token: Authentication token for the WebSocket connection. + :param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to False. + :param params: Optional parameters for the connection. Defaults to an empty dictionary. + :param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 30. + :param max_retries: Maximum number of reconnection attempts. Defaults to 5. + :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0. + """ + + def channel( + self, topic: str, params: Optional[RealtimeChannelOptions] = None + ) -> SyncRealtimeChannel: + """ + :param topic: Initializes a channel and creates a two-way association with the socket + :return: Channel + """ + raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE) + + def get_channels(self) -> List[SyncRealtimeChannel]: + raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE) + + def remove_channel(self, channel: SyncRealtimeChannel) -> None: + """ + Unsubscribes and removes a channel from the socket + :param channel: Channel to remove + :return: None + """ + raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE) + + def remove_all_channels(self) -> None: + """ + Unsubscribes and removes all channels from the socket + :return: None + """ + raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE) + + def set_auth(self, token: Optional[str]) -> None: + """ + Set the authentication token for the connection and update all joined channels. + + This method updates the access token for the current connection and sends the new token + to all joined channels. This is useful for refreshing authentication or changing users. + + Args: + token (Optional[str]): The new authentication token. Can be None to remove authentication. + + Returns: + None + """ diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py b/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py new file mode 100644 index 00000000..db02a1ce --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py @@ -0,0 +1,12 @@ +""" + Defines the RealtimePresence class and its dependencies. +""" + +from typing import Optional + +from ..types import PresenceOpts + + +class SyncRealtimePresence: + def __init__(self, channel, opts: Optional[PresenceOpts] = None): + pass diff --git a/.venv/lib/python3.12/site-packages/realtime/exceptions.py b/.venv/lib/python3.12/site-packages/realtime/exceptions.py new file mode 100644 index 00000000..9fec53d2 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/exceptions.py @@ -0,0 +1,22 @@ +class NotConnectedError(Exception): + """ + Raised when operations requiring a connection are executed when socket is not connected + """ + + def __init__(self, func_name: str): + self.offending_func_name: str = func_name + + def __str__(self): + return f"A WS connection has not been established. Ensure you call RealtimeClient.connect() before calling RealtimeClient.{self.offending_func_name}()" + + +class AuthorizationError(Exception): + """ + Raised when there is an authorization failure for private channels + """ + + def __init__(self, message: str = None): + self.message: str = message or "Authorization failed for private channel" + + def __str__(self): + return self.message diff --git a/.venv/lib/python3.12/site-packages/realtime/message.py b/.venv/lib/python3.12/site-packages/realtime/message.py new file mode 100644 index 00000000..f4ed22fd --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/message.py @@ -0,0 +1,26 @@ +from dataclasses import dataclass +from typing import Any, Dict, Optional + + +@dataclass +class Message: + """ + Dataclass abstraction for message + """ + + event: str + payload: Dict[str, Any] + ref: Any + topic: str + join_ref: Optional[str] = None + + def __hash__(self): + return hash( + ( + self.event, + tuple(list(self.payload.values())), + self.ref, + self.topic, + self.join_ref, + ) + ) diff --git a/.venv/lib/python3.12/site-packages/realtime/transformers.py b/.venv/lib/python3.12/site-packages/realtime/transformers.py new file mode 100644 index 00000000..bd541377 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/transformers.py @@ -0,0 +1,9 @@ +import re + + +def http_endpoint_url(socket_url: str) -> str: + url = re.sub(r"^ws", "http", socket_url, flags=re.IGNORECASE) + url = re.sub( + r"(\/socket\/websocket|\/socket|\/websocket)\/?$", "", url, flags=re.IGNORECASE + ) + return re.sub(r"\/+$", "", url) diff --git a/.venv/lib/python3.12/site-packages/realtime/types.py b/.venv/lib/python3.12/site-packages/realtime/types.py new file mode 100644 index 00000000..e40159ae --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/types.py @@ -0,0 +1,155 @@ +from enum import Enum +from typing import Any, Callable, Dict, List, Literal, Optional, TypedDict, TypeVar + +from typing_extensions import ParamSpec + +# Constants +DEFAULT_TIMEOUT = 10 +PHOENIX_CHANNEL = "phoenix" +VSN = "1.0.0" +DEFAULT_HEARTBEAT_INTERVAL = 25 + +# Type variables and custom types +T_ParamSpec = ParamSpec("T_ParamSpec") +T_Retval = TypeVar("T_Retval") +Callback = Callable[T_ParamSpec, T_Retval] + + +# Enums +class ChannelEvents(str, Enum): + """ + ChannelEvents are a bunch of constant strings that are defined according to + what the Phoenix realtime server expects. + """ + + close = "phx_close" + error = "phx_error" + join = "phx_join" + reply = "phx_reply" + leave = "phx_leave" + heartbeat = "heartbeat" + access_token = "access_token" + broadcast = "broadcast" + presence = "presence" + + +class ChannelStates(str, Enum): + JOINED = "joined" + CLOSED = "closed" + ERRORED = "errored" + JOINING = "joining" + LEAVING = "leaving" + + +class RealtimeSubscribeStates(str, Enum): + SUBSCRIBED = "SUBSCRIBED" + TIMED_OUT = "TIMED_OUT" + CLOSED = "CLOSED" + CHANNEL_ERROR = "CHANNEL_ERROR" + + +class RealtimePresenceListenEvents(str, Enum): + SYNC = "SYNC" + JOIN = "JOIN" + LEAVE = "LEAVE" + + +# Literals +RealtimePostgresChangesListenEvent = Literal["*", "INSERT", "UPDATE", "DELETE"] + + +# Classes +class Binding: + def __init__( + self, + type: str, + filter: Dict[str, Any], + callback: Callback, + id: Optional[str] = None, + ): + self.type = type + self.filter = filter + self.callback = callback + self.id = id + + +class _Hook: + def __init__(self, status: str, callback: Callback): + self.status = status + self.callback = callback + + +class Presence(Dict[str, Any]): + presence_ref: str + + +class PresenceEvents: + def __init__(self, state: str, diff: str): + self.state = state + self.diff = diff + + +class PresenceOpts: + def __init__(self, events: PresenceEvents): + self.events = events + + +# TypedDicts +class RealtimeChannelBroadcastConfig(TypedDict): + ack: bool + self: bool + + +class RealtimeChannelPresenceConfig(TypedDict): + key: str + + +class RealtimeChannelConfig(TypedDict): + broadcast: RealtimeChannelBroadcastConfig + presence: RealtimeChannelPresenceConfig + private: bool + + +class RealtimeChannelOptions(TypedDict): + config: RealtimeChannelConfig + + +class PresenceMeta(TypedDict, total=False): + phx_ref: str + phx_ref_prev: str + + +class RawPresenceStateEntry(TypedDict): + metas: List[PresenceMeta] + + +# Custom types +PresenceOnJoinCallback = Callable[[str, List[Any], List[Any]], None] +PresenceOnLeaveCallback = Callable[[str, List[Any], List[Any]], None] +RealtimePresenceState = Dict[str, List[Presence]] +RawPresenceState = Dict[str, RawPresenceStateEntry] + + +class RawPresenceDiff(TypedDict): + joins: RawPresenceState + leaves: RawPresenceState + + +class PresenceDiff(TypedDict): + joins: RealtimePresenceState + leaves: RealtimePresenceState + + +# Specific payload types +class RealtimePresenceJoinPayload(Dict[str, Any]): + event: Literal[RealtimePresenceListenEvents.JOIN] + key: str + current_presences: List[Presence] + new_presences: List[Presence] + + +class RealtimePresenceLeavePayload(Dict[str, Any]): + event: Literal[RealtimePresenceListenEvents.LEAVE] + key: str + current_presences: List[Presence] + left_presences: List[Presence] diff --git a/.venv/lib/python3.12/site-packages/realtime/utils.py b/.venv/lib/python3.12/site-packages/realtime/utils.py new file mode 100644 index 00000000..67b22018 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/utils.py @@ -0,0 +1,5 @@ +from urllib.parse import urlparse + + +def is_ws_url(url: str) -> bool: + return urlparse(url).scheme in {"wss", "ws", "http", "https"} diff --git a/.venv/lib/python3.12/site-packages/realtime/version.py b/.venv/lib/python3.12/site-packages/realtime/version.py new file mode 100644 index 00000000..91ab439a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/realtime/version.py @@ -0,0 +1 @@ +__version__ = "2.4.1" # {x-release-please-version} |