about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py b/.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py
new file mode 100644
index 00000000..ebe8857f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiohttp/compression_utils.py
@@ -0,0 +1,173 @@
+import asyncio
+import zlib
+from concurrent.futures import Executor
+from typing import Optional, cast
+
+try:
+    try:
+        import brotlicffi as brotli
+    except ImportError:
+        import brotli
+
+    HAS_BROTLI = True
+except ImportError:  # pragma: no cover
+    HAS_BROTLI = False
+
+MAX_SYNC_CHUNK_SIZE = 1024
+
+
+def encoding_to_mode(
+    encoding: Optional[str] = None,
+    suppress_deflate_header: bool = False,
+) -> int:
+    if encoding == "gzip":
+        return 16 + zlib.MAX_WBITS
+
+    return -zlib.MAX_WBITS if suppress_deflate_header else zlib.MAX_WBITS
+
+
+class ZlibBaseHandler:
+    def __init__(
+        self,
+        mode: int,
+        executor: Optional[Executor] = None,
+        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
+    ):
+        self._mode = mode
+        self._executor = executor
+        self._max_sync_chunk_size = max_sync_chunk_size
+
+
+class ZLibCompressor(ZlibBaseHandler):
+    def __init__(
+        self,
+        encoding: Optional[str] = None,
+        suppress_deflate_header: bool = False,
+        level: Optional[int] = None,
+        wbits: Optional[int] = None,
+        strategy: int = zlib.Z_DEFAULT_STRATEGY,
+        executor: Optional[Executor] = None,
+        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
+    ):
+        super().__init__(
+            mode=(
+                encoding_to_mode(encoding, suppress_deflate_header)
+                if wbits is None
+                else wbits
+            ),
+            executor=executor,
+            max_sync_chunk_size=max_sync_chunk_size,
+        )
+        if level is None:
+            self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy)
+        else:
+            self._compressor = zlib.compressobj(
+                wbits=self._mode, strategy=strategy, level=level
+            )
+        self._compress_lock = asyncio.Lock()
+
+    def compress_sync(self, data: bytes) -> bytes:
+        return self._compressor.compress(data)
+
+    async def compress(self, data: bytes) -> bytes:
+        """Compress the data and returned the compressed bytes.
+
+        Note that flush() must be called after the last call to compress()
+
+        If the data size is large than the max_sync_chunk_size, the compression
+        will be done in the executor. Otherwise, the compression will be done
+        in the event loop.
+        """
+        async with self._compress_lock:
+            # To ensure the stream is consistent in the event
+            # there are multiple writers, we need to lock
+            # the compressor so that only one writer can
+            # compress at a time.
+            if (
+                self._max_sync_chunk_size is not None
+                and len(data) > self._max_sync_chunk_size
+            ):
+                return await asyncio.get_running_loop().run_in_executor(
+                    self._executor, self._compressor.compress, data
+                )
+            return self.compress_sync(data)
+
+    def flush(self, mode: int = zlib.Z_FINISH) -> bytes:
+        return self._compressor.flush(mode)
+
+
+class ZLibDecompressor(ZlibBaseHandler):
+    def __init__(
+        self,
+        encoding: Optional[str] = None,
+        suppress_deflate_header: bool = False,
+        executor: Optional[Executor] = None,
+        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
+    ):
+        super().__init__(
+            mode=encoding_to_mode(encoding, suppress_deflate_header),
+            executor=executor,
+            max_sync_chunk_size=max_sync_chunk_size,
+        )
+        self._decompressor = zlib.decompressobj(wbits=self._mode)
+
+    def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes:
+        return self._decompressor.decompress(data, max_length)
+
+    async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
+        """Decompress the data and return the decompressed bytes.
+
+        If the data size is large than the max_sync_chunk_size, the decompression
+        will be done in the executor. Otherwise, the decompression will be done
+        in the event loop.
+        """
+        if (
+            self._max_sync_chunk_size is not None
+            and len(data) > self._max_sync_chunk_size
+        ):
+            return await asyncio.get_running_loop().run_in_executor(
+                self._executor, self._decompressor.decompress, data, max_length
+            )
+        return self.decompress_sync(data, max_length)
+
+    def flush(self, length: int = 0) -> bytes:
+        return (
+            self._decompressor.flush(length)
+            if length > 0
+            else self._decompressor.flush()
+        )
+
+    @property
+    def eof(self) -> bool:
+        return self._decompressor.eof
+
+    @property
+    def unconsumed_tail(self) -> bytes:
+        return self._decompressor.unconsumed_tail
+
+    @property
+    def unused_data(self) -> bytes:
+        return self._decompressor.unused_data
+
+
+class BrotliDecompressor:
+    # Supports both 'brotlipy' and 'Brotli' packages
+    # since they share an import name. The top branches
+    # are for 'brotlipy' and bottom branches for 'Brotli'
+    def __init__(self) -> None:
+        if not HAS_BROTLI:
+            raise RuntimeError(
+                "The brotli decompression is not available. "
+                "Please install `Brotli` module"
+            )
+        self._obj = brotli.Decompressor()
+
+    def decompress_sync(self, data: bytes) -> bytes:
+        if hasattr(self._obj, "decompress"):
+            return cast(bytes, self._obj.decompress(data))
+        return cast(bytes, self._obj.process(data))
+
+    def flush(self) -> bytes:
+        if hasattr(self._obj, "flush"):
+            return cast(bytes, self._obj.flush())
+        return b""