about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py262
1 files changed, 262 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
new file mode 100644
index 00000000..f68ea846
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
@@ -0,0 +1,262 @@
+"""Utilities for asynchronous iteration."""
+from __future__ import annotations
+from types import TracebackType
+
+import warnings
+import functools
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    AsyncGenerator,
+    AsyncIterable,
+    Awaitable,
+    Callable,
+    Type,
+    TypeVar,
+    AsyncIterator,
+    Any,
+)
+
+if TYPE_CHECKING:
+    from typing_extensions import ParamSpec
+
+    P = ParamSpec("P")
+
+from contextlib import AsyncExitStack
+
+__all__ = [
+    "aiter",
+    "anext",
+    "await_",
+    "async_",
+    "is_async_iterable",
+    "assert_async_iterable",
+    "is_async_iterator",
+    "assert_async_iterator",
+    "AsyncIteratorContext",
+    "aitercontext",
+    "AsyncExitStack",
+]
+
+
+# Magic method shorcuts
+
+
+def aiter(obj: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Access aiter magic method."""
+    assert_async_iterable(obj)
+    return obj.__aiter__()
+
+
+def anext(obj: AsyncIterator[T]) -> Awaitable[T]:
+    """Access anext magic method."""
+    assert_async_iterator(obj)
+    return obj.__anext__()
+
+
+# Async / await helper functions
+
+
+async def await_(obj: Awaitable[T]) -> T:
+    """Identity coroutine function."""
+    return await obj
+
+
+def async_(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
+    """Wrap the given function into a coroutine function."""
+
+    @functools.wraps(fn)
+    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
+        return await fn(*args, **kwargs)
+
+    return wrapper
+
+
+# Iterability helpers
+
+
+def is_async_iterable(obj: object) -> bool:
+    """Check if the given object is an asynchronous iterable."""
+    return hasattr(obj, "__aiter__")
+
+
+def assert_async_iterable(obj: object) -> None:
+    """Raise a TypeError if the given object is not an
+    asynchronous iterable.
+    """
+    if not is_async_iterable(obj):
+        raise TypeError(f"{type(obj).__name__!r} object is not async iterable")
+
+
+def is_async_iterator(obj: object) -> bool:
+    """Check if the given object is an asynchronous iterator."""
+    return hasattr(obj, "__anext__")
+
+
+def assert_async_iterator(obj: object) -> None:
+    """Raise a TypeError if the given object is not an
+    asynchronous iterator.
+    """
+    if not is_async_iterator(obj):
+        raise TypeError(f"{type(obj).__name__!r} object is not an async iterator")
+
+
+# Async iterator context
+
+T = TypeVar("T")
+Self = TypeVar("Self", bound="AsyncIteratorContext[Any]")
+
+
+class AsyncIteratorContext(AsyncIterator[T], AsyncContextManager[Any]):
+    """Asynchronous iterator with context management.
+
+    The context management makes sure the aclose asynchronous method
+    of the corresponding iterator has run before it exits. It also issues
+    warnings and RuntimeError if it is used incorrectly.
+
+    Correct usage::
+
+        ait = some_asynchronous_iterable()
+        async with AsyncIteratorContext(ait) as safe_ait:
+            async for item in safe_ait:
+                <block>
+
+    It is nonetheless not meant to use directly.
+    Prefer aitercontext helper instead.
+    """
+
+    _STANDBY = "STANDBY"
+    _RUNNING = "RUNNING"
+    _FINISHED = "FINISHED"
+
+    def __init__(self, aiterator: AsyncIterator[T]):
+        """Initialize with an asynchrnous iterator."""
+        assert_async_iterator(aiterator)
+        if isinstance(aiterator, AsyncIteratorContext):
+            raise TypeError(f"{aiterator!r} is already an AsyncIteratorContext")
+        self._state = self._STANDBY
+        self._aiterator = aiterator
+
+    def __aiter__(self: Self) -> Self:
+        return self
+
+    def __anext__(self) -> Awaitable[T]:
+        if self._state == self._FINISHED:
+            raise RuntimeError(
+                f"{type(self).__name__} is closed and cannot be iterated"
+            )
+        if self._state == self._STANDBY:
+            warnings.warn(
+                f"{type(self).__name__} is iterated outside of its context",
+                stacklevel=2,
+            )
+        return anext(self._aiterator)
+
+    async def __aenter__(self: Self) -> Self:
+        if self._state == self._RUNNING:
+            raise RuntimeError(f"{type(self).__name__} has already been entered")
+        if self._state == self._FINISHED:
+            raise RuntimeError(
+                f"{type(self).__name__} is closed and cannot be iterated"
+            )
+        self._state = self._RUNNING
+        return self
+
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> bool:
+        try:
+            if self._state == self._FINISHED:
+                return False
+            try:
+                # No exception to throw
+                if typ is None:
+                    return False
+
+                # Prevent GeneratorExit from being silenced
+                if typ is GeneratorExit:
+                    return False
+
+                # No method to throw
+                if not hasattr(self._aiterator, "athrow"):
+                    return False
+
+                # No frame to throw
+                if not getattr(self._aiterator, "ag_frame", True):
+                    return False
+
+                # Cannot throw at the moment
+                if getattr(self._aiterator, "ag_running", False):
+                    return False
+
+                # Throw
+                try:
+                    assert isinstance(self._aiterator, AsyncGenerator)
+                    await self._aiterator.athrow(typ, value, traceback)
+                    raise RuntimeError("Async iterator didn't stop after athrow()")
+
+                # Exception has been (most probably) silenced
+                except StopAsyncIteration as exc:
+                    return exc is not value
+
+                # A (possibly new) exception has been raised
+                except BaseException as exc:
+                    if exc is value:
+                        return False
+                    raise
+            finally:
+                # Look for an aclose method
+                aclose = getattr(self._aiterator, "aclose", None)
+
+                # The ag_running attribute has been introduced with python 3.8
+                running = getattr(self._aiterator, "ag_running", False)
+                closed = not getattr(self._aiterator, "ag_frame", True)
+
+                # A RuntimeError is raised if aiterator is running or closed
+                if aclose and not running and not closed:
+                    try:
+                        await aclose()
+
+                    # Work around bpo-35409
+                    except GeneratorExit:
+                        pass  # pragma: no cover
+        finally:
+            self._state = self._FINISHED
+
+    async def aclose(self) -> None:
+        await self.__aexit__(None, None, None)
+
+    async def athrow(self, exc: Exception) -> T:
+        if self._state == self._FINISHED:
+            raise RuntimeError(f"{type(self).__name__} is closed and cannot be used")
+        assert isinstance(self._aiterator, AsyncGenerator)
+        item: T = await self._aiterator.athrow(exc)
+        return item
+
+
+def aitercontext(
+    aiterable: AsyncIterable[T],
+) -> AsyncIteratorContext[T]:
+    """Return an asynchronous context manager from an asynchronous iterable.
+
+    The context management makes sure the aclose asynchronous method
+    has run before it exits. It also issues warnings and RuntimeError
+    if it is used incorrectly.
+
+    It is safe to use with any asynchronous iterable and prevent
+    asynchronous iterator context to be wrapped twice.
+
+    Correct usage::
+
+        ait = some_asynchronous_iterable()
+        async with aitercontext(ait) as safe_ait:
+            async for item in safe_ait:
+                <block>
+    """
+    aiterator = aiter(aiterable)
+    if isinstance(aiterator, AsyncIteratorContext):
+        return aiterator
+    return AsyncIteratorContext(aiterator)