about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream/stream
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py10
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py222
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py90
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/combine.py282
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/create.py191
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/misc.py83
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/select.py284
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/time.py56
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/transform.py128
9 files changed, 1346 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
new file mode 100644
index 00000000..cd20d6c9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
@@ -0,0 +1,10 @@
+"""Gather all the stream operators."""
+
+from .create import *
+from .transform import *
+from .select import *
+from .combine import *
+from .aggregate import *
+from .time import *
+from .misc import *
+from .advanced import *
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
new file mode 100644
index 00000000..106f0f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
@@ -0,0 +1,222 @@
+"""Advanced operators (to deal with streams of higher order) ."""
+from __future__ import annotations
+
+from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
+from typing_extensions import ParamSpec
+
+from . import combine
+
+from ..core import Streamer, pipable_operator
+from ..manager import StreamerManager
+
+
+__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"]
+
+
+T = TypeVar("T")
+U = TypeVar("U")
+P = ParamSpec("P")
+
+
+# Helper to manage stream of higher order
+
+
+@pipable_operator
+async def base_combine(
+    source: AsyncIterable[AsyncIterable[T]],
+    switch: bool = False,
+    ordered: bool = False,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
+    """Base operator for managing an asynchronous sequence of sequences.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    The ``switch`` argument enables the switch mecanism, which cause the
+    previous subsequence to be discarded when a new one is created.
+
+    The items can either be generated in order or as soon as they're received,
+    depending on the ``ordered`` argument.
+    """
+
+    # Task limit
+    if task_limit is not None and not task_limit > 0:
+        raise ValueError("The task limit must be None or greater than 0")
+
+    # Safe context
+    async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
+        main_streamer: Streamer[
+            AsyncIterable[T] | T
+        ] | None = await manager.enter_and_create_task(source)
+
+        # Loop over events
+        while manager.tasks:
+            # Extract streamer groups
+            substreamers = manager.streamers[1:]
+            mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
+
+            # Switch - use the main streamer then the substreamer
+            if switch:
+                filters = mainstreamers + substreamers
+            # Concat - use the first substreamer then the main streamer
+            elif ordered:
+                filters = substreamers[:1] + mainstreamers
+            # Flat - use the substreamers then the main streamer
+            else:
+                filters = substreamers + mainstreamers
+
+            # Wait for next event
+            streamer, task = await manager.wait_single_event(filters)
+
+            # Get result
+            try:
+                result = task.result()
+
+            # End of stream
+            except StopAsyncIteration:
+                # Main streamer is finished
+                if streamer is main_streamer:
+                    main_streamer = None
+
+                # A substreamer is finished
+                else:
+                    await manager.clean_streamer(streamer)
+
+                    # Re-schedule the main streamer if necessary
+                    if main_streamer is not None and main_streamer not in manager.tasks:
+                        manager.create_task(main_streamer)
+
+            # Process result
+            else:
+                # Switch mecanism
+                if switch and streamer is main_streamer:
+                    await manager.clean_streamers(substreamers)
+
+                # Setup a new source
+                if streamer is main_streamer:
+                    assert isinstance(result, AsyncIterable)
+                    await manager.enter_and_create_task(result)
+
+                    # Re-schedule the main streamer if task limit allows it
+                    if task_limit is None or task_limit > len(manager.tasks):
+                        manager.create_task(streamer)
+
+                # Yield the result
+                else:
+                    item = cast("T", result)
+                    yield item
+
+                    # Re-schedule the streamer
+                    manager.create_task(streamer)
+
+
+# Advanced operators (for streams of higher order)
+
+
+@pipable_operator
+def concat(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements
+    of the sequences in order.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    Errors raised in the source or an element sequence are propagated.
+    """
+    return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
+
+
+@pipable_operator
+def flatten(
+    source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements
+    of the sequences as soon as they're received.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the amount of running sequences using the `task_limit` argument.
+
+    Errors raised in the source or an element sequence are propagated.
+    """
+    return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
+
+
+@pipable_operator
+def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]:
+    """Given an asynchronous sequence of sequences, generate the elements of
+    the most recent sequence.
+
+    Element sequences are generated eagerly, and closed once they are
+    superseded by a more recent sequence. Once the main sequence is finished,
+    the last subsequence will be exhausted completely.
+
+    Errors raised in the source or an element sequence (that was not already
+    closed) are propagated.
+    """
+    return base_combine.raw(source, switch=True)
+
+
+# Advanced *-map operators
+
+
+@pipable_operator
+def concatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences, and generate the elements of the created
+    sequences in order.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the amount of running sequences using
+    the `task_limit` argument.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return concat.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def flatmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences, and generate the elements of the created
+    sequences as soon as they arrive.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the amount of running sequences using
+    the `task_limit` argument.
+
+    Errors raised in a source or output sequence are propagated.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return flatten.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def switchmap(
+    source: AsyncIterable[T],
+    func: combine.SmapCallable[T, AsyncIterable[U]],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+    """Apply a given function that creates a sequence from the elements of one
+    or several asynchronous sequences and generate the elements of the most
+    recently created sequence.
+
+    The function is applied as described in `map`, and must return an
+    asynchronous sequence. Errors raised in a source or output sequence (that
+    was not already closed) are propagated.
+    """
+    mapped = combine.smap.raw(source, func, *more_sources)
+    return switch.raw(mapped)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
new file mode 100644
index 00000000..8ed7c0e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
@@ -0,0 +1,90 @@
+"""Aggregation operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import operator as op
+from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
+
+
+from . import select
+from ..aiter_utils import anext
+from ..core import pipable_operator, streamcontext
+
+__all__ = ["accumulate", "reduce", "list"]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def accumulate(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T] = op.add,
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
+    """Generate a series of accumulated sums (or other binary function)
+    from an asynchronous sequence.
+
+    If ``initializer`` is present, it is placed before the items
+    of the sequence in the calculation, and serves as a default
+    when the sequence is empty.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        # Initialize
+        if initializer is None:
+            try:
+                value = await anext(streamer)
+            except StopAsyncIteration:
+                return
+        else:
+            value = initializer
+        # First value
+        yield value
+        # Iterate streamer
+        async for item in streamer:
+            returned = func(value, item)
+            if iscorofunc:
+                awaitable_value = cast("Awaitable[T]", returned)
+                value = await awaitable_value
+            else:
+                value = cast("T", returned)
+            yield value
+
+
+@pipable_operator
+def reduce(
+    source: AsyncIterable[T],
+    func: Callable[[T, T], Awaitable[T] | T],
+    initializer: T | None = None,
+) -> AsyncIterator[T]:
+    """Apply a function of two arguments cumulatively to the items
+    of an asynchronous sequence, reducing the sequence to a single value.
+
+    If ``initializer`` is present, it is placed before the items
+    of the sequence in the calculation, and serves as a default when the
+    sequence is empty.
+    """
+    acc = accumulate.raw(source, func, initializer)
+    return select.item.raw(acc, -1)
+
+
+@pipable_operator
+async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
+    """Build a list from an asynchronous sequence.
+
+    All the intermediate steps are generated, starting from the empty list.
+
+    This operator can be used to easily convert a stream into a list::
+
+        lst = await stream.list(x)
+
+    ..note:: The same list object is produced at each step in order to avoid
+    memory copies.
+    """
+    result: builtins.list[T] = []
+    yield result
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result.append(item)
+            yield result
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
new file mode 100644
index 00000000..a782a730
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
@@ -0,0 +1,282 @@
+"""Combination operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import (
+    Awaitable,
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Callable,
+    cast,
+)
+from typing_extensions import ParamSpec
+
+from ..aiter_utils import AsyncExitStack, anext
+from ..core import streamcontext, pipable_operator
+
+from . import create
+from . import select
+from . import advanced
+from . import aggregate
+
+__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]
+
+T = TypeVar("T")
+U = TypeVar("U")
+K = TypeVar("K")
+P = ParamSpec("P")
+
+
+@pipable_operator
+async def chain(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+    """Chain asynchronous sequences together, in the order they are given.
+
+    Note: the sequences are not iterated until it is required,
+    so if the operation is interrupted, the remaining sequences
+    will be left untouched.
+    """
+    sources = source, *more_sources
+    for source in sources:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+
+
+@pipable_operator
+async def zip(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[tuple[T, ...]]:
+    """Combine and forward the elements of several asynchronous sequences.
+
+    Each generated value is a tuple of elements, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted.
+
+    Note: the different sequences are awaited in parrallel, so that their
+    waiting times don't add up.
+    """
+    sources = source, *more_sources
+
+    # One sources
+    if len(sources) == 1:
+        (source,) = sources
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield (item,)
+        return
+
+    # N sources
+    async with AsyncExitStack() as stack:
+        # Handle resources
+        streamers = [
+            await stack.enter_async_context(streamcontext(source)) for source in sources
+        ]
+        # Loop over items
+        while True:
+            try:
+                coros = builtins.map(anext, streamers)
+                items = await asyncio.gather(*coros)
+            except StopAsyncIteration:
+                break
+            else:
+                yield tuple(items)
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class SmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class AmapCallable(Protocol[X, Y]):
+    async def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+class MapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
+        ...
+
+
+@pipable_operator
+async def smap(
+    source: AsyncIterable[T],
+    func: SmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+    """Apply a given function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted. The function is treated synchronously.
+
+    Note: if more than one sequence is provided, they're awaited concurrently
+    so that their waiting times don't add up.
+    """
+    stream = zip(source, *more_sources)
+    async with streamcontext(stream) as streamer:
+        async for item in streamer:
+            yield func(*item)
+
+
+@pipable_operator
+def amap(
+    source: AsyncIterable[T],
+    corofn: AmapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given coroutine function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially.
+
+    If more than one sequence is provided, they're also awaited concurrently,
+    so that their waiting times don't add up.
+    """
+
+    async def func(arg: T, *args: T) -> AsyncIterable[U]:
+        yield await corofn(arg, *args)
+
+    if ordered:
+        return advanced.concatmap.raw(
+            source, func, *more_sources, task_limit=task_limit
+        )
+    return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
+
+
+@pipable_operator
+def map(
+    source: AsyncIterable[T],
+    func: MapCallable[T, U],
+    *more_sources: AsyncIterable[T],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function to the elements of one or several
+    asynchronous sequences.
+
+    Each element is used as a positional argument, using the same order as
+    their respective sources. The generation continues until the shortest
+    sequence is exhausted. The function can either be synchronous or
+    asynchronous (coroutine function).
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if the
+    provided function is synchronous.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+
+    If more than one sequence is provided, they're also awaited concurrently,
+    so that their waiting times don't add up.
+
+    It might happen that the provided function returns a coroutine but is not
+    a coroutine function per se. In this case, one can wrap the function with
+    ``aiostream.async_`` in order to force ``map`` to await the resulting
+    coroutine. The following example illustrates the use ``async_`` with a
+    lambda function::
+
+        from aiostream import stream, async_
+        ...
+        ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
+    """
+    if asyncio.iscoroutinefunction(func):
+        return amap.raw(
+            source, func, *more_sources, ordered=ordered, task_limit=task_limit
+        )
+    sync_func = cast("SmapCallable[T, U]", func)
+    return smap.raw(source, sync_func, *more_sources)
+
+
+@pipable_operator
+def merge(
+    source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+    """Merge several asynchronous sequences together.
+
+    All the sequences are iterated simultaneously and their elements
+    are forwarded as soon as they're available. The generation continues
+    until all the sequences are exhausted.
+    """
+    sources = [source, *more_sources]
+    source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources)
+    return advanced.flatten.raw(source_stream)
+
+
+@pipable_operator
+def ziplatest(
+    source: AsyncIterable[T],
+    *more_sources: AsyncIterable[T],
+    partial: bool = True,
+    default: T | None = None,
+) -> AsyncIterator[tuple[T | None, ...]]:
+    """Combine several asynchronous sequences together, producing a tuple with
+    the lastest element of each sequence whenever a new element is received.
+
+    The value to use when a sequence has not procuded any element yet is given
+    by the ``default`` argument (defaulting to ``None``).
+
+    The producing of partial results can be disabled by setting the optional
+    argument ``partial`` to ``False``.
+
+    All the sequences are iterated simultaneously and their elements
+    are forwarded as soon as they're available. The generation continues
+    until all the sequences are exhausted.
+    """
+    sources = source, *more_sources
+    n = len(sources)
+
+    # Custom getter
+    def getter(dct: dict[int, T]) -> Callable[[int], T | None]:
+        return lambda key: dct.get(key, default)
+
+    # Add source index to the items
+    def make_func(i: int) -> SmapCallable[T, dict[int, T]]:
+        def func(x: T, *_: object) -> dict[int, T]:
+            return {i: x}
+
+        return func
+
+    new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
+
+    # Merge the sources
+    merged = merge.raw(*new_sources)
+
+    # Accumulate the current state in a dict
+    accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e})
+
+    # Filter partial result
+    filtered = (
+        accumulated
+        if partial
+        else select.filter.raw(accumulated, lambda x: len(x) == n)
+    )
+
+    # Convert the state dict to a tuple
+    def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]:
+        return tuple(builtins.map(getter(x), range(n)))
+
+    return smap.raw(filtered, dict_to_tuple)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
new file mode 100644
index 00000000..d7676b71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
@@ -0,0 +1,191 @@
+"""Non-pipable creation operators."""
+from __future__ import annotations
+
+import sys
+import asyncio
+import inspect
+import builtins
+import itertools
+
+from typing import (
+    AsyncIterable,
+    Awaitable,
+    Iterable,
+    Protocol,
+    TypeVar,
+    AsyncIterator,
+    cast,
+)
+from typing_extensions import ParamSpec
+
+from ..stream import time
+from ..core import operator, streamcontext
+
+__all__ = [
+    "iterate",
+    "preserve",
+    "just",
+    "call",
+    "throw",
+    "empty",
+    "never",
+    "repeat",
+    "range",
+    "count",
+]
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+    P = TypeVar("P")
+
+# Convert regular iterables
+
+
+@operator
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
+    """Generate values from a regular iterable."""
+    for item in it:
+        await asyncio.sleep(0)
+        yield item
+
+
+@operator
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Generate values from an asynchronous iterable.
+
+    Note: the corresponding iterator will be explicitely closed
+    when leaving the context manager."""
+    return streamcontext(ait)
+
+
+@operator
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
+    """Generate values from a sychronous or asynchronous iterable."""
+    if isinstance(it, AsyncIterable):
+        return from_async_iterable.raw(it)
+    if isinstance(it, Iterable):
+        return from_iterable.raw(it)
+    raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
+
+
+@operator
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Generate values from an asynchronous iterable without
+    explicitly closing the corresponding iterator."""
+    async for item in ait:
+        yield item
+
+
+# Simple operators
+
+
+@operator
+async def just(value: T) -> AsyncIterator[T]:
+    """Await if possible, and generate a single value."""
+    if inspect.isawaitable(value):
+        yield await value
+    else:
+        yield value
+
+
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+        ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+        ...
+
+
+@operator
+async def call(
+    func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
+    """Call the given function and generate a single value.
+
+    Await if the provided function is asynchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncCallable[P, T]", func)
+        yield await async_func(*args, **kwargs)
+    else:
+        sync_func = cast("SyncCallable[P, T]", func)
+        yield sync_func(*args, **kwargs)
+
+
+@operator
+async def throw(exc: Exception) -> AsyncIterator[None]:
+    """Throw an exception without generating any value."""
+    if False:
+        yield
+    raise exc
+
+
+@operator
+async def empty() -> AsyncIterator[None]:
+    """Terminate without generating any value."""
+    if False:
+        yield
+
+
+@operator
+async def never() -> AsyncIterator[None]:
+    """Hang forever without generating any value."""
+    if False:
+        yield
+    future: asyncio.Future[None] = asyncio.Future()
+    try:
+        await future
+    finally:
+        future.cancel()
+
+
+@operator
+def repeat(
+    value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
+    """Generate the same value a given number of times.
+
+    If ``times`` is ``None``, the value is repeated indefinitely.
+    An optional interval can be given to space the values out.
+    """
+    args = () if times is None else (times,)
+    it = itertools.repeat(value, *args)
+    agen = from_iterable.raw(it)
+    return time.spaceout.raw(agen, interval) if interval else agen
+
+
+# Counting operators
+
+
+@operator
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
+    """Generate a given range of numbers.
+
+    It supports the same arguments as the builtin function.
+    An optional interval can be given to space the values out.
+    """
+    agen = from_iterable.raw(builtins.range(*args))
+    return time.spaceout.raw(agen, interval) if interval else agen
+
+
+@operator
+def count(
+    start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
+    """Generate consecutive numbers indefinitely.
+
+    Optional starting point and increment can be defined,
+    respectively defaulting to ``0`` and ``1``.
+
+    An optional interval can be given to space the values out.
+    """
+    agen = from_iterable.raw(itertools.count(start, step))
+    return time.spaceout.raw(agen, interval) if interval else agen
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
new file mode 100644
index 00000000..1be834e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
@@ -0,0 +1,83 @@
+"""Extra operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
+
+__all__ = ["action", "print"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+    source: AsyncIterable[T],
+    func: Callable[[T], Awaitable[Any] | Any],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[T]:
+    """Perform an action for each element of an asynchronous sequence
+    without modifying it.
+
+    The given function can be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if the
+    provided function is synchronous.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+
+        async def ainnerfunc(arg: T, *_: object) -> T:
+            awaitable = func(arg)
+            assert isinstance(awaitable, Awaitable)
+            await awaitable
+            return arg
+
+        return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
+    else:
+
+        def innerfunc(arg: T, *_: object) -> T:
+            func(arg)
+            return arg
+
+        return smap.raw(source, innerfunc)
+
+
+@pipable_operator
+def print(
+    source: AsyncIterable[T],
+    template: str = "{}",
+    sep: str = " ",
+    end: str = "\n",
+    file: Any | None = None,
+    flush: bool = False,
+) -> AsyncIterator[T]:
+    """Print each element of an asynchronous sequence without modifying it.
+
+    An optional template can be provided to be formatted with the elements.
+    All the keyword arguments are forwarded to the builtin function print.
+    """
+
+    def func(value: T) -> None:
+        string = template.format(value)
+        builtins.print(
+            string,
+            sep=sep,
+            end=end,
+            file=file,
+            flush=flush,
+        )
+
+    return action.raw(source, func)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
new file mode 100644
index 00000000..9390f464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
@@ -0,0 +1,284 @@
+"""Selection operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import collections
+
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
+from . import transform
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
+
+__all__ = [
+    "take",
+    "takelast",
+    "skip",
+    "skiplast",
+    "getitem",
+    "filter",
+    "until",
+    "dropwhile",
+    "takewhile",
+]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the first ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates before iterating the source.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        if n <= 0:
+            return
+        async for i, item in streamer:
+            yield item
+            if i >= n - 1:
+                return
+
+
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the last ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates after iterating the source.
+
+    Note: it is required to reach the end of the source before the first
+    element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            queue.append(item)
+        for item in queue:
+            yield item
+
+
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the first ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if i >= n:
+                yield item
+
+
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the last ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+
+    Note: it is required to reach the ``n+1`` th element of the source
+    before the first element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            if n <= 0:
+                yield item
+                continue
+            if len(queue) == n:
+                yield queue[0]
+            queue.append(item)
+
+
+@pipable_operator
+async def filterindex(
+    source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using the index of the elements.
+
+    The given function is synchronous, takes the index as an argument,
+    and returns ``True`` if the corresponding should be forwarded,
+    ``False`` otherwise.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if func(i):
+                yield item
+
+
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
+    """Slice an asynchronous sequence.
+
+    The arguments are the same as the builtin type slice.
+
+    There are two limitations compare to regular slices:
+    - Positive stop index with negative start index is not supported
+    - Negative step is not supported
+    """
+    s = builtins.slice(*args)
+    start, stop, step = s.start or 0, s.stop, s.step or 1
+    aiterator = aiter(source)
+    # Filter the first items
+    if start < 0:
+        aiterator = takelast.raw(aiterator, abs(start))
+    elif start > 0:
+        aiterator = skip.raw(aiterator, start)
+    # Filter the last items
+    if stop is not None:
+        if stop >= 0 and start < 0:
+            raise ValueError("Positive stop with negative start is not supported")
+        elif stop >= 0:
+            aiterator = take.raw(aiterator, stop - start)
+        else:
+            aiterator = skiplast.raw(aiterator, abs(stop))
+    # Filter step items
+    if step is not None:
+        if step > 1:
+            aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
+        elif step < 0:
+            raise ValueError("Negative step not supported")
+    # Return
+    return aiterator
+
+
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
+    """Forward the ``n``th element of an asynchronous sequence.
+
+    The index can be negative and works like regular indexing.
+    If the index is out of range, and ``IndexError`` is raised.
+    """
+    # Prepare
+    if index >= 0:
+        source = skip.raw(source, index)
+    else:
+        source = takelast(source, abs(index))
+    async with streamcontext(source) as streamer:
+        # Get first item
+        try:
+            result = await anext(streamer)
+        except StopAsyncIteration:
+            raise IndexError("Index out of range")
+        # Check length
+        if index < 0:
+            count = 1
+            async for _ in streamer:
+                count += 1
+            if count != abs(index):
+                raise IndexError("Index out of range")
+        # Yield result
+        yield result
+
+
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
+    """Forward one or several items from an asynchronous sequence.
+
+    The argument can either be a slice or an integer.
+    See the slice and item operators for more information.
+    """
+    if isinstance(index, builtins.slice):
+        return slice.raw(source, index.start, index.stop, index.step)
+    if isinstance(index, int):
+        return item.raw(source, index)
+    raise TypeError("Not a valid index (int or slice)")
+
+
+@pipable_operator
+async def filter(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using an arbitrary function.
+
+    The function takes the item as an argument and returns ``True``
+    if it should be forwarded, ``False`` otherwise.
+    The function can either be synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if result:
+                yield item
+
+
+@pipable_operator
+async def until(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence until a condition is met.
+
+    Contrary to the ``takewhile`` operator, the last tested element is included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            yield item
+            if result:
+                return
+
+
+@pipable_operator
+async def takewhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence while a condition is met.
+
+    Contrary to the ``until`` operator, the last tested element is not included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                return
+            yield item
+
+
+@pipable_operator
+async def dropwhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Discard the elements from an asynchronous sequence
+    while a condition is met.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                yield item
+                break
+        async for item in streamer:
+            yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+    """Make sure the elements of an asynchronous sequence are separated
+    in time by the given interval.
+    """
+    timeout = 0.0
+    loop = asyncio.get_event_loop()
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            delta = timeout - loop.time()
+            delay = delta if delta > 0 else 0.0
+            await asyncio.sleep(delay)
+            yield item
+            timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+    """Raise a time-out if an element of the asynchronous sequence
+    takes too long to arrive.
+
+    Note: the timeout is not global but specific to each step of
+    the iteration.
+    """
+    async with streamcontext(source) as streamer:
+        while True:
+            try:
+                item = await asyncio.wait_for(anext(streamer), timeout)
+            except StopAsyncIteration:
+                break
+            else:
+                yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+    """Delay the iteration of an asynchronous sequence."""
+    await asyncio.sleep(delay)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
new file mode 100644
index 00000000..f11bffa6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
@@ -0,0 +1,128 @@
+"""Transformation operators."""
+
+from __future__ import annotations
+
+import asyncio
+import itertools
+from typing import (
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Awaitable,
+    cast,
+)
+
+from ..core import streamcontext, pipable_operator
+
+from . import select
+from . import create
+from . import aggregate
+from .combine import map, amap, smap
+
+__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
+
+# map, amap and smap are also transform operators
+map, amap, smap
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+@pipable_operator
+async def enumerate(
+    source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
+    """Generate ``(index, value)`` tuples from an asynchronous sequence.
+
+    This index is computed using a starting point and an increment,
+    respectively defaulting to ``0`` and ``1``.
+    """
+    count = itertools.count(start, step)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield next(count), item
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+        ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+@pipable_operator
+def starmap(
+    source: AsyncIterable[tuple[T, ...]],
+    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function to the unpacked elements of
+    an asynchronous sequence.
+
+    Each element is unpacked before applying the function.
+    The given function can either be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if
+    the provided function is synchronous.
+
+    The coroutines run concurrently but their amount can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+        async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+            awaitable = async_func(*args)
+            return await awaitable
+
+        return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
+
+    else:
+        sync_func = cast("SyncStarmapCallable[T, U]", func)
+
+        def starfunc(args: tuple[T, ...], *_: object) -> U:
+            return sync_func(*args)
+
+        return smap.raw(source, starfunc)
+
+
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Iterate indefinitely over an asynchronous sequence.
+
+    Note: it does not perform any buffering, but re-iterate over
+    the same given sequence instead. If the sequence is not
+    re-iterable, the generator might end up looping indefinitely
+    without yielding any item.
+    """
+    while True:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+            # Prevent blocking while loop if the stream is empty
+            await asyncio.sleep(0)
+
+
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
+    """Generate chunks of size ``n`` from an asynchronous sequence.
+
+    The chunks are lists, and the last chunk might contain less than ``n``
+    elements.
+    """
+    async with streamcontext(source) as streamer:
+        async for first in streamer:
+            xs = select.take(create.preserve(streamer), n - 1)
+            yield [first] + await aggregate.list(xs)