about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/asgiref/timeout.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/asgiref/timeout.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref/timeout.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asgiref/timeout.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/timeout.py b/.venv/lib/python3.12/site-packages/asgiref/timeout.py
new file mode 100644
index 00000000..fd5381d0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asgiref/timeout.py
@@ -0,0 +1,118 @@
+# This code is originally sourced from the aio-libs project "async_timeout",
+# under the Apache 2.0 license. You may see the original project at
+# https://github.com/aio-libs/async-timeout
+
+# It is vendored here to reduce chain-dependencies on this library, and
+# modified slightly to remove some features we don't use.
+
+
+import asyncio
+import warnings
+from types import TracebackType
+from typing import Any  # noqa
+from typing import Optional, Type
+
+
+class timeout:
+    """timeout context manager.
+
+    Useful in cases when you want to apply timeout logic around block
+    of code or in cases when asyncio.wait_for is not suitable. For example:
+
+    >>> with timeout(0.001):
+    ...     async with aiohttp.get('https://github.com') as r:
+    ...         await r.text()
+
+
+    timeout - value in seconds or None to disable timeout logic
+    loop - asyncio compatible event loop
+    """
+
+    def __init__(
+        self,
+        timeout: Optional[float],
+        *,
+        loop: Optional[asyncio.AbstractEventLoop] = None,
+    ) -> None:
+        self._timeout = timeout
+        if loop is None:
+            loop = asyncio.get_running_loop()
+        else:
+            warnings.warn(
+                """The loop argument to timeout() is deprecated.""", DeprecationWarning
+            )
+        self._loop = loop
+        self._task = None  # type: Optional[asyncio.Task[Any]]
+        self._cancelled = False
+        self._cancel_handler = None  # type: Optional[asyncio.Handle]
+        self._cancel_at = None  # type: Optional[float]
+
+    def __enter__(self) -> "timeout":
+        return self._do_enter()
+
+    def __exit__(
+        self,
+        exc_type: Type[BaseException],
+        exc_val: BaseException,
+        exc_tb: TracebackType,
+    ) -> Optional[bool]:
+        self._do_exit(exc_type)
+        return None
+
+    async def __aenter__(self) -> "timeout":
+        return self._do_enter()
+
+    async def __aexit__(
+        self,
+        exc_type: Type[BaseException],
+        exc_val: BaseException,
+        exc_tb: TracebackType,
+    ) -> None:
+        self._do_exit(exc_type)
+
+    @property
+    def expired(self) -> bool:
+        return self._cancelled
+
+    @property
+    def remaining(self) -> Optional[float]:
+        if self._cancel_at is not None:
+            return max(self._cancel_at - self._loop.time(), 0.0)
+        else:
+            return None
+
+    def _do_enter(self) -> "timeout":
+        # Support Tornado 5- without timeout
+        # Details: https://github.com/python/asyncio/issues/392
+        if self._timeout is None:
+            return self
+
+        self._task = asyncio.current_task(self._loop)
+        if self._task is None:
+            raise RuntimeError(
+                "Timeout context manager should be used " "inside a task"
+            )
+
+        if self._timeout <= 0:
+            self._loop.call_soon(self._cancel_task)
+            return self
+
+        self._cancel_at = self._loop.time() + self._timeout
+        self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
+        return self
+
+    def _do_exit(self, exc_type: Type[BaseException]) -> None:
+        if exc_type is asyncio.CancelledError and self._cancelled:
+            self._cancel_handler = None
+            self._task = None
+            raise asyncio.TimeoutError
+        if self._timeout is not None and self._cancel_handler is not None:
+            self._cancel_handler.cancel()
+            self._cancel_handler = None
+        self._task = None
+        return None
+
+    def _cancel_task(self) -> None:
+        if self._task is not None:
+            self._task.cancel()
+            self._cancelled = True