aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/asgiref/timeout.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/asgiref/timeout.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/asgiref/timeout.py')
-rw-r--r--.venv/lib/python3.12/site-packages/asgiref/timeout.py118
1 files changed, 118 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/asgiref/timeout.py b/.venv/lib/python3.12/site-packages/asgiref/timeout.py
new file mode 100644
index 00000000..fd5381d0
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/asgiref/timeout.py
@@ -0,0 +1,118 @@
+# This code is originally sourced from the aio-libs project "async_timeout",
+# under the Apache 2.0 license. You may see the original project at
+# https://github.com/aio-libs/async-timeout
+
+# It is vendored here to reduce chain-dependencies on this library, and
+# modified slightly to remove some features we don't use.
+
+
+import asyncio
+import warnings
+from types import TracebackType
+from typing import Any # noqa
+from typing import Optional, Type
+
+
+class timeout:
+ """timeout context manager.
+
+ Useful in cases when you want to apply timeout logic around block
+ of code or in cases when asyncio.wait_for is not suitable. For example:
+
+ >>> with timeout(0.001):
+ ... async with aiohttp.get('https://github.com') as r:
+ ... await r.text()
+
+
+ timeout - value in seconds or None to disable timeout logic
+ loop - asyncio compatible event loop
+ """
+
+ def __init__(
+ self,
+ timeout: Optional[float],
+ *,
+ loop: Optional[asyncio.AbstractEventLoop] = None,
+ ) -> None:
+ self._timeout = timeout
+ if loop is None:
+ loop = asyncio.get_running_loop()
+ else:
+ warnings.warn(
+ """The loop argument to timeout() is deprecated.""", DeprecationWarning
+ )
+ self._loop = loop
+ self._task = None # type: Optional[asyncio.Task[Any]]
+ self._cancelled = False
+ self._cancel_handler = None # type: Optional[asyncio.Handle]
+ self._cancel_at = None # type: Optional[float]
+
+ def __enter__(self) -> "timeout":
+ return self._do_enter()
+
+ def __exit__(
+ self,
+ exc_type: Type[BaseException],
+ exc_val: BaseException,
+ exc_tb: TracebackType,
+ ) -> Optional[bool]:
+ self._do_exit(exc_type)
+ return None
+
+ async def __aenter__(self) -> "timeout":
+ return self._do_enter()
+
+ async def __aexit__(
+ self,
+ exc_type: Type[BaseException],
+ exc_val: BaseException,
+ exc_tb: TracebackType,
+ ) -> None:
+ self._do_exit(exc_type)
+
+ @property
+ def expired(self) -> bool:
+ return self._cancelled
+
+ @property
+ def remaining(self) -> Optional[float]:
+ if self._cancel_at is not None:
+ return max(self._cancel_at - self._loop.time(), 0.0)
+ else:
+ return None
+
+ def _do_enter(self) -> "timeout":
+ # Support Tornado 5- without timeout
+ # Details: https://github.com/python/asyncio/issues/392
+ if self._timeout is None:
+ return self
+
+ self._task = asyncio.current_task(self._loop)
+ if self._task is None:
+ raise RuntimeError(
+ "Timeout context manager should be used " "inside a task"
+ )
+
+ if self._timeout <= 0:
+ self._loop.call_soon(self._cancel_task)
+ return self
+
+ self._cancel_at = self._loop.time() + self._timeout
+ self._cancel_handler = self._loop.call_at(self._cancel_at, self._cancel_task)
+ return self
+
+ def _do_exit(self, exc_type: Type[BaseException]) -> None:
+ if exc_type is asyncio.CancelledError and self._cancelled:
+ self._cancel_handler = None
+ self._task = None
+ raise asyncio.TimeoutError
+ if self._timeout is not None and self._cancel_handler is not None:
+ self._cancel_handler.cancel()
+ self._cancel_handler = None
+ self._task = None
+ return None
+
+ def _cancel_task(self) -> None:
+ if self._task is not None:
+ self._task.cancel()
+ self._cancelled = True