Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream')
 .venv/lib/python3.12/site-packages/aiostream/stream/__init__.py  |  10 +
 .venv/lib/python3.12/site-packages/aiostream/stream/advanced.py  | 222 ++++++
 .venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py |  90 ++
 .venv/lib/python3.12/site-packages/aiostream/stream/combine.py   | 282 ++++++++
 .venv/lib/python3.12/site-packages/aiostream/stream/create.py    | 191 +++++
 .venv/lib/python3.12/site-packages/aiostream/stream/misc.py      |  83 ++
 .venv/lib/python3.12/site-packages/aiostream/stream/select.py    | 284 ++++++++
 .venv/lib/python3.12/site-packages/aiostream/stream/time.py      |  56 +
 .venv/lib/python3.12/site-packages/aiostream/stream/transform.py | 128 +++
 9 files changed, 1346 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
new file mode 100644
index 00000000..cd20d6c9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
@@ -0,0 +1,10 @@
+"""Gather all the stream operators."""
+
+from .create import *
+from .transform import *
+from .select import *
+from .combine import *
+from .aggregate import *
+from .time import *
+from .misc import *
+from .advanced import *
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
new file mode 100644
index 00000000..106f0f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
@@ -0,0 +1,222 @@
+"""Advanced operators (to deal with streams of higher order) ."""
+from __future__ import annotations
+
+from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
+from typing_extensions import ParamSpec
+
+from . import combine
+
+from ..core import Streamer, pipable_operator
+from ..manager import StreamerManager
+
+
+__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"]
+
+
+T = TypeVar("T")
+U = TypeVar("U")
+P = ParamSpec("P")
+
+
+# Helper to manage streams of higher order
+
+
+@pipable_operator
+async def base_combine(
+ source: AsyncIterable[AsyncIterable[T]],
+ switch: bool = False,
+ ordered: bool = False,
+ task_limit: int | None = None,
+) -> AsyncIterator[T]:
+ """Base operator for managing an asynchronous sequence of sequences.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the number of running sequences using the `task_limit` argument.
+
+    The ``switch`` argument enables the switch mechanism, which causes the
+    previous subsequence to be discarded when a new one is created.
+
+ The items can either be generated in order or as soon as they're received,
+ depending on the ``ordered`` argument.
+ """
+
+ # Task limit
+ if task_limit is not None and not task_limit > 0:
+ raise ValueError("The task limit must be None or greater than 0")
+
+ # Safe context
+ async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
+ main_streamer: Streamer[
+ AsyncIterable[T] | T
+ ] | None = await manager.enter_and_create_task(source)
+
+ # Loop over events
+ while manager.tasks:
+ # Extract streamer groups
+ substreamers = manager.streamers[1:]
+ mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
+
+ # Switch - use the main streamer then the substreamer
+ if switch:
+ filters = mainstreamers + substreamers
+ # Concat - use the first substreamer then the main streamer
+ elif ordered:
+ filters = substreamers[:1] + mainstreamers
+ # Flat - use the substreamers then the main streamer
+ else:
+ filters = substreamers + mainstreamers
+
+ # Wait for next event
+ streamer, task = await manager.wait_single_event(filters)
+
+ # Get result
+ try:
+ result = task.result()
+
+ # End of stream
+ except StopAsyncIteration:
+ # Main streamer is finished
+ if streamer is main_streamer:
+ main_streamer = None
+
+ # A substreamer is finished
+ else:
+ await manager.clean_streamer(streamer)
+
+ # Re-schedule the main streamer if necessary
+ if main_streamer is not None and main_streamer not in manager.tasks:
+ manager.create_task(main_streamer)
+
+ # Process result
+ else:
+                # Switch mechanism
+ if switch and streamer is main_streamer:
+ await manager.clean_streamers(substreamers)
+
+ # Setup a new source
+ if streamer is main_streamer:
+ assert isinstance(result, AsyncIterable)
+ await manager.enter_and_create_task(result)
+
+ # Re-schedule the main streamer if task limit allows it
+ if task_limit is None or task_limit > len(manager.tasks):
+ manager.create_task(streamer)
+
+ # Yield the result
+ else:
+ item = cast("T", result)
+ yield item
+
+ # Re-schedule the streamer
+ manager.create_task(streamer)
+
+
+# Advanced operators (for streams of higher order)
+
+
+@pipable_operator
+def concat(
+ source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements
+ of the sequences in order.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the number of running sequences using the `task_limit` argument.
+
+ Errors raised in the source or an element sequence are propagated.
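+
+    A minimal usage sketch (assuming the ``stream`` module is imported
+    from ``aiostream``)::
+
+        xs = stream.iterate([stream.range(3), stream.range(10, 13)])
+        ys = stream.concat(xs)  # 0, 1, 2, 10, 11, 12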
+ """
+ return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
+
+
+@pipable_operator
+def flatten(
+ source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements
+ of the sequences as soon as they're received.
+
+    The sequences are awaited concurrently, although it's possible to limit
+    the number of running sequences using the `task_limit` argument.
+
+ Errors raised in the source or an element sequence are propagated.
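+
+    A minimal usage sketch::
+
+        xs = stream.iterate([stream.range(3), stream.range(10, 13)])
+        ys = stream.flatten(xs)  # elements from both ranges, as they arrive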
+ """
+ return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
+
+
+@pipable_operator
+def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements of
+ the most recent sequence.
+
+ Element sequences are generated eagerly, and closed once they are
+ superseded by a more recent sequence. Once the main sequence is finished,
+ the last subsequence will be exhausted completely.
+
+ Errors raised in the source or an element sequence (that was not already
+ closed) are propagated.
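+
+    A minimal sketch (the exact output depends on timing)::
+
+        xs = stream.iterate([stream.range(100, interval=0.1), stream.range(3)])
+        ys = stream.switch(xs)  # the first range is likely discarded early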
+ """
+ return base_combine.raw(source, switch=True)
+
+
+# Advanced *-map operators
+
+
+@pipable_operator
+def concatmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences, and generate the elements of the created
+ sequences in order.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the number of running sequences using
+ the `task_limit` argument.
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return concat.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def flatmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences, and generate the elements of the created
+ sequences as soon as they arrive.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. The returned sequences are awaited concurrently,
+    although it's possible to limit the number of running sequences using
+ the `task_limit` argument.
+
+ Errors raised in a source or output sequence are propagated.
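+
+    A minimal sketch (``duplicate`` is a hypothetical async generator
+    function)::
+
+        async def duplicate(x):
+            yield x
+            yield x
+
+        ys = stream.flatmap(stream.range(3), duplicate)  # typically 0, 0, 1, 1, 2, 2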
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return flatten.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def switchmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences and generate the elements of the most
+ recently created sequence.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. Errors raised in a source or output sequence (that
+ was not already closed) are propagated.
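+
+    A minimal sketch (``tail`` is a hypothetical helper returning a
+    stream)::
+
+        def tail(x):
+            return stream.range(x * 10, x * 10 + 100, interval=0.1)
+
+        ys = stream.switchmap(stream.range(3, interval=0.5), tail)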
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return switch.raw(mapped)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
new file mode 100644
index 00000000..8ed7c0e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
@@ -0,0 +1,90 @@
+"""Aggregation operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import operator as op
+from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
+
+
+from . import select
+from ..aiter_utils import anext
+from ..core import pipable_operator, streamcontext
+
+__all__ = ["accumulate", "reduce", "list"]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def accumulate(
+ source: AsyncIterable[T],
+ func: Callable[[T, T], Awaitable[T] | T] = op.add,
+ initializer: T | None = None,
+) -> AsyncIterator[T]:
+ """Generate a series of accumulated sums (or other binary function)
+ from an asynchronous sequence.
+
+ If ``initializer`` is present, it is placed before the items
+ of the sequence in the calculation, and serves as a default
+ when the sequence is empty.
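+
+    A minimal usage sketch (running sums)::
+
+        xs = stream.accumulate(stream.range(5))  # 0, 1, 3, 6, 10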
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ # Initialize
+ if initializer is None:
+ try:
+ value = await anext(streamer)
+ except StopAsyncIteration:
+ return
+ else:
+ value = initializer
+ # First value
+ yield value
+ # Iterate streamer
+ async for item in streamer:
+ returned = func(value, item)
+ if iscorofunc:
+ awaitable_value = cast("Awaitable[T]", returned)
+ value = await awaitable_value
+ else:
+ value = cast("T", returned)
+ yield value
+
+
+@pipable_operator
+def reduce(
+ source: AsyncIterable[T],
+ func: Callable[[T, T], Awaitable[T] | T],
+ initializer: T | None = None,
+) -> AsyncIterator[T]:
+ """Apply a function of two arguments cumulatively to the items
+ of an asynchronous sequence, reducing the sequence to a single value.
+
+ If ``initializer`` is present, it is placed before the items
+ of the sequence in the calculation, and serves as a default when the
+ sequence is empty.
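+
+    A minimal usage sketch (awaiting a stream returns its last item)::
+
+        total = await stream.reduce(stream.range(10), lambda x, y: x + y)  # 45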
+ """
+ acc = accumulate.raw(source, func, initializer)
+ return select.item.raw(acc, -1)
+
+
+@pipable_operator
+async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
+ """Build a list from an asynchronous sequence.
+
+ All the intermediate steps are generated, starting from the empty list.
+
+ This operator can be used to easily convert a stream into a list::
+
+ lst = await stream.list(x)
+
+    .. note:: The same list object is produced at each step in order to
+       avoid memory copies.
+ """
+ result: builtins.list[T] = []
+ yield result
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result.append(item)
+ yield result
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
new file mode 100644
index 00000000..a782a730
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
@@ -0,0 +1,282 @@
+"""Combination operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import (
+ Awaitable,
+ Protocol,
+ TypeVar,
+ AsyncIterable,
+ AsyncIterator,
+ Callable,
+ cast,
+)
+from typing_extensions import ParamSpec
+
+from ..aiter_utils import AsyncExitStack, anext
+from ..core import streamcontext, pipable_operator
+
+from . import create
+from . import select
+from . import advanced
+from . import aggregate
+
+__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]
+
+T = TypeVar("T")
+U = TypeVar("U")
+K = TypeVar("K")
+P = ParamSpec("P")
+
+
+@pipable_operator
+async def chain(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+ """Chain asynchronous sequences together, in the order they are given.
+
+ Note: the sequences are not iterated until it is required,
+ so if the operation is interrupted, the remaining sequences
+ will be left untouched.
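+
+    A minimal usage sketch::
+
+        zs = stream.chain(stream.range(3), stream.range(10, 13))
+        # 0, 1, 2, 10, 11, 12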
+ """
+ sources = source, *more_sources
+ for source in sources:
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
+
+
+@pipable_operator
+async def zip(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[tuple[T, ...]]:
+ """Combine and forward the elements of several asynchronous sequences.
+
+ Each generated value is a tuple of elements, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted.
+
+    Note: the different sequences are awaited in parallel, so that their
+ waiting times don't add up.
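+
+    A minimal usage sketch::
+
+        zs = stream.zip(stream.range(3), stream.range(10, 13))
+        # (0, 10), (1, 11), (2, 12)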
+ """
+ sources = source, *more_sources
+
+    # One source
+ if len(sources) == 1:
+ (source,) = sources
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield (item,)
+ return
+
+ # N sources
+ async with AsyncExitStack() as stack:
+ # Handle resources
+ streamers = [
+ await stack.enter_async_context(streamcontext(source)) for source in sources
+ ]
+ # Loop over items
+ while True:
+ try:
+ coros = builtins.map(anext, streamers)
+ items = await asyncio.gather(*coros)
+ except StopAsyncIteration:
+ break
+ else:
+ yield tuple(items)
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class SmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+class AmapCallable(Protocol[X, Y]):
+ async def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+class MapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
+ ...
+
+
+@pipable_operator
+async def smap(
+ source: AsyncIterable[T],
+ func: SmapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+ """Apply a given function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted. The function is treated synchronously.
+
+ Note: if more than one sequence is provided, they're awaited concurrently
+ so that their waiting times don't add up.
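+
+    A minimal usage sketch::
+
+        zs = stream.smap(stream.range(3), lambda x, y: x + y, stream.range(10, 13))
+        # 10, 12, 14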
+ """
+ stream = zip(source, *more_sources)
+ async with streamcontext(stream) as streamer:
+ async for item in streamer:
+ yield func(*item)
+
+
+@pipable_operator
+def amap(
+ source: AsyncIterable[T],
+ corofn: AmapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given coroutine function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument.
+
+    The coroutines run concurrently but their number can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially.
+
+ If more than one sequence is provided, they're also awaited concurrently,
+ so that their waiting times don't add up.
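+
+    A minimal sketch (``double`` is a hypothetical coroutine function)::
+
+        async def double(x):
+            await asyncio.sleep(0.1)
+            return 2 * x
+
+        ys = stream.amap(stream.range(5), double, task_limit=2)  # 0, 2, 4, 6, 8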
+ """
+
+ async def func(arg: T, *args: T) -> AsyncIterable[U]:
+ yield await corofn(arg, *args)
+
+ if ordered:
+ return advanced.concatmap.raw(
+ source, func, *more_sources, task_limit=task_limit
+ )
+ return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
+
+
+@pipable_operator
+def map(
+ source: AsyncIterable[T],
+ func: MapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted. The function can either be synchronous or
+ asynchronous (coroutine function).
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if the
+ provided function is synchronous.
+
+    The coroutines run concurrently but their number can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
+
+ If more than one sequence is provided, they're also awaited concurrently,
+ so that their waiting times don't add up.
+
+ It might happen that the provided function returns a coroutine but is not
+ a coroutine function per se. In this case, one can wrap the function with
+ ``aiostream.async_`` in order to force ``map`` to await the resulting
+    coroutine. The following example illustrates the use of ``async_`` with a
+ lambda function::
+
+ from aiostream import stream, async_
+ ...
+ ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
+ """
+ if asyncio.iscoroutinefunction(func):
+ return amap.raw(
+ source, func, *more_sources, ordered=ordered, task_limit=task_limit
+ )
+ sync_func = cast("SmapCallable[T, U]", func)
+ return smap.raw(source, sync_func, *more_sources)
+
+
+@pipable_operator
+def merge(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+ """Merge several asynchronous sequences together.
+
+ All the sequences are iterated simultaneously and their elements
+ are forwarded as soon as they're available. The generation continues
+ until all the sequences are exhausted.
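+
+    A minimal usage sketch (intervals are illustrative)::
+
+        xs = stream.range(3, interval=0.2)
+        ys = stream.range(10, 13, interval=0.3)
+        zs = stream.merge(xs, ys)  # elements from both, as they arrive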
+ """
+ sources = [source, *more_sources]
+ source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources)
+ return advanced.flatten.raw(source_stream)
+
+
+@pipable_operator
+def ziplatest(
+ source: AsyncIterable[T],
+ *more_sources: AsyncIterable[T],
+ partial: bool = True,
+ default: T | None = None,
+) -> AsyncIterator[tuple[T | None, ...]]:
+ """Combine several asynchronous sequences together, producing a tuple with
+    the latest element of each sequence whenever a new element is received.
+
+    The value to use when a sequence has not produced any element yet is given
+    by the ``default`` argument (defaulting to ``None``).
+
+    The production of partial results can be disabled by setting the optional
+ argument ``partial`` to ``False``.
+
+ All the sequences are iterated simultaneously and their elements
+ are forwarded as soon as they're available. The generation continues
+ until all the sequences are exhausted.
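+
+    A minimal sketch (timings and output are illustrative)::
+
+        xs = stream.range(3, interval=0.3)
+        ys = stream.range(10, 13, interval=0.5)
+        zs = stream.ziplatest(xs, ys)  # (0, None), (0, 10), (1, 10), ...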
+ """
+ sources = source, *more_sources
+ n = len(sources)
+
+ # Custom getter
+ def getter(dct: dict[int, T]) -> Callable[[int], T | None]:
+ return lambda key: dct.get(key, default)
+
+ # Add source index to the items
+ def make_func(i: int) -> SmapCallable[T, dict[int, T]]:
+ def func(x: T, *_: object) -> dict[int, T]:
+ return {i: x}
+
+ return func
+
+ new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
+
+ # Merge the sources
+ merged = merge.raw(*new_sources)
+
+ # Accumulate the current state in a dict
+ accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e})
+
+ # Filter partial result
+ filtered = (
+ accumulated
+ if partial
+ else select.filter.raw(accumulated, lambda x: len(x) == n)
+ )
+
+ # Convert the state dict to a tuple
+ def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]:
+ return tuple(builtins.map(getter(x), range(n)))
+
+ return smap.raw(filtered, dict_to_tuple)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
new file mode 100644
index 00000000..d7676b71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
@@ -0,0 +1,191 @@
+"""Non-pipable creation operators."""
+from __future__ import annotations
+
+import sys
+import asyncio
+import inspect
+import builtins
+import itertools
+
+from typing import (
+ AsyncIterable,
+ Awaitable,
+ Iterable,
+ Protocol,
+ TypeVar,
+ AsyncIterator,
+ cast,
+)
+from typing_extensions import ParamSpec
+
+from ..stream import time
+from ..core import operator, streamcontext
+
+__all__ = [
+ "iterate",
+ "preserve",
+ "just",
+ "call",
+ "throw",
+ "empty",
+ "never",
+ "repeat",
+ "range",
+ "count",
+]
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for Python 3.8 compatibility
+if sys.version_info < (3, 9):
+ P = TypeVar("P")
+
+# Convert regular iterables
+
+
+@operator
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a regular iterable."""
+ for item in it:
+ await asyncio.sleep(0)
+ yield item
+
+
+@operator
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable.
+
+    Note: the corresponding iterator will be explicitly closed
+ when leaving the context manager."""
+ return streamcontext(ait)
+
+
+@operator
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a sychronous or asynchronous iterable."""
+ if isinstance(it, AsyncIterable):
+ return from_async_iterable.raw(it)
+ if isinstance(it, Iterable):
+ return from_iterable.raw(it)
+ raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
+
+
+@operator
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable without
+ explicitly closing the corresponding iterator."""
+ async for item in ait:
+ yield item
+
+
+# Simple operators
+
+
+@operator
+async def just(value: T) -> AsyncIterator[T]:
+ """Await if possible, and generate a single value."""
+ if inspect.isawaitable(value):
+ yield await value
+ else:
+ yield value
+
+
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+ ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+ ...
+
+
+@operator
+async def call(
+ func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
+ """Call the given function and generate a single value.
+
+ Await if the provided function is asynchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+ async_func = cast("AsyncCallable[P, T]", func)
+ yield await async_func(*args, **kwargs)
+ else:
+ sync_func = cast("SyncCallable[P, T]", func)
+ yield sync_func(*args, **kwargs)
+
+
+@operator
+async def throw(exc: Exception) -> AsyncIterator[None]:
+ """Throw an exception without generating any value."""
+ if False:
+ yield
+ raise exc
+
+
+@operator
+async def empty() -> AsyncIterator[None]:
+ """Terminate without generating any value."""
+ if False:
+ yield
+
+
+@operator
+async def never() -> AsyncIterator[None]:
+ """Hang forever without generating any value."""
+ if False:
+ yield
+ future: asyncio.Future[None] = asyncio.Future()
+ try:
+ await future
+ finally:
+ future.cancel()
+
+
+@operator
+def repeat(
+ value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
+ """Generate the same value a given number of times.
+
+ If ``times`` is ``None``, the value is repeated indefinitely.
+ An optional interval can be given to space the values out.
+ """
+ args = () if times is None else (times,)
+ it = itertools.repeat(value, *args)
+ agen = from_iterable.raw(it)
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+# Counting operators
+
+
+@operator
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
+ """Generate a given range of numbers.
+
+ It supports the same arguments as the builtin function.
+ An optional interval can be given to space the values out.
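+
+    A minimal usage sketch::
+
+        xs = stream.range(1, 10, 2, interval=0.1)  # 1, 3, 5, 7, 9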
+ """
+ agen = from_iterable.raw(builtins.range(*args))
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+@operator
+def count(
+ start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
+ """Generate consecutive numbers indefinitely.
+
+ Optional starting point and increment can be defined,
+ respectively defaulting to ``0`` and ``1``.
+
+ An optional interval can be given to space the values out.
+ """
+ agen = from_iterable.raw(itertools.count(start, step))
+ return time.spaceout.raw(agen, interval) if interval else agen
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
new file mode 100644
index 00000000..1be834e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
@@ -0,0 +1,83 @@
+"""Extra operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
+
+__all__ = ["action", "print"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+ source: AsyncIterable[T],
+ func: Callable[[T], Awaitable[Any] | Any],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[T]:
+ """Perform an action for each element of an asynchronous sequence
+ without modifying it.
+
+ The given function can be synchronous or asynchronous.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if the
+ provided function is synchronous.
+
+    The coroutines run concurrently but their number can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
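+
+    A minimal sketch (``seen.append`` as a synchronous side effect)::
+
+        seen = []
+        ys = stream.action(stream.range(3), seen.append)  # forwards 0, 1, 2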
+ """
+ if asyncio.iscoroutinefunction(func):
+
+ async def ainnerfunc(arg: T, *_: object) -> T:
+ awaitable = func(arg)
+ assert isinstance(awaitable, Awaitable)
+ await awaitable
+ return arg
+
+ return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
+ else:
+
+ def innerfunc(arg: T, *_: object) -> T:
+ func(arg)
+ return arg
+
+ return smap.raw(source, innerfunc)
+
+
+@pipable_operator
+def print(
+ source: AsyncIterable[T],
+ template: str = "{}",
+ sep: str = " ",
+ end: str = "\n",
+ file: Any | None = None,
+ flush: bool = False,
+) -> AsyncIterator[T]:
+ """Print each element of an asynchronous sequence without modifying it.
+
+ An optional template can be provided to be formatted with the elements.
+ All the keyword arguments are forwarded to the builtin function print.
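+
+    A minimal usage sketch::
+
+        ys = stream.print(stream.range(3), template="item: {}")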
+ """
+
+ def func(value: T) -> None:
+ string = template.format(value)
+ builtins.print(
+ string,
+ sep=sep,
+ end=end,
+ file=file,
+ flush=flush,
+ )
+
+ return action.raw(source, func)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
new file mode 100644
index 00000000..9390f464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
@@ -0,0 +1,284 @@
+"""Selection operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import collections
+
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
+from . import transform
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
+
+__all__ = [
+ "take",
+ "takelast",
+ "skip",
+ "skiplast",
+ "getitem",
+ "filter",
+ "until",
+ "dropwhile",
+ "takewhile",
+]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward the first ``n`` elements from an asynchronous sequence.
+
+ If ``n`` is negative, it simply terminates before iterating the source.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ if n <= 0:
+ return
+ async for i, item in streamer:
+ yield item
+ if i >= n - 1:
+ return
+
+
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward the last ``n`` elements from an asynchronous sequence.
+
+ If ``n`` is negative, it simply terminates after iterating the source.
+
+ Note: it is required to reach the end of the source before the first
+ element is generated.
+ """
+ queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ queue.append(item)
+ for item in queue:
+ yield item
+
+
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence, skipping the first ``n`` elements.
+
+ If ``n`` is negative, no elements are skipped.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ async for i, item in streamer:
+ if i >= n:
+ yield item
+
+
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence, skipping the last ``n`` elements.
+
+ If ``n`` is negative, no elements are skipped.
+
+    Note: it is required to reach the ``(n+1)``-th element of the source
+ before the first element is generated.
+ """
+ queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ if n <= 0:
+ yield item
+ continue
+ if len(queue) == n:
+ yield queue[0]
+ queue.append(item)
+
+
+@pipable_operator
+async def filterindex(
+ source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
+ """Filter an asynchronous sequence using the index of the elements.
+
+ The given function is synchronous, takes the index as an argument,
+    and returns ``True`` if the corresponding element should be forwarded,
+ ``False`` otherwise.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ async for i, item in streamer:
+ if func(i):
+ yield item
+
+
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
+ """Slice an asynchronous sequence.
+
+ The arguments are the same as the builtin type slice.
+
+    There are two limitations compared to regular slices:
+ - Positive stop index with negative start index is not supported
+ - Negative step is not supported
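+
+    A minimal sketch (streams support the regular subscript syntax)::
+
+        ys = stream.range(10)[1::2]  # 1, 3, 5, 7, 9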
+ """
+ s = builtins.slice(*args)
+ start, stop, step = s.start or 0, s.stop, s.step or 1
+ aiterator = aiter(source)
+ # Filter the first items
+ if start < 0:
+ aiterator = takelast.raw(aiterator, abs(start))
+ elif start > 0:
+ aiterator = skip.raw(aiterator, start)
+ # Filter the last items
+ if stop is not None:
+ if stop >= 0 and start < 0:
+ raise ValueError("Positive stop with negative start is not supported")
+ elif stop >= 0:
+ aiterator = take.raw(aiterator, stop - start)
+ else:
+ aiterator = skiplast.raw(aiterator, abs(stop))
+ # Filter step items
+ if step is not None:
+ if step > 1:
+ aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
+ elif step < 0:
+ raise ValueError("Negative step not supported")
+ # Return
+ return aiterator
+
+
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
+ """Forward the ``n``th element of an asynchronous sequence.
+
+ The index can be negative and works like regular indexing.
+    If the index is out of range, an ``IndexError`` is raised.
+ """
+ # Prepare
+ if index >= 0:
+ source = skip.raw(source, index)
+ else:
+        source = takelast.raw(source, abs(index))
+ async with streamcontext(source) as streamer:
+ # Get first item
+ try:
+ result = await anext(streamer)
+ except StopAsyncIteration:
+ raise IndexError("Index out of range")
+ # Check length
+ if index < 0:
+ count = 1
+ async for _ in streamer:
+ count += 1
+ if count != abs(index):
+ raise IndexError("Index out of range")
+ # Yield result
+ yield result
+
+
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
+ """Forward one or several items from an asynchronous sequence.
+
+ The argument can either be a slice or an integer.
+ See the slice and item operators for more information.
+ """
+ if isinstance(index, builtins.slice):
+ return slice.raw(source, index.start, index.stop, index.step)
+ if isinstance(index, int):
+ return item.raw(source, index)
+ raise TypeError("Not a valid index (int or slice)")
+
+
+@pipable_operator
+async def filter(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Filter an asynchronous sequence using an arbitrary function.
+
+ The function takes the item as an argument and returns ``True``
+ if it should be forwarded, ``False`` otherwise.
+ The function can either be synchronous or asynchronous.
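+
+    A minimal usage sketch::
+
+        ys = stream.filter(stream.range(10), lambda x: x % 2 == 0)  # 0, 2, 4, 6, 8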
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if result:
+ yield item
+
+
+@pipable_operator
+async def until(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence until a condition is met.
+
+ Contrary to the ``takewhile`` operator, the last tested element is included
+ in the sequence.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ yield item
+ if result:
+ return
+
+
+@pipable_operator
+async def takewhile(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence while a condition is met.
+
+ Contrary to the ``until`` operator, the last tested element is not included
+ in the sequence.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if not result:
+ return
+ yield item
+
+
+@pipable_operator
+async def dropwhile(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Discard the elements from an asynchronous sequence
+ while a condition is met.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if not result:
+ yield item
+ break
+ async for item in streamer:
+ yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+ """Make sure the elements of an asynchronous sequence are separated
+ in time by the given interval.
+ """
+ timeout = 0.0
+ loop = asyncio.get_event_loop()
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ delta = timeout - loop.time()
+ delay = delta if delta > 0 else 0.0
+ await asyncio.sleep(delay)
+ yield item
+ timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+ """Raise a time-out if an element of the asynchronous sequence
+ takes too long to arrive.
+
+ Note: the timeout is not global but specific to each step of
+ the iteration.
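+
+    A minimal sketch (hypothetical source ``xs``, 5-second limit)::
+
+        ys = stream.timeout(xs, 5.0)  # asyncio.TimeoutError on a slow step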
+ """
+ async with streamcontext(source) as streamer:
+ while True:
+ try:
+ item = await asyncio.wait_for(anext(streamer), timeout)
+ except StopAsyncIteration:
+ break
+ else:
+ yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+ """Delay the iteration of an asynchronous sequence."""
+ await asyncio.sleep(delay)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
new file mode 100644
index 00000000..f11bffa6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
@@ -0,0 +1,128 @@
+"""Transformation operators."""
+
+from __future__ import annotations
+
+import asyncio
+import itertools
+from typing import (
+ Protocol,
+ TypeVar,
+ AsyncIterable,
+ AsyncIterator,
+ Awaitable,
+ cast,
+)
+
+from ..core import streamcontext, pipable_operator
+
+from . import select
+from . import create
+from . import aggregate
+from .combine import map, amap, smap
+
+__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
+
+# map, amap and smap are also transform operators
+map, amap, smap
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+@pipable_operator
+async def enumerate(
+ source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
+ """Generate ``(index, value)`` tuples from an asynchronous sequence.
+
+ This index is computed using a starting point and an increment,
+ respectively defaulting to ``0`` and ``1``.
+ """
+ count = itertools.count(start, step)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield next(count), item
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+ ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+@pipable_operator
+def starmap(
+ source: AsyncIterable[tuple[T, ...]],
+ func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function to the unpacked elements of
+ an asynchronous sequence.
+
+ Each element is unpacked before applying the function.
+ The given function can either be synchronous or asynchronous.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if
+ the provided function is synchronous.
+
+    The coroutines run concurrently but their number can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
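+
+    A minimal usage sketch::
+
+        pairs = stream.iterate([(1, 2), (3, 4)])
+        ys = stream.starmap(pairs, lambda x, y: x + y)  # 3, 7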
+ """
+ if asyncio.iscoroutinefunction(func):
+ async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+ async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+ awaitable = async_func(*args)
+ return await awaitable
+
+ return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
+
+ else:
+ sync_func = cast("SyncStarmapCallable[T, U]", func)
+
+ def starfunc(args: tuple[T, ...], *_: object) -> U:
+ return sync_func(*args)
+
+ return smap.raw(source, starfunc)
+
+
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Iterate indefinitely over an asynchronous sequence.
+
+    Note: it does not perform any buffering, but re-iterates over
+ the same given sequence instead. If the sequence is not
+ re-iterable, the generator might end up looping indefinitely
+ without yielding any item.
+ """
+ while True:
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
+        # Prevent the while-loop from blocking the event loop when the stream is empty
+ await asyncio.sleep(0)
+
+
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
+ """Generate chunks of size ``n`` from an asynchronous sequence.
+
+    The chunks are lists, and the last chunk might contain fewer than ``n``
+ elements.
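+
+    A minimal usage sketch::
+
+        ys = stream.chunks(stream.range(8), 3)  # [0, 1, 2], [3, 4, 5], [6, 7]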
+ """
+ async with streamcontext(source) as streamer:
+ async for first in streamer:
+ xs = select.take(create.preserve(streamer), n - 1)
+ yield [first] + await aggregate.list(xs)