aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/realtime
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/realtime')
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/__init__.py17
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/channel.py565
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/client.py398
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/presence.py238
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/push.py122
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_async/timer.py40
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_sync/channel.py30
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_sync/client.py71
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/_sync/presence.py12
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/exceptions.py22
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/message.py26
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/transformers.py9
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/types.py155
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/utils.py5
-rw-r--r--.venv/lib/python3.12/site-packages/realtime/version.py1
17 files changed, 1711 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/realtime/__init__.py b/.venv/lib/python3.12/site-packages/realtime/__init__.py
new file mode 100644
index 00000000..a7749e74
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/__init__.py
@@ -0,0 +1,17 @@
+import logging
+
+# Configure the root logger for the module
+logging.getLogger(__name__).addHandler(logging.NullHandler())
+
+from realtime.version import __version__
+
+from ._async.channel import AsyncRealtimeChannel
+from ._async.client import AsyncRealtimeClient
+from ._async.presence import AsyncRealtimePresence
+from ._sync.channel import SyncRealtimeChannel
+from ._sync.client import SyncRealtimeClient
+from ._sync.presence import SyncRealtimePresence
+from .exceptions import *
+from .message import *
+from .transformers import *
+from .types import *
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py b/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/channel.py b/.venv/lib/python3.12/site-packages/realtime/_async/channel.py
new file mode 100644
index 00000000..0050e5e2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/channel.py
@@ -0,0 +1,565 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import logging
+from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
+
+from realtime.types import (
+ Binding,
+ Callback,
+ ChannelEvents,
+ ChannelStates,
+ RealtimeChannelOptions,
+ RealtimePostgresChangesListenEvent,
+ RealtimePresenceState,
+ RealtimeSubscribeStates,
+)
+
+from ..transformers import http_endpoint_url
+from .presence import (
+ AsyncRealtimePresence,
+ PresenceOnJoinCallback,
+ PresenceOnLeaveCallback,
+)
+from .push import AsyncPush
+from .timer import AsyncTimer
+
+if TYPE_CHECKING:
+ from .client import AsyncRealtimeClient
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncRealtimeChannel:
+ """
+ Channel is an abstraction for a topic subscription on an existing socket connection.
+ Each Channel has its own topic and a list of event-callbacks that respond to messages.
+ Should only be instantiated through `AsyncRealtimeClient.channel(topic)`.
+ """
+
+ def __init__(
+ self,
+ socket: AsyncRealtimeClient,
+ topic: str,
+ params: Optional[RealtimeChannelOptions] = None,
+ ) -> None:
+ """
+ Initialize the Channel object.
+
+ :param socket: RealtimeClient object
+ :param topic: Topic that it subscribes to on the realtime server
+ :param params: Optional parameters for connection.
+ """
+ self.socket = socket
+ self.params = params or {}
+ if self.params.get("config") is None:
+ self.params["config"] = {
+ "broadcast": {"ack": False, "self": False},
+ "presence": {"key": ""},
+ "private": False,
+ }
+
+ self.topic = topic
+ self._joined_once = False
+ self.bindings: Dict[str, List[Binding]] = {}
+ self.presence = AsyncRealtimePresence(self)
+ self.state = ChannelStates.CLOSED
+ self._push_buffer: List[AsyncPush] = []
+ self.timeout = self.socket.timeout
+
+ self.join_push = AsyncPush(self, ChannelEvents.join, self.params)
+ self.rejoin_timer = AsyncTimer(
+ self._rejoin_until_connected, lambda tries: 2**tries
+ )
+
+ self.broadcast_endpoint_url = self._broadcast_endpoint_url()
+
+ def on_join_push_ok(payload: Dict[str, Any], *args):
+ self.state = ChannelStates.JOINED
+ self.rejoin_timer.reset()
+ for push in self._push_buffer:
+ asyncio.create_task(push.send())
+ self._push_buffer = []
+
+ def on_join_push_timeout(*args):
+ if not self.is_joining:
+ return
+
+ logger.error(f"join push timeout for channel {self.topic}")
+ self.state = ChannelStates.ERRORED
+ self.rejoin_timer.schedule_timeout()
+
+ self.join_push.receive("ok", on_join_push_ok).receive(
+ "timeout", on_join_push_timeout
+ )
+
+ def on_close(*args):
+ logger.info(f"channel {self.topic} closed")
+ self.rejoin_timer.reset()
+ self.state = ChannelStates.CLOSED
+ self.socket._remove_channel(self)
+
+ def on_error(payload, *args):
+ if self.is_leaving or self.is_closed:
+ return
+
+ logger.info(f"channel {self.topic} error: {payload}")
+ self.state = ChannelStates.ERRORED
+ self.rejoin_timer.schedule_timeout()
+
+ self._on("close", on_close)
+ self._on("error", on_error)
+
+ def on_reply(payload, ref):
+ self._trigger(self._reply_event_name(ref), payload)
+
+ self._on(ChannelEvents.reply, on_reply)
+
+ # Properties
+ @property
+ def is_closed(self):
+ return self.state == ChannelStates.CLOSED
+
+ @property
+ def is_joining(self):
+ return self.state == ChannelStates.JOINING
+
+ @property
+ def is_leaving(self):
+ return self.state == ChannelStates.LEAVING
+
+ @property
+ def is_errored(self):
+ return self.state == ChannelStates.ERRORED
+
+ @property
+ def is_joined(self):
+ return self.state == ChannelStates.JOINED
+
+ @property
+ def join_ref(self):
+ return self.join_push.ref
+
+ # Core channel methods
+ async def subscribe(
+ self,
+ callback: Optional[
+ Callable[[RealtimeSubscribeStates, Optional[Exception]], None]
+ ] = None,
+ ) -> AsyncRealtimeChannel:
+ """
+ Subscribe to the channel. Can only be called once per channel instance.
+
+ :param callback: Optional callback function that receives subscription state updates
+ and any errors that occur during subscription
+ :return: The Channel instance for method chaining
+ :raises: Exception if called multiple times on the same channel instance
+ """
+ if not self.socket.is_connected:
+ await self.socket.connect()
+
+ if self._joined_once:
+ raise Exception(
+ "Tried to subscribe multiple times. 'subscribe' can only be called a single time per channel instance"
+ )
+ else:
+ config = self.params.get("config", {})
+ broadcast = config.get("broadcast", {})
+ presence = config.get("presence", {})
+ private = config.get("private", False)
+
+ access_token_payload = {}
+ config = {
+ "broadcast": broadcast,
+ "presence": presence,
+ "private": private,
+ "postgres_changes": list(
+ map(lambda x: x.filter, self.bindings.get("postgres_changes", []))
+ ),
+ }
+
+ if self.socket.access_token:
+ access_token_payload["access_token"] = self.socket.access_token
+
+ self.join_push.update_payload(
+ {**{"config": config}, **access_token_payload}
+ )
+ self._joined_once = True
+
+ def on_join_push_ok(payload: Dict[str, Any]):
+ server_postgres_changes: List[Dict[str, Any]] = payload.get(
+ "postgres_changes", []
+ )
+
+ if len(server_postgres_changes) == 0:
+ callback and callback(RealtimeSubscribeStates.SUBSCRIBED, None)
+ return
+
+ client_postgres_changes = self.bindings.get("postgres_changes", [])
+ new_postgres_bindings = []
+
+ bindings_len = len(client_postgres_changes)
+
+ for i in range(bindings_len):
+ client_binding = client_postgres_changes[i]
+ event = client_binding.filter.get("event")
+ schema = client_binding.filter.get("schema")
+ table = client_binding.filter.get("table")
+ filter = client_binding.filter.get("filter")
+
+ server_binding = (
+ server_postgres_changes[i]
+ if i < len(server_postgres_changes)
+ else None
+ )
+
+ if (
+ server_binding
+ and server_binding.get("event") == event
+ and server_binding.get("schema") == schema
+ and server_binding.get("table") == table
+ and server_binding.get("filter") == filter
+ ):
+ client_binding.id = server_binding.get("id")
+ new_postgres_bindings.append(client_binding)
+ else:
+ asyncio.create_task(self.unsubscribe())
+ callback and callback(
+ RealtimeSubscribeStates.CHANNEL_ERROR,
+ Exception(
+ "mismatch between server and client bindings for postgres changes"
+ ),
+ )
+ return
+
+ self.bindings["postgres_changes"] = new_postgres_bindings
+ callback and callback(RealtimeSubscribeStates.SUBSCRIBED, None)
+
+ def on_join_push_error(payload: Dict[str, Any]):
+ callback and callback(
+ RealtimeSubscribeStates.CHANNEL_ERROR,
+ Exception(json.dumps(payload)),
+ )
+
+ def on_join_push_timeout(*args):
+ callback and callback(RealtimeSubscribeStates.TIMED_OUT, None)
+
+ self.join_push.receive("ok", on_join_push_ok).receive(
+ "error", on_join_push_error
+ ).receive("timeout", on_join_push_timeout)
+
+ await self._rejoin()
+
+ return self
+
+ async def unsubscribe(self):
+ """
+ Unsubscribe from the channel and leave the topic.
+ Sets channel state to LEAVING and cleans up timers and pushes.
+ """
+ self.state = ChannelStates.LEAVING
+
+ self.rejoin_timer.reset()
+ self.join_push.destroy()
+
+ def _close(*args):
+ logger.info(f"channel {self.topic} leave")
+ self._trigger(ChannelEvents.close, "leave")
+
+ leave_push = AsyncPush(self, ChannelEvents.leave, {})
+ leave_push.receive("ok", _close).receive("timeout", _close)
+
+ await leave_push.send()
+
+ if not self._can_push():
+ leave_push.trigger("ok", {})
+
+ async def push(
+ self, event: str, payload: Dict[str, Any], timeout: Optional[int] = None
+ ) -> AsyncPush:
+ """
+ Push a message to the channel.
+
+ :param event: The event name to push
+ :param payload: The payload to send
+ :param timeout: Optional timeout in milliseconds
+ :return: AsyncPush instance representing the push operation
+ :raises: Exception if called before subscribing to the channel
+ """
+ if not self._joined_once:
+ raise Exception(
+ f"tried to push '{event}' to '{self.topic}' before joining. Use channel.subscribe() before pushing events"
+ )
+
+ timeout = timeout or self.timeout
+
+ push = AsyncPush(self, event, payload, timeout)
+ if self._can_push():
+ await push.send()
+ else:
+ push.start_timeout()
+ self._push_buffer.append(push)
+
+ return push
+
+ async def join(self) -> AsyncRealtimeChannel:
+ """
+ Coroutine that attempts to join Phoenix Realtime server via a certain topic.
+
+ :return: Channel
+ """
+ try:
+ await self.socket.send(
+ {
+ "topic": self.topic,
+ "event": "phx_join",
+ "payload": {"config": self.params},
+ "ref": None,
+ }
+ )
+ except Exception as e:
+ print(e)
+ return self
+
+ # Event handling methods
+ def _on(
+ self, type: str, callback: Callback, filter: Optional[Dict[str, Any]] = None
+ ) -> AsyncRealtimeChannel:
+ """
+ Set up a listener for a specific event.
+
+ :param type: The type of the event to listen for.
+ :param filter: Additional parameters for the event.
+ :param callback: The callback function to execute when the event is received.
+ :return: The Channel instance for method chaining.
+ """
+
+ type_lowercase = type.lower()
+ binding = Binding(type=type_lowercase, filter=filter or {}, callback=callback)
+ self.bindings.setdefault(type_lowercase, []).append(binding)
+
+ return self
+
+ def _off(self, type: str, filter: Dict[str, Any]) -> AsyncRealtimeChannel:
+ """
+ Remove a listener for a specific event type and filter.
+
+ :param type: The type of the event to remove the listener for.
+ :param filter: The filter associated with the listener to remove.
+ :return: The Channel instance for method chaining.
+
+ This method removes all bindings for the specified event type that match
+ the given filter. If no matching bindings are found, the method does nothing.
+ """
+ type_lowercase = type.lower()
+
+ if type_lowercase in self.bindings:
+ self.bindings[type_lowercase] = [
+ binding
+ for binding in self.bindings[type_lowercase]
+ if binding.filter != filter
+ ]
+ return self
+
+ def on_broadcast(
+ self, event: str, callback: Callable[[Dict[str, Any]], None]
+ ) -> AsyncRealtimeChannel:
+ """
+ Set up a listener for a specific broadcast event.
+
+ :param event: The name of the broadcast event to listen for
+ :param callback: Function called with the payload when a matching broadcast is received
+ :return: The Channel instance for method chaining
+ """
+ return self._on(
+ "broadcast",
+ filter={"event": event},
+ callback=lambda payload, _: callback(payload),
+ )
+
+ def on_postgres_changes(
+ self,
+ event: RealtimePostgresChangesListenEvent,
+ callback: Callable[[Dict[str, Any]], None],
+ table: str = "*",
+ schema: str = "public",
+ filter: Optional[str] = None,
+ ) -> AsyncRealtimeChannel:
+ """
+ Set up a listener for Postgres database changes.
+
+ :param event: The type of database event to listen for (INSERT, UPDATE, DELETE, or *)
+ :param callback: Function called with the payload when a matching change is detected
+ :param table: The table name to monitor. Defaults to "*" for all tables
+ :param schema: The database schema to monitor. Defaults to "public"
+ :param filter: Optional filter string to apply
+ :return: The Channel instance for method chaining
+ """
+
+ binding_filter = {"event": event, "schema": schema, "table": table}
+ if filter:
+ binding_filter["filter"] = filter
+
+ return self._on(
+ "postgres_changes",
+ filter=binding_filter,
+ callback=lambda payload, _: callback(payload),
+ )
+
+ def on_system(
+ self, callback: Callable[[Dict[str, Any], None]]
+ ) -> AsyncRealtimeChannel:
+ """
+ Set up a listener for system events.
+
+ :param callback: The callback function to execute when a system event is received.
+ :return: The Channel instance for method chaining.
+ """
+ return self._on("system", callback=lambda payload, _: callback(payload))
+
+ # Presence methods
+ async def track(self, user_status: Dict[str, Any]) -> None:
+ """
+ Track presence status for the current user.
+
+ :param user_status: Dictionary containing the user's presence information
+ """
+ await self.send_presence("track", user_status)
+
+ async def untrack(self) -> None:
+ """
+ Stop tracking presence for the current user.
+ """
+ await self.send_presence("untrack", {})
+
+ def presence_state(self) -> RealtimePresenceState:
+ """
+ Get the current state of presence on this channel.
+
+ :return: Dictionary mapping presence keys to lists of presence payloads
+ """
+ return self.presence.state
+
+ def on_presence_sync(self, callback: Callable[[], None]) -> AsyncRealtimeChannel:
+ """
+ Register a callback for presence sync events.
+
+ :param callback: The callback function to execute when a presence sync event occurs.
+ :return: The Channel instance for method chaining.
+ """
+ self.presence.on_sync(callback)
+ return self
+
+ def on_presence_join(
+ self, callback: PresenceOnJoinCallback
+ ) -> AsyncRealtimeChannel:
+ """
+ Register a callback for presence join events.
+
+ :param callback: The callback function to execute when a presence join event occurs.
+ :return: The Channel instance for method chaining.
+ """
+ self.presence.on_join(callback)
+ return self
+
+ def on_presence_leave(
+ self, callback: PresenceOnLeaveCallback
+ ) -> AsyncRealtimeChannel:
+ """
+ Register a callback for presence leave events.
+
+ :param callback: The callback function to execute when a presence leave event occurs.
+ :return: The Channel instance for method chaining.
+ """
+ self.presence.on_leave(callback)
+ return self
+
+ # Broadcast methods
+ async def send_broadcast(self, event: str, data: Any) -> None:
+ """
+ Send a broadcast message through this channel.
+
+ :param event: The name of the broadcast event
+ :param data: The payload to broadcast
+ """
+ await self.push(
+ ChannelEvents.broadcast,
+ {"type": "broadcast", "event": event, "payload": data},
+ )
+
+ # Internal methods
+ def _broadcast_endpoint_url(self):
+ return f"{http_endpoint_url(self.socket.http_endpoint)}/api/broadcast"
+
+ async def _rejoin(self) -> None:
+ if self.is_leaving:
+ return
+ await self.socket._leave_open_topic(self.topic)
+ self.state = ChannelStates.JOINING
+ await self.join_push.resend()
+
+ def _can_push(self):
+ return self.socket.is_connected and self._joined_once
+
+ async def send_presence(self, event: str, data: Any) -> None:
+ await self.push(ChannelEvents.presence, {"event": event, "payload": data})
+
+ def _trigger(self, type: str, payload: Optional[Any], ref: Optional[str] = None):
+ type_lowercase = type.lower()
+ events = [
+ ChannelEvents.close,
+ ChannelEvents.error,
+ ChannelEvents.leave,
+ ChannelEvents.join,
+ ]
+
+ if ref is not None and type_lowercase in events and ref != self.join_push.ref:
+ return
+
+ if type_lowercase in ["insert", "update", "delete"]:
+ postgres_changes = filter(
+ lambda binding: binding.filter.get("event", "").lower()
+ in [type_lowercase, "*"],
+ self.bindings.get("postgres_changes", []),
+ )
+ for binding in postgres_changes:
+ binding.callback(payload, ref)
+ else:
+ bindings = self.bindings.get(type_lowercase, [])
+ for binding in bindings:
+ if type_lowercase in ["broadcast", "postgres_changes", "presence"]:
+ bind_id = binding.id
+ bind_event = (
+ binding.filter.get("event", "").lower()
+ if binding.filter.get("event")
+ else None
+ )
+ payload_event = (
+ payload.get("event", "").lower()
+ if payload.get("event")
+ else None
+ )
+ data_type = (
+ payload.get("data", {}).get("type", "").lower()
+ if payload.get("data", {}).get("type")
+ else None
+ )
+ if (
+ bind_id
+ and bind_id in payload.get("ids", [])
+ and (bind_event == data_type or bind_event == "*")
+ ):
+ binding.callback(payload, ref)
+ elif bind_event in [payload_event, "*"]:
+ binding.callback(payload, ref)
+ elif binding.type == type_lowercase:
+ binding.callback(payload, ref)
+
+ def _reply_event_name(self, ref: str):
+ return f"chan_reply_{ref}"
+
+ async def _rejoin_until_connected(self):
+ self.rejoin_timer.schedule_timeout()
+ if self.socket.is_connected:
+ await self._rejoin()
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/client.py b/.venv/lib/python3.12/site-packages/realtime/_async/client.py
new file mode 100644
index 00000000..9e8b669d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/client.py
@@ -0,0 +1,398 @@
+import asyncio
+import json
+import logging
+import re
+from base64 import b64decode
+from datetime import datetime
+from functools import wraps
+from math import floor
+from typing import Any, Callable, Dict, List, Optional
+from urllib.parse import urlencode, urlparse, urlunparse
+
+import websockets
+from websockets import connect
+from websockets.client import ClientProtocol
+
+from ..message import Message
+from ..transformers import http_endpoint_url
+from ..types import (
+ DEFAULT_HEARTBEAT_INTERVAL,
+ DEFAULT_TIMEOUT,
+ PHOENIX_CHANNEL,
+ VSN,
+ ChannelEvents,
+)
+from ..utils import is_ws_url
+from .channel import AsyncRealtimeChannel, RealtimeChannelOptions
+
+
+def deprecated(func: Callable) -> Callable:
+ @wraps(func)
+ def wrapper(*args, **kwargs):
+ logger.warning(f"Warning: {func.__name__} is deprecated.")
+ return func(*args, **kwargs)
+
+ return wrapper
+
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncRealtimeClient:
+ def __init__(
+ self,
+ url: str,
+ token: Optional[str] = None,
+ auto_reconnect: bool = True,
+ params: Optional[Dict[str, Any]] = None,
+ hb_interval: int = DEFAULT_HEARTBEAT_INTERVAL,
+ max_retries: int = 5,
+ initial_backoff: float = 1.0,
+ timeout: int = DEFAULT_TIMEOUT,
+ ) -> None:
+ """
+ Initialize a RealtimeClient instance for WebSocket communication.
+
+ :param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`.
+ Also accepts default Supabase URL: `http://` or `https://`.
+ :param token: Authentication token for the WebSocket connection.
+ :param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to True.
+ :param params: Optional parameters for the connection. Defaults to None.
+ :param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 25.
+ :param max_retries: Maximum number of reconnection attempts. Defaults to 5.
+ :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0.
+ :param timeout: Connection timeout in seconds. Defaults to DEFAULT_TIMEOUT.
+ """
+ if not is_ws_url(url):
+ ValueError("url must be a valid WebSocket URL or HTTP URL string")
+ self.url = f"{re.sub(r'https://', 'wss://', re.sub(r'http://', 'ws://', url, flags=re.IGNORECASE), flags=re.IGNORECASE)}/websocket"
+ if token:
+ self.url += f"?apikey={token}"
+ self.http_endpoint = http_endpoint_url(url)
+ self.params = params or {}
+ self.apikey = token
+ self.access_token = token
+ self.send_buffer: List[Callable] = []
+ self.hb_interval = hb_interval
+ self.ws_connection: Optional[ClientProtocol] = None
+ self.ref = 0
+ self.auto_reconnect = auto_reconnect
+ self.channels: Dict[str, AsyncRealtimeChannel] = {}
+ self.max_retries = max_retries
+ self.initial_backoff = initial_backoff
+ self.timeout = timeout
+ self._listen_task: Optional[asyncio.Task] = None
+ self._heartbeat_task: Optional[asyncio.Task] = None
+
+ @property
+ def is_connected(self) -> bool:
+ return self.ws_connection is not None
+
+ async def _listen(self) -> None:
+ """
+ An infinite loop that keeps listening.
+ :return: None
+ """
+
+ if not self.ws_connection:
+ raise Exception("WebSocket connection not established")
+
+ try:
+ async for msg in self.ws_connection:
+ logger.info(f"receive: {msg}")
+
+ msg = Message(**json.loads(msg))
+ channel = self.channels.get(msg.topic)
+
+ if channel:
+ channel._trigger(msg.event, msg.payload, msg.ref)
+ except websockets.exceptions.ConnectionClosedError as e:
+ logger.error(
+ f"WebSocket connection closed with code: {e.code}, reason: {e.reason}"
+ )
+ if self.auto_reconnect:
+ logger.info("Initiating auto-reconnect sequence...")
+
+ await self._reconnect()
+ else:
+ logger.error("Auto-reconnect disabled, terminating connection")
+
+ async def _reconnect(self) -> None:
+ self.ws_connection = None
+ await self.connect()
+
+ if self.is_connected:
+ for topic, channel in self.channels.items():
+ logger.info(f"Rejoining channel after reconnection: {topic}")
+ await channel._rejoin()
+
+ async def connect(self) -> None:
+ """
+ Establishes a WebSocket connection with exponential backoff retry mechanism.
+
+ This method attempts to connect to the WebSocket server. If the connection fails,
+ it will retry with an exponential backoff strategy up to a maximum number of retries.
+
+ Returns:
+ None
+
+ Raises:
+ Exception: If unable to establish a connection after max_retries attempts.
+
+ Note:
+ - The initial backoff time and maximum retries are set during RealtimeClient initialization.
+ - The backoff time doubles after each failed attempt, up to a maximum of 60 seconds.
+ """
+
+ if self.is_connected:
+ logger.info("WebSocket connection already established")
+ return
+
+ retries = 0
+ backoff = self.initial_backoff
+
+ logger.info(f"Attempting to connect to WebSocket at {self.url}")
+
+ while retries < self.max_retries:
+ try:
+ ws = await connect(self.url)
+ self.ws_connection = ws
+ logger.info("WebSocket connection established successfully")
+ return await self._on_connect()
+ except Exception as e:
+ retries += 1
+ logger.error(f"Connection attempt failed: {str(e)}")
+
+ if retries >= self.max_retries or not self.auto_reconnect:
+ logger.error(
+ f"Connection failed permanently after {retries} attempts. Error: {str(e)}"
+ )
+ raise
+ else:
+ wait_time = backoff * (2 ** (retries - 1))
+ logger.info(
+ f"Retry {retries}/{self.max_retries}: Next attempt in {wait_time:.2f}s (backoff={backoff}s)"
+ )
+ await asyncio.sleep(wait_time)
+ backoff = min(backoff * 2, 60)
+
+ raise Exception(
+ f"Failed to establish WebSocket connection after {self.max_retries} attempts"
+ )
+
+ @deprecated
+ async def listen(self):
+ pass
+
+ async def _on_connect(self) -> None:
+ self._listen_task = asyncio.create_task(self._listen())
+ self._heartbeat_task = asyncio.create_task(self._heartbeat())
+
+ await self._flush_send_buffer()
+
+ async def _flush_send_buffer(self):
+ if self.is_connected and len(self.send_buffer) > 0:
+ for callback in self.send_buffer:
+ await callback()
+ self.send_buffer = []
+
+ async def close(self) -> None:
+ """
+ Close the WebSocket connection.
+
+ Returns:
+ None
+
+ Raises:
+ NotConnectedError: If the connection is not established when this method is called.
+ """
+
+ if self.ws_connection:
+ await self.ws_connection.close()
+
+ self.ws_connection = None
+
+ if self._listen_task:
+ self._listen_task.cancel()
+ self._listen_task = None
+
+ if self._heartbeat_task:
+ self._heartbeat_task.cancel()
+ self._heartbeat_task = None
+
+ async def _heartbeat(self) -> None:
+ if not self.ws_connection:
+ raise Exception("WebSocket connection not established")
+
+ while self.is_connected:
+ try:
+ data = dict(
+ topic=PHOENIX_CHANNEL,
+ event=ChannelEvents.heartbeat,
+ payload={},
+ ref=None,
+ )
+ await self.send(data)
+ await asyncio.sleep(max(self.hb_interval, 15))
+
+ except websockets.exceptions.ConnectionClosed as e:
+ logger.error(
+ f"Connection closed during heartbeat. Code: {e.code}, reason: {e.reason}"
+ )
+
+ if self.auto_reconnect:
+ logger.info("Heartbeat failed - initiating reconnection sequence")
+ await self._reconnect()
+ else:
+ logger.error("Heartbeat failed - auto-reconnect disabled")
+ break
+
+ def channel(
+ self, topic: str, params: Optional[RealtimeChannelOptions] = None
+ ) -> AsyncRealtimeChannel:
+ """
+ Initialize a channel and create a two-way association with the socket.
+
+ :param topic: The topic to subscribe to
+ :param params: Optional channel parameters
+ :return: AsyncRealtimeChannel instance
+ """
+ topic = f"realtime:{topic}"
+ chan = AsyncRealtimeChannel(self, topic, params)
+ self.channels[topic] = chan
+
+ return chan
+
+ def get_channels(self) -> List[AsyncRealtimeChannel]:
+ return list(self.channels.values())
+
+ def _remove_channel(self, channel: AsyncRealtimeChannel) -> None:
+ del self.channels[channel.topic]
+
+ async def remove_channel(self, channel: AsyncRealtimeChannel) -> None:
+ """
+ Unsubscribes and removes a channel from the socket
+ :param channel: Channel to remove
+ :return: None
+ """
+ if channel.topic in self.channels:
+ await self.channels[channel.topic].unsubscribe()
+
+ if len(self.channels) == 0:
+ await self.close()
+
+ async def remove_all_channels(self) -> None:
+ """
+ Unsubscribes and removes all channels from the socket
+ :return: None
+ """
+ for _, channel in self.channels.items():
+ await channel.unsubscribe()
+
+ await self.close()
+
+ def summary(self) -> None:
+ """
+ Prints a list of topics and event the socket is listening to
+ :return: None
+ """
+ for topic, channel in self.channels.items():
+ print(f"Topic: {topic} | Events: {[e for e, _ in channel.listeners]}]")
+
+ async def set_auth(self, token: Optional[str]) -> None:
+ """
+ Set the authentication token for the connection and update all joined channels.
+
+ This method updates the access token for the current connection and sends the new token
+ to all joined channels. This is useful for refreshing authentication or changing users.
+
+ Args:
+ token (Optional[str]): The new authentication token. Can be None to remove authentication.
+
+ Returns:
+ None
+ """
+ # No empty string tokens.
+ if isinstance(token, str) and len(token.strip()) == 0:
+ raise ValueError("Provide a valid jwt token")
+
+ if token:
+ parsed = None
+ try:
+ payload = token.split(".")[1] + "=="
+ parsed = json.loads(b64decode(payload).decode("utf-8"))
+ except Exception:
+ raise ValueError("InvalidJWTToken")
+
+ if parsed:
+ # Handle expired token if any.
+ if "exp" in parsed:
+ now = floor(datetime.now().timestamp())
+ valid = now - parsed["exp"] < 0
+ if not valid:
+ raise ValueError(
+ f"InvalidJWTToken: Invalid value for JWT claim 'exp' with value { parsed['exp'] }"
+ )
+ else:
+ raise ValueError("InvalidJWTToken: expected claim 'exp'")
+
+ self.access_token = token
+
+ for _, channel in self.channels.items():
+ if channel._joined_once and channel.is_joined:
+ await channel.push(ChannelEvents.access_token, {"access_token": token})
+
+ def _make_ref(self) -> str:
+ self.ref += 1
+ return f"{self.ref}"
+
+ async def send(self, message: Dict[str, Any]) -> None:
+ """
+ Send a message through the WebSocket connection.
+
+ This method serializes the given message dictionary to JSON,
+ and sends it through the WebSocket connection. If the connection
+ is not currently established, the message will be buffered and sent
+ once the connection is re-established.
+
+ Args:
+ message (Dict[str, Any]): The message to be sent, as a dictionary.
+
+ Returns:
+ None
+ """
+
+ message = json.dumps(message)
+ logger.info(f"send: {message}")
+
+ async def send_message():
+ await self.ws_connection.send(message)
+
+ if self.is_connected:
+ await send_message()
+ else:
+ self.send_buffer.append(send_message)
+
+ async def _leave_open_topic(self, topic: str):
+ dup_channels = [
+ ch
+ for ch in self.channels.values()
+ if ch.topic == topic and (ch.is_joined or ch.is_joining)
+ ]
+
+ for ch in dup_channels:
+ await ch.unsubscribe()
+
+ def endpoint_url(self) -> str:
+ parsed_url = urlparse(self.url)
+ query = urlencode({**self.params, "vsn": VSN}, doseq=True)
+ return urlunparse(
+ (
+ parsed_url.scheme,
+ parsed_url.netloc,
+ parsed_url.path,
+ parsed_url.params,
+ query,
+ parsed_url.fragment,
+ )
+ )
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/presence.py b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py
new file mode 100644
index 00000000..4063db4a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/presence.py
@@ -0,0 +1,238 @@
+"""
+ Defines the RealtimePresence class and its dependencies.
+"""
+
+import logging
+from typing import Any, Callable, Dict, List, Optional, Union
+
+from ..types import (
+ PresenceDiff,
+ PresenceEvents,
+ PresenceOnJoinCallback,
+ PresenceOnLeaveCallback,
+ PresenceOpts,
+ RawPresenceDiff,
+ RawPresenceState,
+ RealtimePresenceState,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncRealtimePresence:
+ def __init__(self, channel, opts: Optional[PresenceOpts] = None):
+ self.channel = channel
+ self.state: RealtimePresenceState = {}
+ self.pending_diffs: List[RawPresenceDiff] = []
+ self.join_ref: Optional[str] = None
+ self.caller = {
+ "onJoin": lambda *args: None,
+ "onLeave": lambda *args: None,
+ "onSync": lambda: None,
+ "onAuthSuccess": lambda: None,
+ "onAuthFailure": lambda: None,
+ }
+ # Initialize with default events if not provided
+ events = (
+ opts.events
+ if opts
+ else PresenceEvents(state="presence_state", diff="presence_diff")
+ )
+ # Set up event listeners for presence state and diff
+ self.channel._on(events.state, callback=self._on_state_event)
+ self.channel._on(events.diff, callback=self._on_diff_event)
+ self.channel._on("phx_auth", callback=self._on_auth_event)
+
+ def on_join(self, callback: PresenceOnJoinCallback):
+ self.caller["onJoin"] = callback
+
+ def on_leave(self, callback: PresenceOnLeaveCallback):
+ self.caller["onLeave"] = callback
+
+ def on_sync(self, callback: Callable[[], None]):
+ self.caller["onSync"] = callback
+
+ def on_auth_success(self, callback: Callable[[], None]):
+ self.caller["onAuthSuccess"] = callback
+
+ def on_auth_failure(self, callback: Callable[[], None]):
+ self.caller["onAuthFailure"] = callback
+
+ def _on_state_event(self, payload: RawPresenceState, *args):
+ onJoin = self.caller["onJoin"]
+ onLeave = self.caller["onLeave"]
+ onSync = self.caller["onSync"]
+
+ self.join_ref = self.channel.join_ref
+ self.state = self._sync_state(self.state, payload, onJoin, onLeave)
+
+ for diff in self.pending_diffs:
+ self.state = self._sync_diff(self.state, diff, onJoin, onLeave)
+ self.pending_diffs = []
+ onSync()
+
+ def _on_diff_event(self, payload: Dict[str, Any], *args):
+ onJoin = self.caller["onJoin"]
+ onLeave = self.caller["onLeave"]
+ onSync = self.caller["onSync"]
+
+ if self.in_pending_sync_state():
+ self.pending_diffs.append(payload)
+ else:
+ self.state = self._sync_diff(self.state, payload, onJoin, onLeave)
+ onSync()
+
+ def _on_auth_event(self, payload: Dict[str, Any], *args):
+ if payload.get("status") == "ok":
+ self.caller["onAuthSuccess"]()
+ else:
+ self.caller["onAuthFailure"]()
+
+ def _sync_state(
+ self,
+ current_state: RealtimePresenceState,
+ new_state: Union[RawPresenceState, RealtimePresenceState],
+ onJoin: PresenceOnJoinCallback,
+ onLeave: PresenceOnLeaveCallback,
+ ) -> RealtimePresenceState:
+ state = {key: list(value) for key, value in current_state.items()}
+ transformed_state = AsyncRealtimePresence._transform_state(new_state)
+
+ joins: Dict[str, Any] = {}
+ leaves: Dict[str, Any] = {
+ k: v for k, v in state.items() if k not in transformed_state
+ }
+
+ for key, value in transformed_state.items():
+ current_presences = state.get(key, [])
+
+ if len(current_presences) > 0:
+ new_presence_refs = {presence.get("presence_ref") for presence in value}
+ cur_presence_refs = {
+ presence.get("presence_ref") for presence in current_presences
+ }
+
+ joined_presences = [
+ p for p in value if p.get("presence_ref") not in cur_presence_refs
+ ]
+ left_presences = [
+ p
+ for p in current_presences
+ if p.get("presence_ref") not in new_presence_refs
+ ]
+
+ if joined_presences:
+ joins[key] = joined_presences
+ if left_presences:
+ leaves[key] = left_presences
+ else:
+ joins[key] = value
+
+ return self._sync_diff(
+ state, {"joins": joins, "leaves": leaves}, onJoin, onLeave
+ )
+
+ def _sync_diff(
+ self,
+ state: RealtimePresenceState,
+ diff: Union[RawPresenceDiff, PresenceDiff],
+ onJoin: PresenceOnJoinCallback,
+ onLeave: PresenceOnLeaveCallback,
+ ) -> RealtimePresenceState:
+ joins = AsyncRealtimePresence._transform_state(diff.get("joins", {}))
+ leaves = AsyncRealtimePresence._transform_state(diff.get("leaves", {}))
+
+ for key, new_presences in joins.items():
+ current_presences = state.get(key, [])
+ state[key] = new_presences
+
+ if len(current_presences) > 0:
+ joined_presence_refs = {
+ presence.get("presence_ref") for presence in state.get(key)
+ }
+ cur_presences = list(
+ presence
+ for presence in current_presences
+ if presence.get("presence_ref") not in joined_presence_refs
+ )
+ state[key] = cur_presences + state[key]
+
+ onJoin(key, current_presences, new_presences)
+
+ for key, left_presences in leaves.items():
+ current_presences = state.get(key, [])
+
+ if len(current_presences) == 0:
+ break
+
+ presence_refs_to_remove = {
+ presence.get("presence_ref") for presence in left_presences
+ }
+ current_presences = [
+ presence
+ for presence in current_presences
+ if presence.get("presence_ref") not in presence_refs_to_remove
+ ]
+ state[key] = current_presences
+
+ onLeave(key, current_presences, left_presences)
+
+ if len(current_presences) == 0:
+ del state[key]
+
+ return state
+
+ def in_pending_sync_state(self) -> bool:
+ return self.join_ref is None or self.join_ref != self.channel.join_ref
+
+ @staticmethod
+ def _transform_state(
+ state: Union[RawPresenceState, RealtimePresenceState]
+ ) -> RealtimePresenceState:
+ """
+ Transform the raw presence state into a standardized RealtimePresenceState format.
+
+ This method processes the input state, which can be either a RawPresenceState or
+ an already transformed RealtimePresenceState. It handles the conversion of the
+ Phoenix channel's presence format to our internal representation.
+
+ Args:
+ state (Union[RawPresenceState, RealtimePresenceState[T]]): The presence state to transform.
+
+ Returns:
+ RealtimePresenceState[T]: The transformed presence state.
+
+ Example:
+ Input (RawPresenceState):
+ {
+ "user1": {
+ "metas": [
+ {"phx_ref": "ABC123", "user_id": "user1", "status": "online"},
+ {"phx_ref": "DEF456", "phx_ref_prev": "ABC123", "user_id": "user1", "status": "away"}
+ ]
+ },
+ "user2": [{"user_id": "user2", "status": "offline"}]
+ }
+
+ Output (RealtimePresenceState):
+ {
+ "user1": [
+ {"presence_ref": "ABC123", "user_id": "user1", "status": "online"},
+ {"presence_ref": "DEF456", "user_id": "user1", "status": "away"}
+ ],
+ "user2": [{"user_id": "user2", "status": "offline"}]
+ }
+ """
+ new_state: RealtimePresenceState = {}
+ for key, presences in state.items():
+ if isinstance(presences, dict) and "metas" in presences:
+ new_state[key] = []
+
+ for presence in presences["metas"]:
+ presence["presence_ref"] = presence.pop("phx_ref", None)
+ presence.pop("phx_ref_prev", None)
+ new_state[key].append(presence)
+
+ else:
+ new_state[key] = presences
+ return new_state
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/push.py b/.venv/lib/python3.12/site-packages/realtime/_async/push.py
new file mode 100644
index 00000000..8e7a68f0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/push.py
@@ -0,0 +1,122 @@
+import asyncio
+import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+from ..types import DEFAULT_TIMEOUT, Callback, _Hook
+
+if TYPE_CHECKING:
+ from .channel import AsyncRealtimeChannel
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncPush:
+ def __init__(
+ self,
+ channel: "AsyncRealtimeChannel",
+ event: str,
+ payload: Optional[Dict[str, Any]] = None,
+ timeout: int = DEFAULT_TIMEOUT,
+ ):
+ self.channel = channel
+ self.event = event
+ self.payload = payload or {}
+ self.timeout = timeout
+ self.rec_hooks: List[_Hook] = []
+ self.ref: Optional[str] = None
+ self.ref_event: Optional[str] = None
+ self.received_resp: Optional[Dict[str, Any]] = None
+ self.sent = False
+ self.timeout_task: Optional[asyncio.Task] = None
+
+ async def resend(self):
+ self._cancel_ref_event()
+ self.ref = ""
+ self.ref_event = None
+ self.received_resp = None
+ self.sent = False
+ await self.send()
+
+ async def send(self):
+ if self._has_received("timeout"):
+ return
+
+ self.start_timeout()
+ self.sent = True
+
+ try:
+ await self.channel.socket.send(
+ {
+ "topic": self.channel.topic,
+ "event": self.event,
+ "payload": self.payload,
+ "ref": self.ref,
+ "join_ref": self.channel.join_push.ref,
+ }
+ )
+ except Exception as e:
+ logger.error(f"send push failed: {e}")
+
+ def update_payload(self, payload: Dict[str, Any]):
+ self.payload = {**self.payload, **payload}
+
+ def receive(self, status: str, callback: Callback) -> "AsyncPush":
+ if self._has_received(status):
+ callback(self.received_resp.get("response", {}))
+
+ self.rec_hooks.append(_Hook(status, callback))
+ return self
+
+ def start_timeout(self):
+ if self.timeout_task:
+ return
+
+ self.ref = self.channel.socket._make_ref()
+ self.ref_event = self.channel._reply_event_name(self.ref)
+
+ def on_reply(payload, *args):
+ self._cancel_ref_event()
+ self._cancel_timeout()
+ self.received_resp = payload
+ self._match_receive(**self.received_resp)
+
+ self.channel._on(self.ref_event, on_reply)
+
+ async def timeout(self):
+ await asyncio.sleep(self.timeout)
+ self.trigger("timeout", {})
+
+ self.timeout_task = asyncio.create_task(timeout(self))
+
+ def trigger(self, status: str, response: Any):
+ if self.ref_event:
+ payload = {
+ "status": status,
+ "response": response,
+ }
+ self.channel._trigger(self.ref_event, payload)
+
+ def destroy(self):
+ self._cancel_ref_event()
+ self._cancel_timeout()
+
+ def _cancel_ref_event(self):
+ if not self.ref_event:
+ return
+
+ self.channel._off(self.ref_event, {})
+
+ def _cancel_timeout(self):
+ if not self.timeout_task:
+ return
+
+ self.timeout_task.cancel()
+ self.timeout_task = None
+
+ def _match_receive(self, status: str, response: Any):
+ for hook in self.rec_hooks:
+ if hook.status == status:
+ hook.callback(response)
+
+ def _has_received(self, status: str):
+ return self.received_resp and self.received_resp.get("status") == status
diff --git a/.venv/lib/python3.12/site-packages/realtime/_async/timer.py b/.venv/lib/python3.12/site-packages/realtime/_async/timer.py
new file mode 100644
index 00000000..bd18ca0b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_async/timer.py
@@ -0,0 +1,40 @@
+import asyncio
+import logging
+from typing import Callable, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncTimer:
+ def __init__(self, callback: Callable, timer_calc: Callable[[int], int]):
+ self.callback = callback
+ self.timer_calc = timer_calc
+ self.timer: Optional[asyncio.Task] = None
+ self.tries: int = 0
+
+ def reset(self):
+ self.tries = 0
+ if self.timer and not self.timer.done():
+ self.timer.cancel()
+ self.timer = None
+ logger.debug(
+ "AsyncTimer has been reset and any scheduler tasks have been cancelled"
+ )
+
+ def schedule_timeout(self):
+ if self.timer:
+ self.timer.cancel()
+
+ self.tries += 1
+ delay = self.timer_calc(self.tries + 1)
+ logger.debug(f"Scheduling callback to run after {delay} seconds.")
+ self.timer = asyncio.create_task(self._run_timer(delay))
+
+ async def _run_timer(self, delay: float):
+ try:
+ await asyncio.sleep(delay)
+ await self.callback()
+ except asyncio.CancelledError:
+ logger.debug("AsyncTimer task was cancelled.")
+ except Exception as e:
+ logger.exception(f"Error in AsyncTimer callback: {e}")
diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py b/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_sync/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py b/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py
new file mode 100644
index 00000000..3512f9b7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_sync/channel.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+from typing import TYPE_CHECKING, Optional
+
+from realtime.types import RealtimeChannelOptions
+
+if TYPE_CHECKING:
+ from .client import SyncRealtimeClient
+
+
+class SyncRealtimeChannel:
+ """
+ `Channel` is an abstraction for a topic listener for an existing socket connection.
+ Each Channel has its own topic and a list of event-callbacks that responds to messages.
+ Should only be instantiated through `connection.RealtimeClient().channel(topic)`.
+ """
+
+ def __init__(
+ self,
+ socket: SyncRealtimeClient,
+ topic: str,
+ params: Optional[RealtimeChannelOptions] = None,
+ ) -> None:
+ """
+ Initialize the Channel object.
+
+ :param socket: RealtimeClient object
+ :param topic: Topic that it subscribes to on the realtime server
+ :param params: Optional parameters for connection.
+ """
diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/client.py b/.venv/lib/python3.12/site-packages/realtime/_sync/client.py
new file mode 100644
index 00000000..634a5343
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_sync/client.py
@@ -0,0 +1,71 @@
+from typing import Any, Dict, List, Optional
+
+from .channel import RealtimeChannelOptions, SyncRealtimeChannel
+
+NOT_IMPLEMENTED_MESSAGE = "This feature isn't available in the sync client. You can use the realtime feature in the async client only."
+
+
+class SyncRealtimeClient:
+ def __init__(
+ self,
+ url: str,
+ token: str,
+ auto_reconnect: bool = True,
+ params: Optional[Dict[str, Any]] = None,
+ hb_interval: int = 30,
+ max_retries: int = 5,
+ initial_backoff: float = 1.0,
+ ) -> None:
+ """
+ Initialize a RealtimeClient instance for WebSocket communication.
+
+ :param url: WebSocket URL of the Realtime server. Starts with `ws://` or `wss://`.
+ Also accepts default Supabase URL: `http://` or `https://`.
+ :param token: Authentication token for the WebSocket connection.
+ :param auto_reconnect: If True, automatically attempt to reconnect on disconnection. Defaults to False.
+ :param params: Optional parameters for the connection. Defaults to an empty dictionary.
+ :param hb_interval: Interval (in seconds) for sending heartbeat messages to keep the connection alive. Defaults to 30.
+ :param max_retries: Maximum number of reconnection attempts. Defaults to 5.
+ :param initial_backoff: Initial backoff time (in seconds) for reconnection attempts. Defaults to 1.0.
+ """
+
+ def channel(
+ self, topic: str, params: Optional[RealtimeChannelOptions] = None
+ ) -> SyncRealtimeChannel:
+ """
+ :param topic: Initializes a channel and creates a two-way association with the socket
+ :return: Channel
+ """
+ raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE)
+
+ def get_channels(self) -> List[SyncRealtimeChannel]:
+ raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE)
+
+ def remove_channel(self, channel: SyncRealtimeChannel) -> None:
+ """
+ Unsubscribes and removes a channel from the socket
+ :param channel: Channel to remove
+ :return: None
+ """
+ raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE)
+
+ def remove_all_channels(self) -> None:
+ """
+ Unsubscribes and removes all channels from the socket
+ :return: None
+ """
+ raise NotImplementedError(NOT_IMPLEMENTED_MESSAGE)
+
+ def set_auth(self, token: Optional[str]) -> None:
+ """
+ Set the authentication token for the connection and update all joined channels.
+
+ This method updates the access token for the current connection and sends the new token
+ to all joined channels. This is useful for refreshing authentication or changing users.
+
+ Args:
+ token (Optional[str]): The new authentication token. Can be None to remove authentication.
+
+ Returns:
+ None
+ """
diff --git a/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py b/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py
new file mode 100644
index 00000000..db02a1ce
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/_sync/presence.py
@@ -0,0 +1,12 @@
+"""
+ Defines the RealtimePresence class and its dependencies.
+"""
+
+from typing import Optional
+
+from ..types import PresenceOpts
+
+
+class SyncRealtimePresence:
+ def __init__(self, channel, opts: Optional[PresenceOpts] = None):
+ pass
diff --git a/.venv/lib/python3.12/site-packages/realtime/exceptions.py b/.venv/lib/python3.12/site-packages/realtime/exceptions.py
new file mode 100644
index 00000000..9fec53d2
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/exceptions.py
@@ -0,0 +1,22 @@
+class NotConnectedError(Exception):
+ """
+ Raised when operations requiring a connection are executed when socket is not connected
+ """
+
+ def __init__(self, func_name: str):
+ self.offending_func_name: str = func_name
+
+ def __str__(self):
+ return f"A WS connection has not been established. Ensure you call RealtimeClient.connect() before calling RealtimeClient.{self.offending_func_name}()"
+
+
+class AuthorizationError(Exception):
+ """
+ Raised when there is an authorization failure for private channels
+ """
+
+ def __init__(self, message: str = None):
+ self.message: str = message or "Authorization failed for private channel"
+
+ def __str__(self):
+ return self.message
diff --git a/.venv/lib/python3.12/site-packages/realtime/message.py b/.venv/lib/python3.12/site-packages/realtime/message.py
new file mode 100644
index 00000000..f4ed22fd
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/message.py
@@ -0,0 +1,26 @@
+from dataclasses import dataclass
+from typing import Any, Dict, Optional
+
+
+@dataclass
+class Message:
+ """
+ Dataclass abstraction for message
+ """
+
+ event: str
+ payload: Dict[str, Any]
+ ref: Any
+ topic: str
+ join_ref: Optional[str] = None
+
+ def __hash__(self):
+ return hash(
+ (
+ self.event,
+ tuple(list(self.payload.values())),
+ self.ref,
+ self.topic,
+ self.join_ref,
+ )
+ )
diff --git a/.venv/lib/python3.12/site-packages/realtime/transformers.py b/.venv/lib/python3.12/site-packages/realtime/transformers.py
new file mode 100644
index 00000000..bd541377
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/transformers.py
@@ -0,0 +1,9 @@
+import re
+
+
+def http_endpoint_url(socket_url: str) -> str:
+ url = re.sub(r"^ws", "http", socket_url, flags=re.IGNORECASE)
+ url = re.sub(
+ r"(\/socket\/websocket|\/socket|\/websocket)\/?$", "", url, flags=re.IGNORECASE
+ )
+ return re.sub(r"\/+$", "", url)
diff --git a/.venv/lib/python3.12/site-packages/realtime/types.py b/.venv/lib/python3.12/site-packages/realtime/types.py
new file mode 100644
index 00000000..e40159ae
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/types.py
@@ -0,0 +1,155 @@
+from enum import Enum
+from typing import Any, Callable, Dict, List, Literal, Optional, TypedDict, TypeVar
+
+from typing_extensions import ParamSpec
+
+# Constants
+DEFAULT_TIMEOUT = 10
+PHOENIX_CHANNEL = "phoenix"
+VSN = "1.0.0"
+DEFAULT_HEARTBEAT_INTERVAL = 25
+
+# Type variables and custom types
+T_ParamSpec = ParamSpec("T_ParamSpec")
+T_Retval = TypeVar("T_Retval")
+Callback = Callable[T_ParamSpec, T_Retval]
+
+
+# Enums
+class ChannelEvents(str, Enum):
+ """
+ ChannelEvents are a bunch of constant strings that are defined according to
+ what the Phoenix realtime server expects.
+ """
+
+ close = "phx_close"
+ error = "phx_error"
+ join = "phx_join"
+ reply = "phx_reply"
+ leave = "phx_leave"
+ heartbeat = "heartbeat"
+ access_token = "access_token"
+ broadcast = "broadcast"
+ presence = "presence"
+
+
+class ChannelStates(str, Enum):
+ JOINED = "joined"
+ CLOSED = "closed"
+ ERRORED = "errored"
+ JOINING = "joining"
+ LEAVING = "leaving"
+
+
+class RealtimeSubscribeStates(str, Enum):
+ SUBSCRIBED = "SUBSCRIBED"
+ TIMED_OUT = "TIMED_OUT"
+ CLOSED = "CLOSED"
+ CHANNEL_ERROR = "CHANNEL_ERROR"
+
+
+class RealtimePresenceListenEvents(str, Enum):
+ SYNC = "SYNC"
+ JOIN = "JOIN"
+ LEAVE = "LEAVE"
+
+
+# Literals
+RealtimePostgresChangesListenEvent = Literal["*", "INSERT", "UPDATE", "DELETE"]
+
+
+# Classes
+class Binding:
+ def __init__(
+ self,
+ type: str,
+ filter: Dict[str, Any],
+ callback: Callback,
+ id: Optional[str] = None,
+ ):
+ self.type = type
+ self.filter = filter
+ self.callback = callback
+ self.id = id
+
+
+class _Hook:
+ def __init__(self, status: str, callback: Callback):
+ self.status = status
+ self.callback = callback
+
+
+class Presence(Dict[str, Any]):
+ presence_ref: str
+
+
+class PresenceEvents:
+ def __init__(self, state: str, diff: str):
+ self.state = state
+ self.diff = diff
+
+
+class PresenceOpts:
+ def __init__(self, events: PresenceEvents):
+ self.events = events
+
+
+# TypedDicts
+class RealtimeChannelBroadcastConfig(TypedDict):
+ ack: bool
+ self: bool
+
+
+class RealtimeChannelPresenceConfig(TypedDict):
+ key: str
+
+
+class RealtimeChannelConfig(TypedDict):
+ broadcast: RealtimeChannelBroadcastConfig
+ presence: RealtimeChannelPresenceConfig
+ private: bool
+
+
+class RealtimeChannelOptions(TypedDict):
+ config: RealtimeChannelConfig
+
+
+class PresenceMeta(TypedDict, total=False):
+ phx_ref: str
+ phx_ref_prev: str
+
+
+class RawPresenceStateEntry(TypedDict):
+ metas: List[PresenceMeta]
+
+
+# Custom types
+PresenceOnJoinCallback = Callable[[str, List[Any], List[Any]], None]
+PresenceOnLeaveCallback = Callable[[str, List[Any], List[Any]], None]
+RealtimePresenceState = Dict[str, List[Presence]]
+RawPresenceState = Dict[str, RawPresenceStateEntry]
+
+
+class RawPresenceDiff(TypedDict):
+ joins: RawPresenceState
+ leaves: RawPresenceState
+
+
+class PresenceDiff(TypedDict):
+ joins: RealtimePresenceState
+ leaves: RealtimePresenceState
+
+
+# Specific payload types
+class RealtimePresenceJoinPayload(Dict[str, Any]):
+ event: Literal[RealtimePresenceListenEvents.JOIN]
+ key: str
+ current_presences: List[Presence]
+ new_presences: List[Presence]
+
+
+class RealtimePresenceLeavePayload(Dict[str, Any]):
+ event: Literal[RealtimePresenceListenEvents.LEAVE]
+ key: str
+ current_presences: List[Presence]
+ left_presences: List[Presence]
diff --git a/.venv/lib/python3.12/site-packages/realtime/utils.py b/.venv/lib/python3.12/site-packages/realtime/utils.py
new file mode 100644
index 00000000..67b22018
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/utils.py
@@ -0,0 +1,5 @@
+from urllib.parse import urlparse
+
+
+def is_ws_url(url: str) -> bool:
+ return urlparse(url).scheme in {"wss", "ws", "http", "https"}
diff --git a/.venv/lib/python3.12/site-packages/realtime/version.py b/.venv/lib/python3.12/site-packages/realtime/version.py
new file mode 100644
index 00000000..91ab439a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/realtime/version.py
@@ -0,0 +1 @@
+__version__ = "2.4.1" # {x-release-please-version}