author    S. Solomon Darnell 2025-03-28 21:52:21 -0500
committer S. Solomon Darnell 2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
    two versions of R2R are here
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream')

 .venv/lib/python3.12/site-packages/aiostream/__init__.py         |  31 +
 .venv/lib/python3.12/site-packages/aiostream/aiter_utils.py      | 262 +
 .venv/lib/python3.12/site-packages/aiostream/core.py             | 567 +
 .venv/lib/python3.12/site-packages/aiostream/manager.py          | 159 +
 .venv/lib/python3.12/site-packages/aiostream/pipe.py             |  39 +
 .venv/lib/python3.12/site-packages/aiostream/py.typed            |   0
 .venv/lib/python3.12/site-packages/aiostream/stream/__init__.py  |  10 +
 .venv/lib/python3.12/site-packages/aiostream/stream/advanced.py  | 222 +
 .venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py |  90 +
 .venv/lib/python3.12/site-packages/aiostream/stream/combine.py   | 282 +
 .venv/lib/python3.12/site-packages/aiostream/stream/create.py    | 191 +
 .venv/lib/python3.12/site-packages/aiostream/stream/misc.py      |  83 +
 .venv/lib/python3.12/site-packages/aiostream/stream/select.py    | 284 +
 .venv/lib/python3.12/site-packages/aiostream/stream/time.py      |  56 +
 .venv/lib/python3.12/site-packages/aiostream/stream/transform.py | 128 +
 .venv/lib/python3.12/site-packages/aiostream/test_utils.py       | 173 +
 16 files changed, 2577 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
new file mode 100644
index 00000000..3ac80c16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
@@ -0,0 +1,31 @@
+"""Generator-based operators for asynchronous iteration.
+
+The two main modules are:
+- stream: provides all the stream operators (to create new stream objects)
+- pipe: provides all the pipe operators (to combine operators using '|')
+
+Additionally, three core objects are exposed:
+- streamcontext: a context for safe stream iteration
+- StreamEmpty: the exception raised when an empty stream is awaited
+- operator: a decorator to create stream operators from async generators
+
+Some utility modules are also provided:
+- aiter_utils: utilities for asynchronous iteration
+- context_utils: utilities for asynchronous context
+- test_utils: utilities for testing stream operators (requires pytest)
+"""
+
+from . import stream, pipe
+from .aiter_utils import async_, await_
+from .core import StreamEmpty, operator, pipable_operator, streamcontext
+
+__all__ = [
+    "stream",
+    "pipe",
+    "async_",
+    "await_",
+    "operator",
+    "pipable_operator",
+    "streamcontext",
+    "StreamEmpty",
+]
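
A minimal usage sketch of the names exported above (assuming a standard
asyncio entry point; the expected output follows from the operator
docstrings)::

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        # Build a pipeline: count 0, 1, 2, ..., keep even values, take five
        xs = stream.count() | pipe.filter(lambda x: x % 2 == 0) | pipe.take(5)
        print(await stream.list(xs))  # [0, 2, 4, 6, 8]

    asyncio.run(main())
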
diff --git a/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
new file mode 100644
index 00000000..f68ea846
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
@@ -0,0 +1,262 @@
+"""Utilities for asynchronous iteration."""
+from __future__ import annotations
+from types import TracebackType
+
+import warnings
+import functools
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    AsyncGenerator,
+    AsyncIterable,
+    Awaitable,
+    Callable,
+    Type,
+    TypeVar,
+    AsyncIterator,
+    Any,
+)
+
+if TYPE_CHECKING:
+    from typing_extensions import ParamSpec
+
+    P = ParamSpec("P")
+
+from contextlib import AsyncExitStack
+
+__all__ = [
+    "aiter",
+    "anext",
+    "await_",
+    "async_",
+    "is_async_iterable",
+    "assert_async_iterable",
+    "is_async_iterator",
+    "assert_async_iterator",
+    "AsyncIteratorContext",
+    "aitercontext",
+    "AsyncExitStack",
+]
+
+
+# Magic method shortcuts
+
+
+def aiter(obj: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Access aiter magic method."""
+    assert_async_iterable(obj)
+    return obj.__aiter__()
+
+
+def anext(obj: AsyncIterator[T]) -> Awaitable[T]:
+    """Access anext magic method."""
+    assert_async_iterator(obj)
+    return obj.__anext__()
+
+
+# Async / await helper functions
+
+
+async def await_(obj: Awaitable[T]) -> T:
+    """Identity coroutine function."""
+    return await obj
+
+
+def async_(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
+    """Wrap the given function into a coroutine function."""
+
+    @functools.wraps(fn)
+    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
+        return await fn(*args, **kwargs)
+
+    return wrapper
+
+
+# Iterability helpers
+
+
+def is_async_iterable(obj: object) -> bool:
+    """Check if the given object is an asynchronous iterable."""
+    return hasattr(obj, "__aiter__")
+
+
+def assert_async_iterable(obj: object) -> None:
+    """Raise a TypeError if the given object is not an
+    asynchronous iterable.
+    """
+    if not is_async_iterable(obj):
+        raise TypeError(f"{type(obj).__name__!r} object is not async iterable")
+
+
+def is_async_iterator(obj: object) -> bool:
+    """Check if the given object is an asynchronous iterator."""
+    return hasattr(obj, "__anext__")
+
+
+def assert_async_iterator(obj: object) -> None:
+    """Raise a TypeError if the given object is not an
+    asynchronous iterator.
+    """
+    if not is_async_iterator(obj):
+        raise TypeError(f"{type(obj).__name__!r} object is not an async iterator")
+
+
+# Async iterator context
+
+T = TypeVar("T")
+Self = TypeVar("Self", bound="AsyncIteratorContext[Any]")
+
+
+class AsyncIteratorContext(AsyncIterator[T], AsyncContextManager[Any]):
+    """Asynchronous iterator with context management.
+
+    The context management makes sure the aclose asynchronous method
+    of the corresponding iterator has run before it exits. It also issues
+    warnings and raises RuntimeError if it is used incorrectly.
+
+    Correct usage::
+
+        ait = some_asynchronous_iterable()
+        async with AsyncIteratorContext(ait) as safe_ait:
+            async for item in safe_ait:
+                <block>
+
+    It is nonetheless not meant to be used directly.
+    Prefer the aitercontext helper instead.
+    """
+
+    _STANDBY = "STANDBY"
+    _RUNNING = "RUNNING"
+    _FINISHED = "FINISHED"
+
+    def __init__(self, aiterator: AsyncIterator[T]):
+        """Initialize with an asynchrnous iterator."""
+        assert_async_iterator(aiterator)
+        if isinstance(aiterator, AsyncIteratorContext):
+            raise TypeError(f"{aiterator!r} is already an AsyncIteratorContext")
+        self._state = self._STANDBY
+        self._aiterator = aiterator
+
+    def __aiter__(self: Self) -> Self:
+        return self
+
+    def __anext__(self) -> Awaitable[T]:
+        if self._state == self._FINISHED:
+            raise RuntimeError(
+                f"{type(self).__name__} is closed and cannot be iterated"
+            )
+        if self._state == self._STANDBY:
+            warnings.warn(
+                f"{type(self).__name__} is iterated outside of its context",
+                stacklevel=2,
+            )
+        return anext(self._aiterator)
+
+    async def __aenter__(self: Self) -> Self:
+        if self._state == self._RUNNING:
+            raise RuntimeError(f"{type(self).__name__} has already been entered")
+        if self._state == self._FINISHED:
+            raise RuntimeError(
+                f"{type(self).__name__} is closed and cannot be iterated"
+            )
+        self._state = self._RUNNING
+        return self
+
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> bool:
+        try:
+            if self._state == self._FINISHED:
+                return False
+            try:
+                # No exception to throw
+                if typ is None:
+                    return False
+
+                # Prevent GeneratorExit from being silenced
+                if typ is GeneratorExit:
+                    return False
+
+                # No method to throw
+                if not hasattr(self._aiterator, "athrow"):
+                    return False
+
+                # No frame to throw
+                if not getattr(self._aiterator, "ag_frame", True):
+                    return False
+
+                # Cannot throw at the moment
+                if getattr(self._aiterator, "ag_running", False):
+                    return False
+
+                # Throw
+                try:
+                    assert isinstance(self._aiterator, AsyncGenerator)
+                    await self._aiterator.athrow(typ, value, traceback)
+                    raise RuntimeError("Async iterator didn't stop after athrow()")
+
+                # Exception has been (most probably) silenced
+                except StopAsyncIteration as exc:
+                    return exc is not value
+
+                # A (possibly new) exception has been raised
+                except BaseException as exc:
+                    if exc is value:
+                        return False
+                    raise
+            finally:
+                # Look for an aclose method
+                aclose = getattr(self._aiterator, "aclose", None)
+
+                # The ag_running attribute was introduced in Python 3.8
+                running = getattr(self._aiterator, "ag_running", False)
+                closed = not getattr(self._aiterator, "ag_frame", True)
+
+                # A RuntimeError is raised if aiterator is running or closed
+                if aclose and not running and not closed:
+                    try:
+                        await aclose()
+
+                    # Work around bpo-35409
+                    except GeneratorExit:
+                        pass  # pragma: no cover
+        finally:
+            self._state = self._FINISHED
+
+    async def aclose(self) -> None:
+        await self.__aexit__(None, None, None)
+
+    async def athrow(self, exc: Exception) -> T:
+        if self._state == self._FINISHED:
+            raise RuntimeError(f"{type(self).__name__} is closed and cannot be used")
+        assert isinstance(self._aiterator, AsyncGenerator)
+        item: T = await self._aiterator.athrow(exc)
+        return item
+
+
+def aitercontext(
+    aiterable: AsyncIterable[T],
+) -> AsyncIteratorContext[T]:
+    """Return an asynchronous context manager from an asynchronous iterable.
+
+    The context management makes sure the aclose asynchronous method
+    has run before it exits. It also issues warnings and raises
+    RuntimeError if it is used incorrectly.
+
+    It is safe to use with any asynchronous iterable and prevents an
+    asynchronous iterator context from being wrapped twice.
+
+    Correct usage::
+
+        ait = some_asynchronous_iterable()
+        async with aitercontext(ait) as safe_ait:
+            async for item in safe_ait:
+                <block>
+    """
+    aiterator = aiter(aiterable)
+    if isinstance(aiterator, AsyncIteratorContext):
+        return aiterator
+    return AsyncIteratorContext(aiterator)
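
A short sketch of the helpers above, combining async_ (to wrap a plain
callable returning an awaitable) with aitercontext for safe iteration::

    import asyncio
    from aiostream.aiter_utils import aitercontext, async_

    async def numbers():
        for i in range(3):
            yield i

    async def main() -> None:
        # async_ turns a lambda returning a coroutine into a coroutine function
        sleeper = async_(lambda: asyncio.sleep(0))
        await sleeper()

        # aitercontext guarantees that aclose() runs when the block exits
        async with aitercontext(numbers()) as safe_ait:
            async for item in safe_ait:
                print(item)  # 0, 1, 2

    asyncio.run(main())
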
diff --git a/.venv/lib/python3.12/site-packages/aiostream/core.py b/.venv/lib/python3.12/site-packages/aiostream/core.py
new file mode 100644
index 00000000..a11f0823
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/core.py
@@ -0,0 +1,567 @@
+"""Core objects for stream operators."""
+from __future__ import annotations
+
+import inspect
+import functools
+import sys
+import warnings
+
+from .aiter_utils import AsyncIteratorContext, aiter, assert_async_iterable
+from typing import (
+    Any,
+    AsyncIterator,
+    Callable,
+    Generator,
+    Iterator,
+    Protocol,
+    Union,
+    TypeVar,
+    cast,
+    AsyncIterable,
+    Awaitable,
+)
+
+from typing_extensions import ParamSpec, Concatenate
+
+
+__all__ = ["Stream", "Streamer", "StreamEmpty", "operator", "streamcontext"]
+
+
+# Exception
+
+
+class StreamEmpty(Exception):
+    """Exception raised when awaiting an empty stream."""
+
+    pass
+
+
+# Helpers
+
+T = TypeVar("T")
+X = TypeVar("X")
+A = TypeVar("A", contravariant=True)
+P = ParamSpec("P")
+Q = ParamSpec("Q")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+    P = TypeVar("P")
+
+
+async def wait_stream(aiterable: BaseStream[T]) -> T:
+    """Wait for an asynchronous iterable to finish and return the last item.
+
+    The iterable is executed within a safe stream context.
+    A StreamEmpty exception is raised if the sequence is empty.
+    """
+
+    class Unassigned:
+        pass
+
+    last_item: Unassigned | T = Unassigned()
+
+    async with streamcontext(aiterable) as streamer:
+        async for item in streamer:
+            last_item = item
+
+    if isinstance(last_item, Unassigned):
+        raise StreamEmpty()
+    return last_item
+
+
+# Core objects
+
+
+class BaseStream(AsyncIterable[T], Awaitable[T]):
+    """
+    Base class for streams.
+
+    See `Stream` and `Streamer` for more information.
+    """
+
+    def __init__(self, factory: Callable[[], AsyncIterable[T]]) -> None:
+        """Initialize the stream with an asynchronous iterable factory.
+
+        The factory is a callable and takes no argument.
+        The factory return value is an asynchronous iterable.
+        """
+        aiter = factory()
+        assert_async_iterable(aiter)
+        self._generator = self._make_generator(aiter, factory)
+
+    def _make_generator(
+        self, first: AsyncIterable[T], factory: Callable[[], AsyncIterable[T]]
+    ) -> Iterator[AsyncIterable[T]]:
+        """Generate asynchronous iterables when required.
+
+        The first iterable is created beforehand for extra checking.
+        """
+        yield first
+        del first
+        while True:
+            yield factory()
+
+    def __await__(self) -> Generator[Any, None, T]:
+        """Await protocol.
+
+        Safely iterate and return the last element.
+        """
+        return wait_stream(self).__await__()
+
+    def __or__(self, func: Callable[[BaseStream[T]], X]) -> X:
+        """Pipe protocol.
+
+        Allows piping stream operators.
+        """
+        return func(self)
+
+    def __add__(self, value: AsyncIterable[X]) -> Stream[Union[X, T]]:
+        """Addition protocol.
+
+        Concatenate with a given asynchronous sequence.
+        """
+        from .stream import chain
+
+        return chain(self, value)
+
+    def __getitem__(self, value: Union[int, slice]) -> Stream[T]:
+        """Get item protocol.
+
+        Accept an index or a slice to extract the corresponding item(s).
+        """
+        from .stream import getitem
+
+        return getitem(self, value)
+
+    # Disable sync iteration
+    # This is necessary because __getitem__ is defined
+    # which is a valid fallback for for-loops in python
+    __iter__: None = None
+
+
+class Stream(BaseStream[T]):
+    """Enhanced asynchronous iterable.
+
+    It provides the following features:
+
+      - **Operator pipe-lining** - using pipe symbol ``|``
+      - **Repeatability** - every iteration creates a different iterator
+      - **Safe iteration context** - using ``async with`` and the ``stream``
+        method
+      - **Simplified execution** - get the last element from a stream using
+        ``await``
+      - **Slicing and indexing** - using square brackets ``[]``
+      - **Concatenation** - using addition symbol ``+``
+
+    It is not meant to be instantiated directly.
+    Use the stream operators instead.
+
+    Example::
+
+        xs = stream.count()    # xs is a stream object
+        ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
+        zs = ys[5:10:2]        # slice ys using start, stop and step
+
+        async with zs.stream() as streamer:  # stream zs in a safe context
+            async for z in streamer:         # iterate the zs streamer
+                print(z)                     # Prints 10, 12, 14
+
+        result = await zs  # await zs and return its last element
+        print(result)      # Prints 14
+        result = await zs  # zs can be used several times
+        print(result)      # Prints 14
+    """
+
+    def stream(self) -> Streamer[T]:
+        """Return a streamer context for safe iteration.
+
+        Example::
+
+            xs = stream.count()
+            async with xs.stream() as streamer:
+                async for item in streamer:
+                    <block>
+
+        """
+        return self.__aiter__()
+
+    def __aiter__(self) -> Streamer[T]:
+        """Asynchronous iteration protocol.
+
+        Return a streamer context for safe iteration.
+        """
+        return streamcontext(next(self._generator))
+
+    # Advertise the proper syntax for entering a stream context
+
+    __aexit__: None = None
+
+    async def __aenter__(self) -> None:
+        raise TypeError(
+            "A stream object cannot be used as a context manager. "
+            "Use the `stream` method instead: "
+            "`async with xs.stream() as streamer`"
+        )
+
+
+class Streamer(AsyncIteratorContext[T], BaseStream[T]):
+    """Enhanced asynchronous iterator context.
+
+    It is similar to AsyncIteratorContext but provides the stream
+    magic methods for concatenation, indexing and awaiting.
+
+    It's not meant to be instantiated directly, use streamcontext instead.
+
+    Example::
+
+        ait = some_asynchronous_iterable()
+        async with streamcontext(ait) as streamer:
+            async for item in streamer:
+                await streamer[5]
+    """
+
+    pass
+
+
+def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]:
+    """Return a stream context manager from an asynchronous iterable.
+
+    The context management makes sure the aclose asynchronous method
+    of the corresponding iterator has run before it exits. It also issues
+    warnings and raises RuntimeError if it is used incorrectly.
+
+    It is safe to use with any asynchronous iterable and prevents an
+    asynchronous iterator context from being wrapped twice.
+
+    Correct usage::
+
+        ait = some_asynchronous_iterable()
+        async with streamcontext(ait) as streamer:
+            async for item in streamer:
+                <block>
+
+    For stream objects, it is possible to use the stream method instead::
+
+        xs = stream.count()
+        async with xs.stream() as streamer:
+            async for item in streamer:
+                <block>
+    """
+    aiterator = aiter(aiterable)
+    if isinstance(aiterator, Streamer):
+        return aiterator
+    return Streamer(aiterator)
+
+
+# Operator type protocol
+
+
+class OperatorType(Protocol[P, T]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
+        ...
+
+    def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
+        ...
+
+
+class PipableOperatorType(Protocol[A, P, T]):
+    def __call__(
+        self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+    ) -> Stream[T]:
+        ...
+
+    def raw(
+        self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+    ) -> AsyncIterator[T]:
+        ...
+
+    def pipe(
+        self, *args: P.args, **kwargs: P.kwargs
+    ) -> Callable[[AsyncIterable[A]], Stream[T]]:
+        ...
+
+
+# Operator decorator
+
+
+def operator(
+    func: Callable[P, AsyncIterator[T]] | None = None,
+    pipable: bool | None = None,
+) -> OperatorType[P, T]:
+    """Create a stream operator from an asynchronous generator
+    (or any function returning an asynchronous iterable).
+
+    Decorator usage::
+
+        @operator
+        async def random(offset=0., width=1.):
+            while True:
+                yield offset + width * random.random()
+
+    The return value is a dynamically created class.
+    It has the same name, module and doc as the original function.
+
+    A new stream is created by simply instantiating the operator::
+
+        xs = random()
+
+    The original function is called at instantiation to check that the
+    signature matches. Other methods are available:
+
+      - `original`: the original function as a static method
+      - `raw`: same as original but adds extra checking
+
+    The `pipable` argument is deprecated, use `pipable_operator` instead.
+    """
+
+    # Handle compatibility with legacy (aiostream <= 0.4)
+    if pipable is not None or func is None:
+        warnings.warn(
+            "The `pipable` argument is deprecated. Use either `@operator` or `@pipable_operator` directly.",
+            DeprecationWarning,
+        )
+    if func is None:
+        return pipable_operator if pipable else operator  # type: ignore
+    if pipable is True:
+        return pipable_operator(func)  # type: ignore
+
+    # First check for classmethod instance, to avoid more confusing errors later on
+    if isinstance(func, classmethod):
+        raise ValueError(
+            "An operator cannot be created from a class method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Gather data
+    bases = (Stream,)
+    name = func.__name__
+    module = func.__module__
+    extra_doc = func.__doc__
+    doc = extra_doc or f"Regular {name} stream operator."
+
+    # Extract signature
+    signature = inspect.signature(func)
+    parameters = list(signature.parameters.values())
+    if parameters and parameters[0].name in ("self", "cls"):
+        raise ValueError(
+            "An operator cannot be created from a method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Look for "more_sources"
+    for i, p in enumerate(parameters):
+        if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+            more_sources_index = i
+            break
+    else:
+        more_sources_index = None
+
+    # Injected parameters
+    self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+    inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+    # Wrapped static method
+    original = func
+    original.__qualname__ = name + ".original"
+
+    # Raw static method
+    raw = func
+    raw.__qualname__ = name + ".raw"
+
+    # Init method
+    def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None:
+        if more_sources_index is not None:
+            for source in args[more_sources_index:]:
+                assert_async_iterable(source)
+        factory = functools.partial(raw, *args, **kwargs)
+        return BaseStream.__init__(self, factory)
+
+    # Customize init signature
+    new_parameters = [self_parameter] + parameters
+    init.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize init method
+    init.__qualname__ = name + ".__init__"
+    init.__name__ = "__init__"
+    init.__module__ = module
+    init.__doc__ = f"Initialize the {name} stream."
+
+    # Gather attributes
+    attrs = {
+        "__init__": init,
+        "__module__": module,
+        "__doc__": doc,
+        "raw": staticmethod(raw),
+        "original": staticmethod(original),
+    }
+
+    # Create operator class
+    return cast("OperatorType[P, T]", type(name, bases, attrs))
+
+
+def pipable_operator(
+    func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]],
+) -> PipableOperatorType[X, P, T]:
+    """Create a pipable stream operator from an asynchronous generator
+    (or any function returning an asynchronous iterable).
+
+    Decorator usage::
+
+        @pipable_operator
+        async def multiply(source, factor):
+            async with streamcontext(source) as streamer:
+                 async for item in streamer:
+                     yield factor * item
+
+    The first argument is expected to be the asynchronous iterable used
+    for piping.
+
+    The return value is a dynamically created class.
+    It has the same name, module and doc as the original function.
+
+    A new stream is created by simply instantiating the operator::
+
+        xs = random()
+        ys = multiply(xs, 2)
+
+    The original function is called at instantiation to check that the
+    signature matches. The source is also checked for asynchronous iteration.
+
+    The operator also has a pipe classmethod that can be used along
+    with the piping syntax::
+
+        xs = random()
+        ys = xs | multiply.pipe(2)
+
+    This is strictly equivalent to the previous example.
+
+    Other methods are available:
+
+      - `original`: the original function as a static method
+      - `raw`: same as original but adds extra checking
+
+    The raw method is useful to create new operators from existing ones::
+
+        @pipable_operator
+        def double(source):
+            return multiply.raw(source, 2)
+    """
+
+    # First check for classmethod instance, to avoid more confusing errors later on
+    if isinstance(func, classmethod):
+        raise ValueError(
+            "An operator cannot be created from a class method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Gather data
+    bases = (Stream,)
+    name = func.__name__
+    module = func.__module__
+    extra_doc = func.__doc__
+    doc = extra_doc or f"Regular {name} stream operator."
+
+    # Extract signature
+    signature = inspect.signature(func)
+    parameters = list(signature.parameters.values())
+    if parameters and parameters[0].name in ("self", "cls"):
+        raise ValueError(
+            "An operator cannot be created from a method, "
+            "since the decorated function becomes an operator class"
+        )
+
+    # Look for "more_sources"
+    for i, p in enumerate(parameters):
+        if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+            more_sources_index = i
+            break
+    else:
+        more_sources_index = None
+
+    # Injected parameters
+    self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+    cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+    # Wrapped static method
+    original = func
+    original.__qualname__ = name + ".original"
+
+    # Raw static method
+    def raw(
+        arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+    ) -> AsyncIterator[T]:
+        assert_async_iterable(arg)
+        if more_sources_index is not None:
+            for source in args[more_sources_index - 1 :]:
+                assert_async_iterable(source)
+        return func(arg, *args, **kwargs)
+
+    # Customize raw method
+    raw.__signature__ = signature  # type: ignore[attr-defined]
+    raw.__qualname__ = name + ".raw"
+    raw.__module__ = module
+    raw.__doc__ = doc
+
+    # Init method
+    def init(
+        self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+    ) -> None:
+        assert_async_iterable(arg)
+        if more_sources_index is not None:
+            for source in args[more_sources_index - 1 :]:
+                assert_async_iterable(source)
+        factory = functools.partial(raw, arg, *args, **kwargs)
+        return BaseStream.__init__(self, factory)
+
+    # Customize init signature
+    new_parameters = [self_parameter] + parameters
+    init.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize init method
+    init.__qualname__ = name + ".__init__"
+    init.__name__ = "__init__"
+    init.__module__ = module
+    init.__doc__ = f"Initialize the {name} stream."
+
+    # Pipe class method
+    def pipe(
+        cls: PipableOperatorType[X, P, T],
+        /,
+        *args: P.args,
+        **kwargs: P.kwargs,
+    ) -> Callable[[AsyncIterable[X]], Stream[T]]:
+        return lambda source: cls(source, *args, **kwargs)
+
+    # Customize pipe signature
+    if parameters and parameters[0].kind in (
+        inspect.Parameter.POSITIONAL_ONLY,
+        inspect.Parameter.POSITIONAL_OR_KEYWORD,
+    ):
+        new_parameters = [cls_parameter] + parameters[1:]
+    else:
+        new_parameters = [cls_parameter] + parameters
+    pipe.__signature__ = signature.replace(parameters=new_parameters)  # type: ignore[attr-defined]
+
+    # Customize pipe method
+    pipe.__qualname__ = name + ".pipe"
+    pipe.__module__ = module
+    pipe.__doc__ = f'Pipable "{name}" stream operator.'
+    if extra_doc:
+        pipe.__doc__ += "\n\n    " + extra_doc
+
+    # Gather attributes
+    attrs = {
+        "__init__": init,
+        "__module__": module,
+        "__doc__": doc,
+        "raw": staticmethod(raw),
+        "original": staticmethod(original),
+        "pipe": classmethod(pipe),  # type: ignore[arg-type]
+    }
+
+    # Create operator class
+    return cast(
+        "PipableOperatorType[X, P, T]",
+        type(name, bases, attrs),
+    )
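
A sketch of the decorator in practice, reusing the multiply example from the
pipable_operator docstring above::

    import asyncio
    from aiostream import pipable_operator, stream, streamcontext

    @pipable_operator
    async def multiply(source, factor):
        async with streamcontext(source) as streamer:
            async for item in streamer:
                yield factor * item

    async def main() -> None:
        xs = stream.range(5)
        ys = xs | multiply.pipe(10)  # equivalent to multiply(xs, 10)
        print(await stream.list(ys))  # [0, 10, 20, 30, 40]

    asyncio.run(main())
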
diff --git a/.venv/lib/python3.12/site-packages/aiostream/manager.py b/.venv/lib/python3.12/site-packages/aiostream/manager.py
new file mode 100644
index 00000000..bab224a5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/manager.py
@@ -0,0 +1,159 @@
+"""Provide a context to easily manage several streamers running
+concurrently.
+"""
+from __future__ import annotations
+
+import asyncio
+from .aiter_utils import AsyncExitStack
+
+from .aiter_utils import anext
+from .core import streamcontext
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    List,
+    Set,
+    Tuple,
+    Generic,
+    TypeVar,
+    Any,
+    Type,
+    AsyncIterable,
+)
+from types import TracebackType
+
+if TYPE_CHECKING:
+    from asyncio import Task
+    from aiostream.core import Streamer
+
+T = TypeVar("T")
+
+
+class TaskGroup:
+    def __init__(self) -> None:
+        self._pending: set[Task[Any]] = set()
+
+    async def __aenter__(self) -> TaskGroup:
+        return self
+
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> None:
+        while self._pending:
+            task = self._pending.pop()
+            await self.cancel_task(task)
+
+    def create_task(self, coro: Awaitable[T]) -> Task[T]:
+        task = asyncio.ensure_future(coro)
+        self._pending.add(task)
+        return task
+
+    async def wait_any(self, tasks: List[Task[T]]) -> Set[Task[T]]:
+        done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED")
+        self._pending -= done
+        return done
+
+    async def wait_all(self, tasks: List[Task[T]]) -> Set[Task[T]]:
+        if not tasks:
+            return set()
+        done, _ = await asyncio.wait(tasks)
+        self._pending -= done
+        return done
+
+    async def cancel_task(self, task: Task[Any]) -> None:
+        try:
+            # The task is already cancelled
+            if task.cancelled():
+                pass
+            # The task is already finished
+            elif task.done():
+                # Discard the pending exception (if any).
+                # This makes sense since we don't know in which context the exception
+                # was meant to be processed. For instance, a `StopAsyncIteration`
+                # might be raised to notify that the end of a streamer has been reached.
+                task.exception()
+            # The task needs to be cancelled and awaited
+            else:
+                task.cancel()
+                try:
+                    await task
+                except asyncio.CancelledError:
+                    pass
+                # Silence any exception raised while cancelling the task.
+                # This might happen if the `CancelledError` is silenced, and the
+                # corresponding async generator returns, causing the `anext` call
+                # to raise a `StopAsyncIteration`.
+                except Exception:
+                    pass
+        finally:
+            self._pending.discard(task)
+
+
+class StreamerManager(Generic[T]):
+    def __init__(self) -> None:
+        self.tasks: dict[Streamer[T], Task[T]] = {}
+        self.streamers: list[Streamer[T]] = []
+        self.group: TaskGroup = TaskGroup()
+        self.stack = AsyncExitStack()
+
+    async def __aenter__(self) -> StreamerManager[T]:
+        await self.stack.__aenter__()
+        await self.stack.enter_async_context(self.group)
+        return self
+
+    async def __aexit__(
+        self,
+        typ: Type[BaseException] | None,
+        value: BaseException | None,
+        traceback: TracebackType | None,
+    ) -> bool:
+        for streamer in self.streamers:
+            task = self.tasks.pop(streamer, None)
+            if task is not None:
+                self.stack.push_async_callback(self.group.cancel_task, task)
+            self.stack.push_async_exit(streamer)
+        self.tasks.clear()
+        self.streamers.clear()
+        return await self.stack.__aexit__(typ, value, traceback)
+
+    async def enter_and_create_task(self, aiter: AsyncIterable[T]) -> Streamer[T]:
+        streamer = streamcontext(aiter)
+        await streamer.__aenter__()
+        self.streamers.append(streamer)
+        self.create_task(streamer)
+        return streamer
+
+    def create_task(self, streamer: Streamer[T]) -> None:
+        assert streamer in self.streamers
+        assert streamer not in self.tasks
+        self.tasks[streamer] = self.group.create_task(anext(streamer))
+
+    async def wait_single_event(
+        self, filters: list[Streamer[T]]
+    ) -> Tuple[Streamer[T], Task[T]]:
+        tasks = [self.tasks[streamer] for streamer in filters]
+        done = await self.group.wait_any(tasks)
+        for streamer in filters:
+            if self.tasks.get(streamer) in done:
+                return streamer, self.tasks.pop(streamer)
+        assert False
+
+    async def clean_streamer(self, streamer: Streamer[T]) -> None:
+        task = self.tasks.pop(streamer, None)
+        if task is not None:
+            await self.group.cancel_task(task)
+        await streamer.aclose()
+        self.streamers.remove(streamer)
+
+    async def clean_streamers(self, streamers: list[Streamer[T]]) -> None:
+        tasks = [
+            self.group.create_task(self.clean_streamer(streamer))
+            for streamer in streamers
+        ]
+        done = await self.group.wait_all(tasks)
+        # Raise exception if any
+        for task in done:
+            task.result()
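
A rough sketch of how this internal manager drives several streamers at once
(illustrative only; the streamers and timings are arbitrary)::

    import asyncio
    from aiostream.manager import StreamerManager

    async def ticker(delay: float, count: int):
        for i in range(count):
            await asyncio.sleep(delay)
            yield (delay, i)

    async def main() -> None:
        async with StreamerManager() as manager:
            await manager.enter_and_create_task(ticker(0.05, 3))
            await manager.enter_and_create_task(ticker(0.02, 3))
            while manager.tasks:
                # Wait for the first streamer to produce an item or finish
                streamer, task = await manager.wait_single_event(list(manager.tasks))
                try:
                    print(task.result())
                except StopAsyncIteration:
                    await manager.clean_streamer(streamer)
                else:
                    manager.create_task(streamer)

    asyncio.run(main())
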
diff --git a/.venv/lib/python3.12/site-packages/aiostream/pipe.py b/.venv/lib/python3.12/site-packages/aiostream/pipe.py
new file mode 100644
index 00000000..c27e101a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/pipe.py
@@ -0,0 +1,39 @@
+"""Gather the pipe operators."""
+from __future__ import annotations
+
+from . import stream
+
+accumulate = stream.accumulate.pipe
+action = stream.action.pipe
+amap = stream.amap.pipe
+chain = stream.chain.pipe
+chunks = stream.chunks.pipe
+concat = stream.concat.pipe
+concatmap = stream.concatmap.pipe
+cycle = stream.cycle.pipe
+delay = stream.delay.pipe
+dropwhile = stream.dropwhile.pipe
+enumerate = stream.enumerate.pipe
+filter = stream.filter.pipe
+flatmap = stream.flatmap.pipe
+flatten = stream.flatten.pipe
+getitem = stream.getitem.pipe
+list = stream.list.pipe
+map = stream.map.pipe
+merge = stream.merge.pipe
+print = stream.print.pipe
+reduce = stream.reduce.pipe
+skip = stream.skip.pipe
+skiplast = stream.skiplast.pipe
+smap = stream.smap.pipe
+spaceout = stream.spaceout.pipe
+starmap = stream.starmap.pipe
+switch = stream.switch.pipe
+switchmap = stream.switchmap.pipe
+take = stream.take.pipe
+takelast = stream.takelast.pipe
+takewhile = stream.takewhile.pipe
+timeout = stream.timeout.pipe
+until = stream.until.pipe
+zip = stream.zip.pipe
+ziplatest = stream.ziplatest.pipe
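
Each name above is simply the ``pipe`` classmethod of the corresponding
operator, so the two forms in this sketch are strictly equivalent::

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        xs = stream.range(4)
        a = await stream.list(xs | pipe.map(lambda x: x * x))
        b = await stream.list(stream.map(xs, lambda x: x * x))
        assert a == b == [0, 1, 4, 9]

    asyncio.run(main())
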
diff --git a/.venv/lib/python3.12/site-packages/aiostream/py.typed b/.venv/lib/python3.12/site-packages/aiostream/py.typed
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/py.typed
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
new file mode 100644
index 00000000..cd20d6c9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
@@ -0,0 +1,10 @@
+"""Gather all the stream operators."""
+
+from .create import *
+from .transform import *
+from .select import *
+from .combine import *
+from .aggregate import *
+from .time import *
+from .misc import *
+from .advanced import *
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
new file mode 100644
index 00000000..106f0f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
@@ -0,0 +1,222 @@
+"""Advanced operators (to deal with streams of higher order) ."""
+from __future__ import annotations
+
+from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
+from typing_extensions import ParamSpec
+
+from . import combine
+
+from ..core import Streamer, pipable_operator
+from ..manager import StreamerManager
+
+
+__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"]
+
+
+T = TypeVar("T")
+U = TypeVar("U")
+P = ParamSpec("P")
+
+
+# Helper to manage stream of higher order
+
+
+@pipable_operator
+async def base_combine(
+    source: AsyncIterable[AsyncIterable[T]],
+    switch: bool = False,
+    ordered: bool = False,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
+    """Base operator for managing an asynchronous sequence of sequences.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    The ``switch`` argument enables the switch mechanism, which causes the
+    previous subsequence to be discarded when a new one is created.
+
+    The items can either be generated in order or as soon as they're received,
+    depending on the ``ordered`` argument.
+    """
+
+    # Task limit
+    if task_limit is not None and not task_limit > 0:
+        raise ValueError("The task limit must be None or greater than 0")
+
+    # Safe context
+    async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
+        main_streamer: Streamer[
+            AsyncIterable[T] | T
+        ] | None = await manager.enter_and_create_task(source)
+
+        # Loop over events
+        while manager.tasks:
+            # Extract streamer groups
+            substreamers = manager.streamers[1:]
+            mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
+
+            # Switch - use the main streamer then the substreamer
+            if switch:
+                filters = mainstreamers + substreamers
+            # Concat - use the first substreamer then the main streamer
+            elif ordered:
+                filters = substreamers[:1] + mainstreamers
+            # Flat - use the substreamers then the main streamer
+            else:
+                filters = substreamers + mainstreamers
+
+            # Wait for next event
+            streamer, task = await manager.wait_single_event(filters)
+
+            # Get result
+            try:
+                result = task.result()
+
+            # End of stream
+            except StopAsyncIteration:
+                # Main streamer is finished
+                if streamer is main_streamer:
+                    main_streamer = None
+
+                # A substreamer is finished
+                else:
+                    await manager.clean_streamer(streamer)
+
+                    # Re-schedule the main streamer if necessary
+                    if main_streamer is not None and main_streamer not in manager.tasks:
+                        manager.create_task(main_streamer)
+
+            # Process result
+            else:
+                # Switch mechanism
+                if switch and streamer is main_streamer:
+                    await manager.clean_streamers(substreamers)
+
+                # Setup a new source
+                if streamer is main_streamer:
+                    assert isinstance(result, AsyncIterable)
+                    await manager.enter_and_create_task(result)
+
+                    # Re-schedule the main streamer if task limit allows it
+                    if task_limit is None or task_limit > len(manager.tasks):
+                        manager.create_task(streamer)
+
+                # Yield the result
+                else:
+                    item = cast("T", result)
+                    yield item
+
+                    # Re-schedule the streamer
+                    manager.create_task(streamer)
+
+
+# Advanced operators (for streams of higher order)
+
+
+@pipable_operator
+def concat(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements
+    of the sequences in order.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    Errors raised in the source or an element sequence are propagated.
+    """
+    return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
+
+
+@pipable_operator
+def flatten(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements
+    of the sequences as soon as they're received.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    Errors raised in the source or an element sequence are propagated.
+    """
+    return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
+
+
+@pipable_operator
+def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements of
+    the most recent sequence.
+
+    Element sequences are generated eagerly, and closed once they are
+    superseded by a more recent sequence. Once the main sequence is finished,
+    the last subsequence will be exhausted completely.
+
+    Errors raised in the source or an element sequence (that was not already
+    closed) are propagated.
+    """
+    return base_combine.raw(source, switch=True)
+
+
+# Advanced *-map operators
+
+
+@pipable_operator
+def concatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences, and generate the elements of the created
+    sequences in order.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the amount of running sequences using
+    the `task_limit` argument.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return concat.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def flatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences, and generate the elements of the created
+    sequences as soon as they arrive.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the amount of running sequences using
+    the `task_limit` argument.
+
+    Errors raised in a source or output sequence are propagated.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return flatten.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def switchmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences and generate the elements of the most
+    recently created sequence.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. Errors raised in a source or output sequence (that
+    was not already closed) are propagated.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return switch.raw(mapped)
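
A small sketch contrasting the ordering guarantees of concat and flatten
(the delays are illustrative)::

    import asyncio
    from aiostream import stream, pipe

    async def delayed(value: str, delay: float):
        await asyncio.sleep(delay)
        yield value

    def sources():
        # Fresh generators each time, since async generators are single-use
        return stream.iterate([delayed("slow", 0.2), delayed("fast", 0.05)])

    async def main() -> None:
        print(await stream.list(sources() | pipe.concat()))   # ['slow', 'fast']
        print(await stream.list(sources() | pipe.flatten()))  # ['fast', 'slow']

    asyncio.run(main())
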
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
new file mode 100644
index 00000000..8ed7c0e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
@@ -0,0 +1,90 @@
+"""Aggregation operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import operator as op
+from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
+
+
+from . import select
+from ..aiter_utils import anext
+from ..core import pipable_operator, streamcontext
+
+__all__ = ["accumulate", "reduce", "list"]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def accumulate(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T] = op.add,
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
+    """Generate a series of accumulated sums (or other binary function)
+    from an asynchronous sequence.
+
+    If ``initializer`` is present, it is placed before the items
+    of the sequence in the calculation, and serves as a default
+    when the sequence is empty.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        # Initialize
+        if initializer is None:
+            try:
+                value = await anext(streamer)
+            except StopAsyncIteration:
+                return
+        else:
+            value = initializer
+        # First value
+        yield value
+        # Iterate streamer
+        async for item in streamer:
+            returned = func(value, item)
+            if iscorofunc:
+                awaitable_value = cast("Awaitable[T]", returned)
+                value = await awaitable_value
+            else:
+                value = cast("T", returned)
+            yield value
+
+
+@pipable_operator
+def reduce(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T],
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
+    """Apply a function of two arguments cumulatively to the items
+    of an asynchronous sequence, reducing the sequence to a single value.
+
+    If ``initializer`` is present, it is placed before the items
+    of the sequence in the calculation, and serves as a default when the
+    sequence is empty.
+    """
+    acc = accumulate.raw(source, func, initializer)
+    return select.item.raw(acc, -1)
+
+
+@pipable_operator
+async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
+    """Build a list from an asynchronous sequence.
+
+    All the intermediate steps are generated, starting from the empty list.
+
+    This operator can be used to easily convert a stream into a list::
+
+        lst = await stream.list(x)
+
+    .. note:: The same list object is produced at each step in order to
+        avoid memory copies.
+    """
+    result: builtins.list[T] = []
+    yield result
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result.append(item)
+            yield result
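
A minimal sketch of the operators above over a small range::

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        xs = stream.range(1, 5)  # 1, 2, 3, 4
        print(await stream.list(xs | pipe.accumulate()))  # [1, 3, 6, 10]
        print(await (xs | pipe.reduce(max)))              # 4

    asyncio.run(main())
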
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
new file mode 100644
index 00000000..a782a730
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
@@ -0,0 +1,282 @@
+"""Combination operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import (
+    Awaitable,
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Callable,
+    cast,
+)
+from typing_extensions import ParamSpec
+
+from ..aiter_utils import AsyncExitStack, anext
+from ..core import streamcontext, pipable_operator
+
+from . import create
+from . import select
+from . import advanced
+from . import aggregate
+
+__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]
+
+T = TypeVar("T")
+U = TypeVar("U")
+K = TypeVar("K")
+P = ParamSpec("P")
+
+
+@pipable_operator
+async def chain(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+    """Chain asynchronous sequences together, in the order they are given.
+
+    Note: the sequences are not iterated until required,
+    so if the operation is interrupted, the remaining sequences
+    will be left untouched.
+    """
+    sources = source, *more_sources
+    for source in sources:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+
+
+@pipable_operator
+async def zip(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[tuple[T, ...]]:
+    """Combine and forward the elements of several asynchronous sequences.
+
+    Each generated value is a tuple of elements, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted.
+
+    Note: the different sequences are awaited in parallel, so that their
+    waiting times don't add up.
+    """
+    sources = source, *more_sources
+
+    # One source
+    if len(sources) == 1:
+        (source,) = sources
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield (item,)
+        return
+
+    # N sources
+    async with AsyncExitStack() as stack:
+        # Handle resources
+        streamers = [
+            await stack.enter_async_context(streamcontext(source)) for source in sources
+        ]
+        # Loop over items
+        while True:
+            try:
+                coros = builtins.map(anext, streamers)
+                items = await asyncio.gather(*coros)
+            except StopAsyncIteration:
+                break
+            else:
+                yield tuple(items)
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class SmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class AmapCallable(Protocol[X, Y]):
+    async def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class MapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
+        ...
+
+
+@pipable_operator
+async def smap(
+    source: AsyncIterable[T],
+    func: SmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+    """Apply a given function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted. The function is treated synchronously.
+
+    Note: if more than one sequence is provided, they're awaited concurrently
+    so that their waiting times don't add up.
+    """
+    stream = zip(source, *more_sources)
+    async with streamcontext(stream) as streamer:
+        async for item in streamer:
+            yield func(*item)
+
+
+@pipable_operator
+def amap(
+    source: AsyncIterable[T],
+    corofn: AmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given coroutine function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially.
+
+    If more than one sequence is provided, they're also awaited concurrently,
+    so that their waiting times don't add up.
+    """
+
+    async def func(arg: T, *args: T) -> AsyncIterable[U]:
+        yield await corofn(arg, *args)
+
+    if ordered:
+        return advanced.concatmap.raw(
+            source, func, *more_sources, task_limit=task_limit
+        )
+    return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
+
+
+@pipable_operator
+def map(
+    source: AsyncIterable[T],
+    func: MapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted. The function can either be synchronous or
+    asynchronous (coroutine function).
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if the
+    provided function is synchronous.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+
+    If more than one sequence is provided, they're also awaited concurrently,
+    so that their waiting times don't add up.
+
+    It might happen that the provided function returns a coroutine but is not
+    a coroutine function per se. In this case, one can wrap the function with
+    ``aiostream.async_`` in order to force ``map`` to await the resulting
+    coroutine. The following example illustrates the use of ``async_`` with a
+    lambda function::
+
+        from aiostream import stream, async_
+        ...
+        ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
+    """
+    if asyncio.iscoroutinefunction(func):
+        return amap.raw(
+            source, func, *more_sources, ordered=ordered, task_limit=task_limit
+        )
+    sync_func = cast("SmapCallable[T, U]", func)
+    return smap.raw(source, sync_func, *more_sources)
+
+
+@pipable_operator
+def merge(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+    """Merge several asynchronous sequences together.
+
+    All the sequences are iterated simultaneously and their elements
+    are forwarded as soon as they're available. The generation continues
+    until all the sequences are exhausted.
+    """
+    sources = [source, *more_sources]
+    source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources)
+    return advanced.flatten.raw(source_stream)
+
+
+@pipable_operator
+def ziplatest(
+    source: AsyncIterable[T],
+    *more_sources: AsyncIterable[T],
+    partial: bool = True,
+    default: T | None = None,
+) -> AsyncIterator[tuple[T | None, ...]]:
+    """Combine several asynchronous sequences together, producing a tuple with
+    the latest element of each sequence whenever a new element is received.
+
+    The value to use when a sequence has not produced any element yet is given
+    by the ``default`` argument (defaulting to ``None``).
+
+    Producing partial results can be disabled by setting the optional
+    argument ``partial`` to ``False``.
+
+    All the sequences are iterated simultaneously and their elements
+    are forwarded as soon as they're available. The generation continues
+    until all the sequences are exhausted.
+    """
+    sources = source, *more_sources
+    n = len(sources)
+
+    # Custom getter
+    def getter(dct: dict[int, T]) -> Callable[[int], T | None]:
+        return lambda key: dct.get(key, default)
+
+    # Add source index to the items
+    def make_func(i: int) -> SmapCallable[T, dict[int, T]]:
+        def func(x: T, *_: object) -> dict[int, T]:
+            return {i: x}
+
+        return func
+
+    new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
+
+    # Merge the sources
+    merged = merge.raw(*new_sources)
+
+    # Accumulate the current state in a dict
+    accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e})
+
+    # Filter partial result
+    filtered = (
+        accumulated
+        if partial
+        else select.filter.raw(accumulated, lambda x: len(x) == n)
+    )
+
+    # Convert the state dict to a tuple
+    def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]:
+        return tuple(builtins.map(getter(x), range(n)))
+
+    return smap.raw(filtered, dict_to_tuple)
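+
+# Usage sketch (illustrative, not part of the upstream module; the exact
+# interleaving depends on timing):
+#
+#     from aiostream import stream
+#
+#     xs = stream.count(interval=0.1)
+#     ys = stream.count(interval=0.3)
+#     zs = stream.ziplatest(xs, ys)  # e.g. (0, None), (1, None), (1, 0), ...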
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
new file mode 100644
index 00000000..d7676b71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
@@ -0,0 +1,191 @@
+"""Non-pipable creation operators."""
+from __future__ import annotations
+
+import sys
+import asyncio
+import inspect
+import builtins
+import itertools
+
+from typing import (
+    AsyncIterable,
+    Awaitable,
+    Iterable,
+    Protocol,
+    TypeVar,
+    AsyncIterator,
+    cast,
+)
+from typing_extensions import ParamSpec
+
+from ..stream import time
+from ..core import operator, streamcontext
+
+__all__ = [
+    "iterate",
+    "preserve",
+    "just",
+    "call",
+    "throw",
+    "empty",
+    "never",
+    "repeat",
+    "range",
+    "count",
+]
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+    P = TypeVar("P")
+
+# Convert regular iterables
+
+
+@operator
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
+    """Generate values from a regular iterable."""
+    for item in it:
+        await asyncio.sleep(0)
+        yield item
+
+
+@operator
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Generate values from an asynchronous iterable.
+
+    Note: the corresponding iterator will be explicitly closed
+    when leaving the context manager."""
+    return streamcontext(ait)
+
+
+@operator
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
+    """Generate values from a sychronous or asynchronous iterable."""
+    if isinstance(it, AsyncIterable):
+        return from_async_iterable.raw(it)
+    if isinstance(it, Iterable):
+        return from_iterable.raw(it)
+    raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
+
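+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context):
+#
+#     from aiostream import stream
+#
+#     xs = stream.iterate([1, 2, 3])
+#     assert await stream.list(xs) == [1, 2, 3]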
+
+@operator
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Generate values from an asynchronous iterable without
+    explicitly closing the corresponding iterator."""
+    async for item in ait:
+        yield item
+
+
+# Simple operators
+
+
+@operator
+async def just(value: T) -> AsyncIterator[T]:
+    """Await if possible, and generate a single value."""
+    if inspect.isawaitable(value):
+        yield await value
+    else:
+        yield value
+
+
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+        ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+        ...
+
+
+@operator
+async def call(
+    func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
+    """Call the given function and generate a single value.
+
+    Await if the provided function is asynchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncCallable[P, T]", func)
+        yield await async_func(*args, **kwargs)
+    else:
+        sync_func = cast("SyncCallable[P, T]", func)
+        yield sync_func(*args, **kwargs)
+
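+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context and a hypothetical coroutine function ``get_data``):
+#
+#     from aiostream import stream
+#
+#     xs = stream.call(get_data, "some-key")  # a single-element stream
+#     result = await xs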
+
+@operator
+async def throw(exc: Exception) -> AsyncIterator[None]:
+    """Throw an exception without generating any value."""
+    if False:
+        yield
+    raise exc
+
+
+@operator
+async def empty() -> AsyncIterator[None]:
+    """Terminate without generating any value."""
+    if False:
+        yield
+
+
+@operator
+async def never() -> AsyncIterator[None]:
+    """Hang forever without generating any value."""
+    if False:
+        yield
+    future: asyncio.Future[None] = asyncio.Future()
+    try:
+        await future
+    finally:
+        future.cancel()
+
+
+@operator
+def repeat(
+    value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
+    """Generate the same value a given number of times.
+
+    If ``times`` is ``None``, the value is repeated indefinitely.
+    An optional interval can be given to space the values out.
+    """
+    args = () if times is None else (times,)
+    it = itertools.repeat(value, *args)
+    agen = from_iterable.raw(it)
+    return time.spaceout.raw(agen, interval) if interval else agen
+
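+# Usage sketch (illustrative, not part of the upstream module):
+#
+#     from aiostream import stream
+#
+#     xs = stream.repeat("ping", 3, interval=1.0)  # "ping" x3, 1 s apart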
+
+# Counting operators
+
+
+@operator
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
+    """Generate a given range of numbers.
+
+    It supports the same arguments as the builtin function.
+    An optional interval can be given to space the values out.
+    """
+    agen = from_iterable.raw(builtins.range(*args))
+    return time.spaceout.raw(agen, interval) if interval else agen
+
+
+@operator
+def count(
+    start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
+    """Generate consecutive numbers indefinitely.
+
+    Optional starting point and increment can be defined,
+    respectively defaulting to ``0`` and ``1``.
+
+    An optional interval can be given to space the values out.
+    """
+    agen = from_iterable.raw(itertools.count(start, step))
+    return time.spaceout.raw(agen, interval) if interval else agen
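+
+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.count(start=10, step=2) | pipe.take(3)
+#     assert await stream.list(xs) == [10, 12, 14]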
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
new file mode 100644
index 00000000..1be834e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
@@ -0,0 +1,83 @@
+"""Extra operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
+
+__all__ = ["action", "print"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+    source: AsyncIterable[T],
+    func: Callable[[T], Awaitable[Any] | Any],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
+    """Perform an action for each element of an asynchronous sequence
+    without modifying it.
+
+    The given function can be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if the
+    provided function is synchronous.
+
+    The coroutines run concurrently, but their number can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+
+        async def ainnerfunc(arg: T, *_: object) -> T:
+            awaitable = func(arg)
+            assert isinstance(awaitable, Awaitable)
+            await awaitable
+            return arg
+
+        return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
+    else:
+
+        def innerfunc(arg: T, *_: object) -> T:
+            func(arg)
+            return arg
+
+        return smap.raw(source, innerfunc)
+
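+# Usage sketch (illustrative, not part of the upstream module; assumes a
+# hypothetical coroutine function ``log_metric`` used for its side effect):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.range(10) | pipe.action(log_metric, task_limit=2)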
+
+@pipable_operator
+def print(
+    source: AsyncIterable[T],
+    template: str = "{}",
+    sep: str = " ",
+    end: str = "\n",
+    file: Any | None = None,
+    flush: bool = False,
+) -> AsyncIterator[T]:
+    """Print each element of an asynchronous sequence without modifying it.
+
+    An optional template can be provided to be formatted with the elements.
+    All the keyword arguments are forwarded to the builtin function print.
+    """
+
+    def func(value: T) -> None:
+        string = template.format(value)
+        builtins.print(
+            string,
+            sep=sep,
+            end=end,
+            file=file,
+            flush=flush,
+        )
+
+    return action.raw(source, func)
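+
+# Usage sketch (illustrative, not part of the upstream module):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.range(3) | pipe.print("got {}")  # prints "got 0", "got 1", ...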
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
new file mode 100644
index 00000000..9390f464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
@@ -0,0 +1,284 @@
+"""Selection operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import collections
+
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
+from . import transform
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
+
+__all__ = [
+    "take",
+    "takelast",
+    "skip",
+    "skiplast",
+    "getitem",
+    "filter",
+    "until",
+    "dropwhile",
+    "takewhile",
+]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the first ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates before iterating the source.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        if n <= 0:
+            return
+        async for i, item in streamer:
+            yield item
+            if i >= n - 1:
+                return
+
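+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.count() | pipe.take(3)  # finite prefix of an infinite stream
+#     assert await stream.list(xs) == [0, 1, 2]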
+
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the last ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates after iterating the source.
+
+    Note: it is required to reach the end of the source before the first
+    element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            queue.append(item)
+        for item in queue:
+            yield item
+
+
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the first ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if i >= n:
+                yield item
+
+
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the last ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+
+    Note: it is required to reach the ``n+1``-th element of the source
+    before the first element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            if n <= 0:
+                yield item
+                continue
+            if len(queue) == n:
+                yield queue[0]
+            queue.append(item)
+
+
+@pipable_operator
+async def filterindex(
+    source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using the index of the elements.
+
+    The given function is synchronous, takes the index as an argument,
+    and returns ``True`` if the corresponding element should be forwarded,
+    ``False`` otherwise.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if func(i):
+                yield item
+
+
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
+    """Slice an asynchronous sequence.
+
+    The arguments are the same as for the builtin ``slice`` type.
+
+    There are two limitations compared to regular slices:
+    - Positive stop index with negative start index is not supported
+    - Negative step is not supported
+    """
+    s = builtins.slice(*args)
+    start, stop, step = s.start or 0, s.stop, s.step or 1
+    aiterator = aiter(source)
+    # Filter the first items
+    if start < 0:
+        aiterator = takelast.raw(aiterator, abs(start))
+    elif start > 0:
+        aiterator = skip.raw(aiterator, start)
+    # Filter the last items
+    if stop is not None:
+        if stop >= 0 and start < 0:
+            raise ValueError("Positive stop with negative start is not supported")
+        elif stop >= 0:
+            aiterator = take.raw(aiterator, stop - start)
+        else:
+            aiterator = skiplast.raw(aiterator, abs(stop))
+    # Filter step items
+    if step is not None:
+        if step > 1:
+            aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
+        elif step < 0:
+            raise ValueError("Negative step not supported")
+    # Return
+    return aiterator
+
+
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
+    """Forward the ``n``th element of an asynchronous sequence.
+
+    The index can be negative and works like regular indexing.
+    If the index is out of range, an ``IndexError`` is raised.
+    """
+    # Prepare
+    if index >= 0:
+        source = skip.raw(source, index)
+    else:
+        source = takelast.raw(source, abs(index))
+    async with streamcontext(source) as streamer:
+        # Get first item
+        try:
+            result = await anext(streamer)
+        except StopAsyncIteration:
+            raise IndexError("Index out of range")
+        # Check length
+        if index < 0:
+            count = 1
+            async for _ in streamer:
+                count += 1
+            if count != abs(index):
+                raise IndexError("Index out of range")
+        # Yield result
+        yield result
+
+
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
+    """Forward one or several items from an asynchronous sequence.
+
+    The argument can either be a slice or an integer.
+    See the slice and item operators for more information.
+    """
+    if isinstance(index, builtins.slice):
+        return slice.raw(source, index.start, index.stop, index.step)
+    if isinstance(index, int):
+        return item.raw(source, index)
+    raise TypeError("Not a valid index (int or slice)")
+
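+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context). Stream objects also support the indexing syntax directly:
+#
+#     from aiostream import stream
+#
+#     xs = stream.range(10)
+#     ys = xs[1:9:2]  # equivalent to getitem with slice(1, 9, 2)
+#     assert await stream.list(ys) == [1, 3, 5, 7]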
+
+@pipable_operator
+async def filter(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using an arbitrary function.
+
+    The function takes the item as an argument and returns ``True``
+    if it should be forwarded, ``False`` otherwise.
+    The function can either be synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if result:
+                yield item
+
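+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context; the predicate may also be a coroutine function):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.range(10) | pipe.filter(lambda x: x % 2 == 0)
+#     assert await stream.list(xs) == [0, 2, 4, 6, 8]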
+
+@pipable_operator
+async def until(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence until a condition is met.
+
+    Contrary to the ``takewhile`` operator, the last tested element is included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            yield item
+            if result:
+                return
+
+
+@pipable_operator
+async def takewhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence while a condition is met.
+
+    Contrary to the ``until`` operator, the last tested element is not included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                return
+            yield item
+
+
+@pipable_operator
+async def dropwhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Discard the elements from an asynchronous sequence
+    while a condition is met.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                yield item
+                break
+        async for item in streamer:
+            yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+    """Make sure the elements of an asynchronous sequence are separated
+    in time by the given interval.
+    """
+    timeout = 0.0
+    loop = asyncio.get_event_loop()
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            delta = timeout - loop.time()
+            delay = delta if delta > 0 else 0.0
+            await asyncio.sleep(delay)
+            yield item
+            timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+    """Raise a time-out if an element of the asynchronous sequence
+    takes too long to arrive.
+
+    Note: the timeout is not global but specific to each step of
+    the iteration.
+    """
+    async with streamcontext(source) as streamer:
+        while True:
+            try:
+                item = await asyncio.wait_for(anext(streamer), timeout)
+            except StopAsyncIteration:
+                break
+            else:
+                yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+    """Delay the iteration of an asynchronous sequence."""
+    await asyncio.sleep(delay)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield item
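+
+# Usage sketch (illustrative, not part of the upstream module):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = (
+#         stream.range(5)
+#         | pipe.spaceout(0.1)  # at most one element every 100 ms
+#         | pipe.timeout(1.0)   # fail if any single step takes over 1 s
+#     )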
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
new file mode 100644
index 00000000..f11bffa6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
@@ -0,0 +1,128 @@
+"""Transformation operators."""
+
+from __future__ import annotations
+
+import asyncio
+import itertools
+from typing import (
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Awaitable,
+    cast,
+)
+
+from ..core import streamcontext, pipable_operator
+
+from . import select
+from . import create
+from . import aggregate
+from .combine import map, amap, smap
+
+__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
+
+# map, amap and smap are also transform operators
+map, amap, smap
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+@pipable_operator
+async def enumerate(
+    source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
+    """Generate ``(index, value)`` tuples from an asynchronous sequence.
+
+    This index is computed using a starting point and an increment,
+    respectively defaulting to ``0`` and ``1``.
+    """
+    count = itertools.count(start, step)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield next(count), item
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+        ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+@pipable_operator
+def starmap(
+    source: AsyncIterable[tuple[T, ...]],
+    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function to the unpacked elements of
+    an asynchronous sequence.
+
+    Each element is unpacked before applying the function.
+    The given function can either be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if
+    the provided function is synchronous.
+
+    The coroutines run concurrently, but their number can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+        async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+            awaitable = async_func(*args)
+            return await awaitable
+
+        return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
+
+    else:
+        sync_func = cast("SyncStarmapCallable[T, U]", func)
+
+        def starfunc(args: tuple[T, ...], *_: object) -> U:
+            return sync_func(*args)
+
+        return smap.raw(source, starfunc)
+
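+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.iterate([(1, 2), (3, 4)]) | pipe.starmap(lambda a, b: a + b)
+#     assert await stream.list(xs) == [3, 7]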
+
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Iterate indefinitely over an asynchronous sequence.
+
+    Note: it does not perform any buffering, but re-iterates over
+    the same given sequence instead. If the sequence is not
+    re-iterable, the generator might end up looping indefinitely
+    without yielding any item.
+    """
+    while True:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+            # Prevent blocking while loop if the stream is empty
+            await asyncio.sleep(0)
+
+
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
+    """Generate chunks of size ``n`` from an asynchronous sequence.
+
+    The chunks are lists, and the last chunk might contain fewer than ``n``
+    elements.
+    """
+    async with streamcontext(source) as streamer:
+        async for first in streamer:
+            xs = select.take(create.preserve(streamer), n - 1)
+            yield [first] + await aggregate.list(xs)
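+
+# Usage sketch (illustrative, not part of the upstream module; assumes an
+# async context):
+#
+#     from aiostream import stream, pipe
+#
+#     xs = stream.range(7) | pipe.chunks(3)
+#     assert await stream.list(xs) == [[0, 1, 2], [3, 4, 5], [6]]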
diff --git a/.venv/lib/python3.12/site-packages/aiostream/test_utils.py b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
new file mode 100644
index 00000000..56761d2e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
@@ -0,0 +1,173 @@
+"""Utilities for testing stream operators."""
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import Mock
+from contextlib import contextmanager
+
+import pytest
+
+from .core import StreamEmpty, streamcontext, pipable_operator
+from typing import TYPE_CHECKING, Any, Callable, List
+
+if TYPE_CHECKING:
+    from _pytest.fixtures import SubRequest
+    from aiostream.core import Stream
+
+__all__ = ["add_resource", "assert_run", "event_loop"]
+
+
+@pipable_operator
+async def add_resource(source, cleanup_time):
+    """Simulate an open resource in a stream operator."""
+    try:
+        loop = asyncio.get_event_loop()
+        loop.open_resources += 1
+        loop.resources += 1
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+    finally:
+        try:
+            await asyncio.sleep(cleanup_time)
+        finally:
+            loop.open_resources -= 1
+
+
+def compare_exceptions(
+    exc1: Exception,
+    exc2: Exception,
+) -> bool:
+    """Compare two exceptions together."""
+    return exc1 == exc2 or exc1.__class__ == exc2.__class__ and exc1.args == exc2.args
+
+
+async def assert_aiter(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
+    """Check the results of a stream using a streamcontext."""
+    results = []
+    exception_type = (type(exception),) if exception else ()
+    try:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                results.append(item)
+    except exception_type as exc:
+        assert exception is not None
+        assert compare_exceptions(exc, exception)
+    else:
+        assert exception is None
+    assert results == values
+
+
+async def assert_await(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
+    """Check the results of a stream using by awaiting it."""
+    exception_type = (type(exception),) if exception else ()
+    try:
+        result = await source
+    except StreamEmpty:
+        assert values == []
+        assert exception is None
+    except exception_type as exc:
+        assert exception is not None
+        assert compare_exceptions(exc, exception)
+    else:
+        assert result == values[-1]
+        assert exception is None
+
+
+@pytest.fixture(params=[assert_aiter, assert_await], ids=["aiter", "await"])
+def assert_run(request: SubRequest) -> Callable:
+    """Parametrized fixture returning a stream runner."""
+    return request.param
+
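+# Usage sketch (illustrative, not part of the upstream module; assumes a
+# pytest-asyncio style test runner):
+#
+#     from aiostream import stream, pipe
+#
+#     @pytest.mark.asyncio
+#     async def test_take(assert_run):
+#         xs = stream.range(5) | pipe.take(3)
+#         await assert_run(xs, [0, 1, 2])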
+
+@pytest.fixture
+def event_loop():
+    """Fixture providing a test event loop.
+
+    The event loop simulates and records the sleep operations,
+    which are available as ``event_loop.steps``.
+
+    It also tracks simulated resources and makes sure they are
+    all released before the loop is closed.
+    """
+
+    class TimeTrackingTestLoop(asyncio.BaseEventLoop):
+        stuck_threshold = 100
+
+        def __init__(self):
+            super().__init__()
+            self._time = 0
+            self._timers = []
+            self._selector = Mock()
+            self.clear()
+
+        # Loop internals
+
+        def _run_once(self):
+            super()._run_once()
+            # Update internals
+            self.busy_count += 1
+            self._timers = sorted(when for when in self._timers if when > loop.time())
+            # Time advance
+            if self.time_to_go:
+                when = self._timers.pop(0)
+                step = when - loop.time()
+                self.steps.append(step)
+                self.advance_time(step)
+                self.busy_count = 0
+
+        def _process_events(self, event_list):
+            return
+
+        def _write_to_self(self):
+            return
+
+        # Time management
+
+        def time(self):
+            return self._time
+
+        def advance_time(self, advance):
+            if advance:
+                self._time += advance
+
+        def call_at(self, when, callback, *args, **kwargs):
+            self._timers.append(when)
+            return super().call_at(when, callback, *args, **kwargs)
+
+        @property
+        def stuck(self):
+            return self.busy_count > self.stuck_threshold
+
+        @property
+        def time_to_go(self):
+            return self._timers and (self.stuck or not self._ready)
+
+        # Resource management
+
+        def clear(self):
+            self.steps = []
+            self.open_resources = 0
+            self.resources = 0
+            self.busy_count = 0
+
+        @contextmanager
+        def assert_cleanup(self):
+            self.clear()
+            yield self
+            assert self.open_resources == 0
+            self.clear()
+
+    loop = TimeTrackingTestLoop()
+    asyncio.set_event_loop(loop)
+    with loop.assert_cleanup():
+        yield loop
+    loop.close()