about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py147
1 files changed, 147 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py b/.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py
new file mode 100644
index 00000000..0bb58df9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/_websocket/helpers.py
@@ -0,0 +1,147 @@
+"""Helpers for WebSocket protocol versions 13 and 8."""
+
+import functools
+import re
+from struct import Struct
+from typing import TYPE_CHECKING, Final, List, Optional, Pattern, Tuple
+
+from ..helpers import NO_EXTENSIONS
+from .models import WSHandshakeError
+
+UNPACK_LEN3 = Struct("!Q").unpack_from
+UNPACK_CLOSE_CODE = Struct("!H").unpack
+PACK_LEN1 = Struct("!BB").pack
+PACK_LEN2 = Struct("!BBH").pack
+PACK_LEN3 = Struct("!BBQ").pack
+PACK_CLOSE_CODE = Struct("!H").pack
+PACK_RANDBITS = Struct("!L").pack
+MSG_SIZE: Final[int] = 2**14
+MASK_LEN: Final[int] = 4
+
+WS_KEY: Final[bytes] = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
+
+
+# Used by _websocket_mask_python
+@functools.lru_cache
+def _xor_table() -> List[bytes]:
+    return [bytes(a ^ b for a in range(256)) for b in range(256)]
+
+
+def _websocket_mask_python(mask: bytes, data: bytearray) -> None:
+    """Websocket masking function.
+
+    `mask` is a `bytes` object of length 4; `data` is a `bytearray`
+    object of any length. The contents of `data` are masked with `mask`,
+    as specified in section 5.3 of RFC 6455.
+
+    Note that this function mutates the `data` argument.
+
+    This pure-python implementation may be replaced by an optimized
+    version when available.
+
+    """
+    assert isinstance(data, bytearray), data
+    assert len(mask) == 4, mask
+
+    if data:
+        _XOR_TABLE = _xor_table()
+        a, b, c, d = (_XOR_TABLE[n] for n in mask)
+        data[::4] = data[::4].translate(a)
+        data[1::4] = data[1::4].translate(b)
+        data[2::4] = data[2::4].translate(c)
+        data[3::4] = data[3::4].translate(d)
+
+
+if TYPE_CHECKING or NO_EXTENSIONS:  # pragma: no cover
+    websocket_mask = _websocket_mask_python
+else:
+    try:
+        from .mask import _websocket_mask_cython  # type: ignore[import-not-found]
+
+        websocket_mask = _websocket_mask_cython
+    except ImportError:  # pragma: no cover
+        websocket_mask = _websocket_mask_python
+
+
+_WS_EXT_RE: Final[Pattern[str]] = re.compile(
+    r"^(?:;\s*(?:"
+    r"(server_no_context_takeover)|"
+    r"(client_no_context_takeover)|"
+    r"(server_max_window_bits(?:=(\d+))?)|"
+    r"(client_max_window_bits(?:=(\d+))?)))*$"
+)
+
+_WS_EXT_RE_SPLIT: Final[Pattern[str]] = re.compile(r"permessage-deflate([^,]+)?")
+
+
+def ws_ext_parse(extstr: Optional[str], isserver: bool = False) -> Tuple[int, bool]:
+    if not extstr:
+        return 0, False
+
+    compress = 0
+    notakeover = False
+    for ext in _WS_EXT_RE_SPLIT.finditer(extstr):
+        defext = ext.group(1)
+        # Return compress = 15 when get `permessage-deflate`
+        if not defext:
+            compress = 15
+            break
+        match = _WS_EXT_RE.match(defext)
+        if match:
+            compress = 15
+            if isserver:
+                # Server never fail to detect compress handshake.
+                # Server does not need to send max wbit to client
+                if match.group(4):
+                    compress = int(match.group(4))
+                    # Group3 must match if group4 matches
+                    # Compress wbit 8 does not support in zlib
+                    # If compress level not support,
+                    # CONTINUE to next extension
+                    if compress > 15 or compress < 9:
+                        compress = 0
+                        continue
+                if match.group(1):
+                    notakeover = True
+                # Ignore regex group 5 & 6 for client_max_window_bits
+                break
+            else:
+                if match.group(6):
+                    compress = int(match.group(6))
+                    # Group5 must match if group6 matches
+                    # Compress wbit 8 does not support in zlib
+                    # If compress level not support,
+                    # FAIL the parse progress
+                    if compress > 15 or compress < 9:
+                        raise WSHandshakeError("Invalid window size")
+                if match.group(2):
+                    notakeover = True
+                # Ignore regex group 5 & 6 for client_max_window_bits
+                break
+        # Return Fail if client side and not match
+        elif not isserver:
+            raise WSHandshakeError("Extension for deflate not supported" + ext.group(1))
+
+    return compress, notakeover
+
+
+def ws_ext_gen(
+    compress: int = 15, isserver: bool = False, server_notakeover: bool = False
+) -> str:
+    # client_notakeover=False not used for server
+    # compress wbit 8 does not support in zlib
+    if compress < 9 or compress > 15:
+        raise ValueError(
+            "Compress wbits must between 9 and 15, zlib does not support wbits=8"
+        )
+    enabledext = ["permessage-deflate"]
+    if not isserver:
+        enabledext.append("client_max_window_bits")
+
+    if compress < 15:
+        enabledext.append("server_max_window_bits=" + str(compress))
+    if server_notakeover:
+        enabledext.append("server_no_context_takeover")
+    # if client_notakeover:
+    #     enabledext.append('client_no_context_takeover')
+    return "; ".join(enabledext)