aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/__init__.py31
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py262
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/core.py567
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/manager.py159
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/pipe.py39
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/py.typed0
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py10
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py222
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py90
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/combine.py282
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/create.py191
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/misc.py83
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/select.py284
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/time.py56
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/transform.py128
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/test_utils.py173
16 files changed, 2577 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
new file mode 100644
index 00000000..3ac80c16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
@@ -0,0 +1,31 @@
+"""Generator-based operators for asynchronous iteration.
+
+The two main modules are:
+- stream: provide all the stream operators (to create new stream objects)
+- pipe: provides all the pipe operators (to combine operators using '|')
+
+Additionally, three core objects are exposed:
+- streamcontext: a context for safe stream iteration
+- StreamEmpty: the exception raised when an empty stream is awaited
+- operator: a decorator to create stream operators from async generators
+
+Some utility modules are also provided:
+- aiter_utils: utilties for asynchronous iteration
+- context_utils: utilites for asynchronous context
+- test_utils: utilities for testing stream operators (require pytest)
+"""
+
+from . import stream, pipe
+from .aiter_utils import async_, await_
+from .core import StreamEmpty, operator, pipable_operator, streamcontext
+
+__all__ = [
+ "stream",
+ "pipe",
+ "async_",
+ "await_",
+ "operator",
+ "pipable_operator",
+ "streamcontext",
+ "StreamEmpty",
+]
diff --git a/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
new file mode 100644
index 00000000..f68ea846
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
@@ -0,0 +1,262 @@
+"""Utilities for asynchronous iteration."""
+from __future__ import annotations
+from types import TracebackType
+
+import warnings
+import functools
+from typing import (
+ TYPE_CHECKING,
+ AsyncContextManager,
+ AsyncGenerator,
+ AsyncIterable,
+ Awaitable,
+ Callable,
+ Type,
+ TypeVar,
+ AsyncIterator,
+ Any,
+)
+
+if TYPE_CHECKING:
+ from typing_extensions import ParamSpec
+
+ P = ParamSpec("P")
+
+from contextlib import AsyncExitStack
+
+__all__ = [
+ "aiter",
+ "anext",
+ "await_",
+ "async_",
+ "is_async_iterable",
+ "assert_async_iterable",
+ "is_async_iterator",
+ "assert_async_iterator",
+ "AsyncIteratorContext",
+ "aitercontext",
+ "AsyncExitStack",
+]
+
+
+# Magic method shorcuts
+
+
+def aiter(obj: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Access aiter magic method."""
+ assert_async_iterable(obj)
+ return obj.__aiter__()
+
+
+def anext(obj: AsyncIterator[T]) -> Awaitable[T]:
+ """Access anext magic method."""
+ assert_async_iterator(obj)
+ return obj.__anext__()
+
+
+# Async / await helper functions
+
+
+async def await_(obj: Awaitable[T]) -> T:
+ """Identity coroutine function."""
+ return await obj
+
+
+def async_(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
+ """Wrap the given function into a coroutine function."""
+
+ @functools.wraps(fn)
+ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
+ return await fn(*args, **kwargs)
+
+ return wrapper
+
+
+# Iterability helpers
+
+
+def is_async_iterable(obj: object) -> bool:
+ """Check if the given object is an asynchronous iterable."""
+ return hasattr(obj, "__aiter__")
+
+
+def assert_async_iterable(obj: object) -> None:
+ """Raise a TypeError if the given object is not an
+ asynchronous iterable.
+ """
+ if not is_async_iterable(obj):
+ raise TypeError(f"{type(obj).__name__!r} object is not async iterable")
+
+
+def is_async_iterator(obj: object) -> bool:
+ """Check if the given object is an asynchronous iterator."""
+ return hasattr(obj, "__anext__")
+
+
+def assert_async_iterator(obj: object) -> None:
+ """Raise a TypeError if the given object is not an
+ asynchronous iterator.
+ """
+ if not is_async_iterator(obj):
+ raise TypeError(f"{type(obj).__name__!r} object is not an async iterator")
+
+
+# Async iterator context
+
+T = TypeVar("T")
+Self = TypeVar("Self", bound="AsyncIteratorContext[Any]")
+
+
+class AsyncIteratorContext(AsyncIterator[T], AsyncContextManager[Any]):
+ """Asynchronous iterator with context management.
+
+ The context management makes sure the aclose asynchronous method
+ of the corresponding iterator has run before it exits. It also issues
+ warnings and RuntimeError if it is used incorrectly.
+
+ Correct usage::
+
+ ait = some_asynchronous_iterable()
+ async with AsyncIteratorContext(ait) as safe_ait:
+ async for item in safe_ait:
+ <block>
+
+ It is nonetheless not meant to use directly.
+ Prefer aitercontext helper instead.
+ """
+
+ _STANDBY = "STANDBY"
+ _RUNNING = "RUNNING"
+ _FINISHED = "FINISHED"
+
+ def __init__(self, aiterator: AsyncIterator[T]):
+ """Initialize with an asynchrnous iterator."""
+ assert_async_iterator(aiterator)
+ if isinstance(aiterator, AsyncIteratorContext):
+ raise TypeError(f"{aiterator!r} is already an AsyncIteratorContext")
+ self._state = self._STANDBY
+ self._aiterator = aiterator
+
+ def __aiter__(self: Self) -> Self:
+ return self
+
+ def __anext__(self) -> Awaitable[T]:
+ if self._state == self._FINISHED:
+ raise RuntimeError(
+ f"{type(self).__name__} is closed and cannot be iterated"
+ )
+ if self._state == self._STANDBY:
+ warnings.warn(
+ f"{type(self).__name__} is iterated outside of its context",
+ stacklevel=2,
+ )
+ return anext(self._aiterator)
+
+ async def __aenter__(self: Self) -> Self:
+ if self._state == self._RUNNING:
+ raise RuntimeError(f"{type(self).__name__} has already been entered")
+ if self._state == self._FINISHED:
+ raise RuntimeError(
+ f"{type(self).__name__} is closed and cannot be iterated"
+ )
+ self._state = self._RUNNING
+ return self
+
+ async def __aexit__(
+ self,
+ typ: Type[BaseException] | None,
+ value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> bool:
+ try:
+ if self._state == self._FINISHED:
+ return False
+ try:
+ # No exception to throw
+ if typ is None:
+ return False
+
+ # Prevent GeneratorExit from being silenced
+ if typ is GeneratorExit:
+ return False
+
+ # No method to throw
+ if not hasattr(self._aiterator, "athrow"):
+ return False
+
+ # No frame to throw
+ if not getattr(self._aiterator, "ag_frame", True):
+ return False
+
+ # Cannot throw at the moment
+ if getattr(self._aiterator, "ag_running", False):
+ return False
+
+ # Throw
+ try:
+ assert isinstance(self._aiterator, AsyncGenerator)
+ await self._aiterator.athrow(typ, value, traceback)
+ raise RuntimeError("Async iterator didn't stop after athrow()")
+
+ # Exception has been (most probably) silenced
+ except StopAsyncIteration as exc:
+ return exc is not value
+
+ # A (possibly new) exception has been raised
+ except BaseException as exc:
+ if exc is value:
+ return False
+ raise
+ finally:
+ # Look for an aclose method
+ aclose = getattr(self._aiterator, "aclose", None)
+
+ # The ag_running attribute has been introduced with python 3.8
+ running = getattr(self._aiterator, "ag_running", False)
+ closed = not getattr(self._aiterator, "ag_frame", True)
+
+ # A RuntimeError is raised if aiterator is running or closed
+ if aclose and not running and not closed:
+ try:
+ await aclose()
+
+ # Work around bpo-35409
+ except GeneratorExit:
+ pass # pragma: no cover
+ finally:
+ self._state = self._FINISHED
+
+ async def aclose(self) -> None:
+ await self.__aexit__(None, None, None)
+
+ async def athrow(self, exc: Exception) -> T:
+ if self._state == self._FINISHED:
+ raise RuntimeError(f"{type(self).__name__} is closed and cannot be used")
+ assert isinstance(self._aiterator, AsyncGenerator)
+ item: T = await self._aiterator.athrow(exc)
+ return item
+
+
+def aitercontext(
+ aiterable: AsyncIterable[T],
+) -> AsyncIteratorContext[T]:
+ """Return an asynchronous context manager from an asynchronous iterable.
+
+ The context management makes sure the aclose asynchronous method
+ has run before it exits. It also issues warnings and RuntimeError
+ if it is used incorrectly.
+
+ It is safe to use with any asynchronous iterable and prevent
+ asynchronous iterator context to be wrapped twice.
+
+ Correct usage::
+
+ ait = some_asynchronous_iterable()
+ async with aitercontext(ait) as safe_ait:
+ async for item in safe_ait:
+ <block>
+ """
+ aiterator = aiter(aiterable)
+ if isinstance(aiterator, AsyncIteratorContext):
+ return aiterator
+ return AsyncIteratorContext(aiterator)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/core.py b/.venv/lib/python3.12/site-packages/aiostream/core.py
new file mode 100644
index 00000000..a11f0823
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/core.py
@@ -0,0 +1,567 @@
+"""Core objects for stream operators."""
+from __future__ import annotations
+
+import inspect
+import functools
+import sys
+import warnings
+
+from .aiter_utils import AsyncIteratorContext, aiter, assert_async_iterable
+from typing import (
+ Any,
+ AsyncIterator,
+ Callable,
+ Generator,
+ Iterator,
+ Protocol,
+ Union,
+ TypeVar,
+ cast,
+ AsyncIterable,
+ Awaitable,
+)
+
+from typing_extensions import ParamSpec, Concatenate
+
+
+__all__ = ["Stream", "Streamer", "StreamEmpty", "operator", "streamcontext"]
+
+
+# Exception
+
+
+class StreamEmpty(Exception):
+ """Exception raised when awaiting an empty stream."""
+
+ pass
+
+
+# Helpers
+
+T = TypeVar("T")
+X = TypeVar("X")
+A = TypeVar("A", contravariant=True)
+P = ParamSpec("P")
+Q = ParamSpec("Q")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+ P = TypeVar("P")
+
+
+async def wait_stream(aiterable: BaseStream[T]) -> T:
+ """Wait for an asynchronous iterable to finish and return the last item.
+
+ The iterable is executed within a safe stream context.
+ A StreamEmpty exception is raised if the sequence is empty.
+ """
+
+ class Unassigned:
+ pass
+
+ last_item: Unassigned | T = Unassigned()
+
+ async with streamcontext(aiterable) as streamer:
+ async for item in streamer:
+ last_item = item
+
+ if isinstance(last_item, Unassigned):
+ raise StreamEmpty()
+ return last_item
+
+
+# Core objects
+
+
+class BaseStream(AsyncIterable[T], Awaitable[T]):
+ """
+ Base class for streams.
+
+ See `Stream` and `Streamer` for more information.
+ """
+
+ def __init__(self, factory: Callable[[], AsyncIterable[T]]) -> None:
+ """Initialize the stream with an asynchronous iterable factory.
+
+ The factory is a callable and takes no argument.
+ The factory return value is an asynchronous iterable.
+ """
+ aiter = factory()
+ assert_async_iterable(aiter)
+ self._generator = self._make_generator(aiter, factory)
+
+ def _make_generator(
+ self, first: AsyncIterable[T], factory: Callable[[], AsyncIterable[T]]
+ ) -> Iterator[AsyncIterable[T]]:
+ """Generate asynchronous iterables when required.
+
+ The first iterable is created beforehand for extra checking.
+ """
+ yield first
+ del first
+ while True:
+ yield factory()
+
+ def __await__(self) -> Generator[Any, None, T]:
+ """Await protocol.
+
+ Safely iterate and return the last element.
+ """
+ return wait_stream(self).__await__()
+
+ def __or__(self, func: Callable[[BaseStream[T]], X]) -> X:
+ """Pipe protocol.
+
+ Allow to pipe stream operators.
+ """
+ return func(self)
+
+ def __add__(self, value: AsyncIterable[X]) -> Stream[Union[X, T]]:
+ """Addition protocol.
+
+ Concatenate with a given asynchronous sequence.
+ """
+ from .stream import chain
+
+ return chain(self, value)
+
+ def __getitem__(self, value: Union[int, slice]) -> Stream[T]:
+ """Get item protocol.
+
+ Accept index or slice to extract the corresponding item(s)
+ """
+ from .stream import getitem
+
+ return getitem(self, value)
+
+ # Disable sync iteration
+ # This is necessary because __getitem__ is defined
+ # which is a valid fallback for for-loops in python
+ __iter__: None = None
+
+
+class Stream(BaseStream[T]):
+ """Enhanced asynchronous iterable.
+
+ It provides the following features:
+
+ - **Operator pipe-lining** - using pipe symbol ``|``
+ - **Repeatability** - every iteration creates a different iterator
+ - **Safe iteration context** - using ``async with`` and the ``stream``
+ method
+ - **Simplified execution** - get the last element from a stream using
+ ``await``
+ - **Slicing and indexing** - using square brackets ``[]``
+ - **Concatenation** - using addition symbol ``+``
+
+ It is not meant to be instanciated directly.
+ Use the stream operators instead.
+
+ Example::
+
+ xs = stream.count() # xs is a stream object
+ ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements
+ zs = ys[5:10:2] # slice ys using start, stop and step
+
+ async with zs.stream() as streamer: # stream zs in a safe context
+ async for z in streamer: # iterate the zs streamer
+ print(z) # Prints 10, 12, 14
+
+ result = await zs # await zs and return its last element
+ print(result) # Prints 14
+ result = await zs # zs can be used several times
+ print(result) # Prints 14
+ """
+
+ def stream(self) -> Streamer[T]:
+ """Return a streamer context for safe iteration.
+
+ Example::
+
+ xs = stream.count()
+ async with xs.stream() as streamer:
+ async for item in streamer:
+ <block>
+
+ """
+ return self.__aiter__()
+
+ def __aiter__(self) -> Streamer[T]:
+ """Asynchronous iteration protocol.
+
+ Return a streamer context for safe iteration.
+ """
+ return streamcontext(next(self._generator))
+
+ # Advertise the proper synthax for entering a stream context
+
+ __aexit__: None = None
+
+ async def __aenter__(self) -> None:
+ raise TypeError(
+ "A stream object cannot be used as a context manager. "
+ "Use the `stream` method instead: "
+ "`async with xs.stream() as streamer`"
+ )
+
+
+class Streamer(AsyncIteratorContext[T], BaseStream[T]):
+ """Enhanced asynchronous iterator context.
+
+ It is similar to AsyncIteratorContext but provides the stream
+ magic methods for concatenation, indexing and awaiting.
+
+ It's not meant to be instanciated directly, use streamcontext instead.
+
+ Example::
+
+ ait = some_asynchronous_iterable()
+ async with streamcontext(ait) as streamer:
+ async for item in streamer:
+ await streamer[5]
+ """
+
+ pass
+
+
+def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]:
+ """Return a stream context manager from an asynchronous iterable.
+
+ The context management makes sure the aclose asynchronous method
+ of the corresponding iterator has run before it exits. It also issues
+ warnings and RuntimeError if it is used incorrectly.
+
+ It is safe to use with any asynchronous iterable and prevent
+ asynchronous iterator context to be wrapped twice.
+
+ Correct usage::
+
+ ait = some_asynchronous_iterable()
+ async with streamcontext(ait) as streamer:
+ async for item in streamer:
+ <block>
+
+ For streams objects, it is possible to use the stream method instead::
+
+ xs = stream.count()
+ async with xs.stream() as streamer:
+ async for item in streamer:
+ <block>
+ """
+ aiterator = aiter(aiterable)
+ if isinstance(aiterator, Streamer):
+ return aiterator
+ return Streamer(aiterator)
+
+
+# Operator type protocol
+
+
+class OperatorType(Protocol[P, T]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]:
+ ...
+
+ def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]:
+ ...
+
+
+class PipableOperatorType(Protocol[A, P, T]):
+ def __call__(
+ self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+ ) -> Stream[T]:
+ ...
+
+ def raw(
+ self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs
+ ) -> AsyncIterator[T]:
+ ...
+
+ def pipe(
+ self, *args: P.args, **kwargs: P.kwargs
+ ) -> Callable[[AsyncIterable[A]], Stream[T]]:
+ ...
+
+
+# Operator decorator
+
+
+def operator(
+ func: Callable[P, AsyncIterator[T]] | None = None,
+ pipable: bool | None = None,
+) -> OperatorType[P, T]:
+ """Create a stream operator from an asynchronous generator
+ (or any function returning an asynchronous iterable).
+
+ Decorator usage::
+
+ @operator
+ async def random(offset=0., width=1.):
+ while True:
+ yield offset + width * random.random()
+
+ The return value is a dynamically created class.
+ It has the same name, module and doc as the original function.
+
+ A new stream is created by simply instanciating the operator::
+
+ xs = random()
+
+ The original function is called at instanciation to check that
+ signature match. Other methods are available:
+
+ - `original`: the original function as a static method
+ - `raw`: same as original but add extra checking
+
+ The `pipable` argument is deprecated, use `pipable_operator` instead.
+ """
+
+ # Handle compatibility with legacy (aiostream <= 0.4)
+ if pipable is not None or func is None:
+ warnings.warn(
+ "The `pipable` argument is deprecated. Use either `@operator` or `@pipable_operator` directly.",
+ DeprecationWarning,
+ )
+ if func is None:
+ return pipable_operator if pipable else operator # type: ignore
+ if pipable is True:
+ return pipable_operator(func) # type: ignore
+
+ # First check for classmethod instance, to avoid more confusing errors later on
+ if isinstance(func, classmethod):
+ raise ValueError(
+ "An operator cannot be created from a class method, "
+ "since the decorated function becomes an operator class"
+ )
+
+ # Gather data
+ bases = (Stream,)
+ name = func.__name__
+ module = func.__module__
+ extra_doc = func.__doc__
+ doc = extra_doc or f"Regular {name} stream operator."
+
+ # Extract signature
+ signature = inspect.signature(func)
+ parameters = list(signature.parameters.values())
+ if parameters and parameters[0].name in ("self", "cls"):
+ raise ValueError(
+ "An operator cannot be created from a method, "
+ "since the decorated function becomes an operator class"
+ )
+
+ # Look for "more_sources"
+ for i, p in enumerate(parameters):
+ if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+ more_sources_index = i
+ break
+ else:
+ more_sources_index = None
+
+ # Injected parameters
+ self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+ inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+ # Wrapped static method
+ original = func
+ original.__qualname__ = name + ".original"
+
+ # Raw static method
+ raw = func
+ raw.__qualname__ = name + ".raw"
+
+ # Init method
+ def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None:
+ if more_sources_index is not None:
+ for source in args[more_sources_index:]:
+ assert_async_iterable(source)
+ factory = functools.partial(raw, *args, **kwargs)
+ return BaseStream.__init__(self, factory)
+
+ # Customize init signature
+ new_parameters = [self_parameter] + parameters
+ init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]
+
+ # Customize init method
+ init.__qualname__ = name + ".__init__"
+ init.__name__ = "__init__"
+ init.__module__ = module
+ init.__doc__ = f"Initialize the {name} stream."
+
+ # Gather attributes
+ attrs = {
+ "__init__": init,
+ "__module__": module,
+ "__doc__": doc,
+ "raw": staticmethod(raw),
+ "original": staticmethod(original),
+ }
+
+ # Create operator class
+ return cast("OperatorType[P, T]", type(name, bases, attrs))
+
+
+def pipable_operator(
+ func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]],
+) -> PipableOperatorType[X, P, T]:
+ """Create a pipable stream operator from an asynchronous generator
+ (or any function returning an asynchronous iterable).
+
+ Decorator usage::
+
+ @pipable_operator
+ async def multiply(source, factor):
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield factor * item
+
+ The first argument is expected to be the asynchronous iteratable used
+ for piping.
+
+ The return value is a dynamically created class.
+ It has the same name, module and doc as the original function.
+
+ A new stream is created by simply instanciating the operator::
+
+ xs = random()
+ ys = multiply(xs, 2)
+
+ The original function is called at instanciation to check that
+ signature match. The source is also checked for asynchronous iteration.
+
+ The operator also have a pipe class method that can be used along
+ with the piping synthax::
+
+ xs = random()
+ ys = xs | multiply.pipe(2)
+
+ This is strictly equivalent to the previous example.
+
+ Other methods are available:
+
+ - `original`: the original function as a static method
+ - `raw`: same as original but add extra checking
+
+ The raw method is useful to create new operators from existing ones::
+
+ @pipable_operator
+ def double(source):
+ return multiply.raw(source, 2)
+ """
+
+ # First check for classmethod instance, to avoid more confusing errors later on
+ if isinstance(func, classmethod):
+ raise ValueError(
+ "An operator cannot be created from a class method, "
+ "since the decorated function becomes an operator class"
+ )
+
+ # Gather data
+ bases = (Stream,)
+ name = func.__name__
+ module = func.__module__
+ extra_doc = func.__doc__
+ doc = extra_doc or f"Regular {name} stream operator."
+
+ # Extract signature
+ signature = inspect.signature(func)
+ parameters = list(signature.parameters.values())
+ if parameters and parameters[0].name in ("self", "cls"):
+ raise ValueError(
+ "An operator cannot be created from a method, "
+ "since the decorated function becomes an operator class"
+ )
+
+ # Look for "more_sources"
+ for i, p in enumerate(parameters):
+ if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL:
+ more_sources_index = i
+ break
+ else:
+ more_sources_index = None
+
+ # Injected parameters
+ self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+ cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD)
+
+ # Wrapped static method
+ original = func
+ original.__qualname__ = name + ".original"
+
+ # Raw static method
+ def raw(
+ arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+ ) -> AsyncIterator[T]:
+ assert_async_iterable(arg)
+ if more_sources_index is not None:
+ for source in args[more_sources_index - 1 :]:
+ assert_async_iterable(source)
+ return func(arg, *args, **kwargs)
+
+ # Custonize raw method
+ raw.__signature__ = signature # type: ignore[attr-defined]
+ raw.__qualname__ = name + ".raw"
+ raw.__module__ = module
+ raw.__doc__ = doc
+
+ # Init method
+ def init(
+ self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs
+ ) -> None:
+ assert_async_iterable(arg)
+ if more_sources_index is not None:
+ for source in args[more_sources_index - 1 :]:
+ assert_async_iterable(source)
+ factory = functools.partial(raw, arg, *args, **kwargs)
+ return BaseStream.__init__(self, factory)
+
+ # Customize init signature
+ new_parameters = [self_parameter] + parameters
+ init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]
+
+ # Customize init method
+ init.__qualname__ = name + ".__init__"
+ init.__name__ = "__init__"
+ init.__module__ = module
+ init.__doc__ = f"Initialize the {name} stream."
+
+ # Pipe class method
+ def pipe(
+ cls: PipableOperatorType[X, P, T],
+ /,
+ *args: P.args,
+ **kwargs: P.kwargs,
+ ) -> Callable[[AsyncIterable[X]], Stream[T]]:
+ return lambda source: cls(source, *args, **kwargs)
+
+ # Customize pipe signature
+ if parameters and parameters[0].kind in (
+ inspect.Parameter.POSITIONAL_ONLY,
+ inspect.Parameter.POSITIONAL_OR_KEYWORD,
+ ):
+ new_parameters = [cls_parameter] + parameters[1:]
+ else:
+ new_parameters = [cls_parameter] + parameters
+ pipe.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined]
+
+ # Customize pipe method
+ pipe.__qualname__ = name + ".pipe"
+ pipe.__module__ = module
+ pipe.__doc__ = f'Pipable "{name}" stream operator.'
+ if extra_doc:
+ pipe.__doc__ += "\n\n " + extra_doc
+
+ # Gather attributes
+ attrs = {
+ "__init__": init,
+ "__module__": module,
+ "__doc__": doc,
+ "raw": staticmethod(raw),
+ "original": staticmethod(original),
+ "pipe": classmethod(pipe), # type: ignore[arg-type]
+ }
+
+ # Create operator class
+ return cast(
+ "PipableOperatorType[X, P, T]",
+ type(name, bases, attrs),
+ )
diff --git a/.venv/lib/python3.12/site-packages/aiostream/manager.py b/.venv/lib/python3.12/site-packages/aiostream/manager.py
new file mode 100644
index 00000000..bab224a5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/manager.py
@@ -0,0 +1,159 @@
+"""Provide a context to easily manage several streamers running
+concurrently.
+"""
+from __future__ import annotations
+
+import asyncio
+from .aiter_utils import AsyncExitStack
+
+from .aiter_utils import anext
+from .core import streamcontext
+from typing import (
+ TYPE_CHECKING,
+ Awaitable,
+ List,
+ Set,
+ Tuple,
+ Generic,
+ TypeVar,
+ Any,
+ Type,
+ AsyncIterable,
+)
+from types import TracebackType
+
+if TYPE_CHECKING:
+ from asyncio import Task
+ from aiostream.core import Streamer
+
+T = TypeVar("T")
+
+
+class TaskGroup:
+ def __init__(self) -> None:
+ self._pending: set[Task[Any]] = set()
+
+ async def __aenter__(self) -> TaskGroup:
+ return self
+
+ async def __aexit__(
+ self,
+ typ: Type[BaseException] | None,
+ value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> None:
+ while self._pending:
+ task = self._pending.pop()
+ await self.cancel_task(task)
+
+ def create_task(self, coro: Awaitable[T]) -> Task[T]:
+ task = asyncio.ensure_future(coro)
+ self._pending.add(task)
+ return task
+
+ async def wait_any(self, tasks: List[Task[T]]) -> Set[Task[T]]:
+ done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED")
+ self._pending -= done
+ return done
+
+ async def wait_all(self, tasks: List[Task[T]]) -> Set[Task[T]]:
+ if not tasks:
+ return set()
+ done, _ = await asyncio.wait(tasks)
+ self._pending -= done
+ return done
+
+ async def cancel_task(self, task: Task[Any]) -> None:
+ try:
+ # The task is already cancelled
+ if task.cancelled():
+ pass
+ # The task is already finished
+ elif task.done():
+ # Discard the pending exception (if any).
+ # This makes sense since we don't know in which context the exception
+ # was meant to be processed. For instance, a `StopAsyncIteration`
+ # might be raised to notify that the end of a streamer has been reached.
+ task.exception()
+ # The task needs to be cancelled and awaited
+ else:
+ task.cancel()
+ try:
+ await task
+ except asyncio.CancelledError:
+ pass
+ # Silence any exception raised while cancelling the task.
+ # This might happen if the `CancelledError` is silenced, and the
+ # corresponding async generator returns, causing the `anext` call
+ # to raise a `StopAsyncIteration`.
+ except Exception:
+ pass
+ finally:
+ self._pending.discard(task)
+
+
+class StreamerManager(Generic[T]):
+ def __init__(self) -> None:
+ self.tasks: dict[Streamer[T], Task[T]] = {}
+ self.streamers: list[Streamer[T]] = []
+ self.group: TaskGroup = TaskGroup()
+ self.stack = AsyncExitStack()
+
+ async def __aenter__(self) -> StreamerManager[T]:
+ await self.stack.__aenter__()
+ await self.stack.enter_async_context(self.group)
+ return self
+
+ async def __aexit__(
+ self,
+ typ: Type[BaseException] | None,
+ value: BaseException | None,
+ traceback: TracebackType | None,
+ ) -> bool:
+ for streamer in self.streamers:
+ task = self.tasks.pop(streamer, None)
+ if task is not None:
+ self.stack.push_async_callback(self.group.cancel_task, task)
+ self.stack.push_async_exit(streamer)
+ self.tasks.clear()
+ self.streamers.clear()
+ return await self.stack.__aexit__(typ, value, traceback)
+
+ async def enter_and_create_task(self, aiter: AsyncIterable[T]) -> Streamer[T]:
+ streamer = streamcontext(aiter)
+ await streamer.__aenter__()
+ self.streamers.append(streamer)
+ self.create_task(streamer)
+ return streamer
+
+ def create_task(self, streamer: Streamer[T]) -> None:
+ assert streamer in self.streamers
+ assert streamer not in self.tasks
+ self.tasks[streamer] = self.group.create_task(anext(streamer))
+
+ async def wait_single_event(
+ self, filters: list[Streamer[T]]
+ ) -> Tuple[Streamer[T], Task[T]]:
+ tasks = [self.tasks[streamer] for streamer in filters]
+ done = await self.group.wait_any(tasks)
+ for streamer in filters:
+ if self.tasks.get(streamer) in done:
+ return streamer, self.tasks.pop(streamer)
+ assert False
+
+ async def clean_streamer(self, streamer: Streamer[T]) -> None:
+ task = self.tasks.pop(streamer, None)
+ if task is not None:
+ await self.group.cancel_task(task)
+ await streamer.aclose()
+ self.streamers.remove(streamer)
+
+ async def clean_streamers(self, streamers: list[Streamer[T]]) -> None:
+ tasks = [
+ self.group.create_task(self.clean_streamer(streamer))
+ for streamer in streamers
+ ]
+ done = await self.group.wait_all(tasks)
+ # Raise exception if any
+ for task in done:
+ task.result()
diff --git a/.venv/lib/python3.12/site-packages/aiostream/pipe.py b/.venv/lib/python3.12/site-packages/aiostream/pipe.py
new file mode 100644
index 00000000..c27e101a
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/pipe.py
@@ -0,0 +1,39 @@
+"""Gather the pipe operators."""
+from __future__ import annotations
+
+from . import stream
+
+accumulate = stream.accumulate.pipe
+action = stream.action.pipe
+amap = stream.amap.pipe
+chain = stream.chain.pipe
+chunks = stream.chunks.pipe
+concat = stream.concat.pipe
+concatmap = stream.concatmap.pipe
+cycle = stream.cycle.pipe
+delay = stream.delay.pipe
+dropwhile = stream.dropwhile.pipe
+enumerate = stream.enumerate.pipe
+filter = stream.filter.pipe
+flatmap = stream.flatmap.pipe
+flatten = stream.flatten.pipe
+getitem = stream.getitem.pipe
+list = stream.list.pipe
+map = stream.map.pipe
+merge = stream.merge.pipe
+print = stream.print.pipe
+reduce = stream.reduce.pipe
+skip = stream.skip.pipe
+skiplast = stream.skiplast.pipe
+smap = stream.smap.pipe
+spaceout = stream.spaceout.pipe
+starmap = stream.starmap.pipe
+switch = stream.switch.pipe
+switchmap = stream.switchmap.pipe
+take = stream.take.pipe
+takelast = stream.takelast.pipe
+takewhile = stream.takewhile.pipe
+timeout = stream.timeout.pipe
+until = stream.until.pipe
+zip = stream.zip.pipe
+ziplatest = stream.ziplatest.pipe
diff --git a/.venv/lib/python3.12/site-packages/aiostream/py.typed b/.venv/lib/python3.12/site-packages/aiostream/py.typed
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/py.typed
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
new file mode 100644
index 00000000..cd20d6c9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py
@@ -0,0 +1,10 @@
+"""Gather all the stream operators."""
+
+from .create import *
+from .transform import *
+from .select import *
+from .combine import *
+from .aggregate import *
+from .time import *
+from .misc import *
+from .advanced import *
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
new file mode 100644
index 00000000..106f0f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py
@@ -0,0 +1,222 @@
+"""Advanced operators (to deal with streams of higher order) ."""
+from __future__ import annotations
+
+from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast
+from typing_extensions import ParamSpec
+
+from . import combine
+
+from ..core import Streamer, pipable_operator
+from ..manager import StreamerManager
+
+
+__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"]
+
+
+T = TypeVar("T")
+U = TypeVar("U")
+P = ParamSpec("P")
+
+
+# Helper to manage stream of higher order
+
+
+@pipable_operator
+async def base_combine(
+ source: AsyncIterable[AsyncIterable[T]],
+ switch: bool = False,
+ ordered: bool = False,
+ task_limit: int | None = None,
+) -> AsyncIterator[T]:
+ """Base operator for managing an asynchronous sequence of sequences.
+
+ The sequences are awaited concurrently, although it's possible to limit
+ the amount of running sequences using the `task_limit` argument.
+
+ The ``switch`` argument enables the switch mecanism, which cause the
+ previous subsequence to be discarded when a new one is created.
+
+ The items can either be generated in order or as soon as they're received,
+ depending on the ``ordered`` argument.
+ """
+
+ # Task limit
+ if task_limit is not None and not task_limit > 0:
+ raise ValueError("The task limit must be None or greater than 0")
+
+ # Safe context
+ async with StreamerManager[Union[AsyncIterable[T], T]]() as manager:
+ main_streamer: Streamer[
+ AsyncIterable[T] | T
+ ] | None = await manager.enter_and_create_task(source)
+
+ # Loop over events
+ while manager.tasks:
+ # Extract streamer groups
+ substreamers = manager.streamers[1:]
+ mainstreamers = [main_streamer] if main_streamer in manager.tasks else []
+
+ # Switch - use the main streamer then the substreamer
+ if switch:
+ filters = mainstreamers + substreamers
+ # Concat - use the first substreamer then the main streamer
+ elif ordered:
+ filters = substreamers[:1] + mainstreamers
+ # Flat - use the substreamers then the main streamer
+ else:
+ filters = substreamers + mainstreamers
+
+ # Wait for next event
+ streamer, task = await manager.wait_single_event(filters)
+
+ # Get result
+ try:
+ result = task.result()
+
+ # End of stream
+ except StopAsyncIteration:
+ # Main streamer is finished
+ if streamer is main_streamer:
+ main_streamer = None
+
+ # A substreamer is finished
+ else:
+ await manager.clean_streamer(streamer)
+
+ # Re-schedule the main streamer if necessary
+ if main_streamer is not None and main_streamer not in manager.tasks:
+ manager.create_task(main_streamer)
+
+ # Process result
+ else:
+ # Switch mecanism
+ if switch and streamer is main_streamer:
+ await manager.clean_streamers(substreamers)
+
+ # Setup a new source
+ if streamer is main_streamer:
+ assert isinstance(result, AsyncIterable)
+ await manager.enter_and_create_task(result)
+
+ # Re-schedule the main streamer if task limit allows it
+ if task_limit is None or task_limit > len(manager.tasks):
+ manager.create_task(streamer)
+
+ # Yield the result
+ else:
+ item = cast("T", result)
+ yield item
+
+ # Re-schedule the streamer
+ manager.create_task(streamer)
+
+
+# Advanced operators (for streams of higher order)
+
+
+@pipable_operator
+def concat(
+ source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements
+ of the sequences in order.
+
+ The sequences are awaited concurrently, although it's possible to limit
+ the amount of running sequences using the `task_limit` argument.
+
+ Errors raised in the source or an element sequence are propagated.
+ """
+ return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True)
+
+
+@pipable_operator
+def flatten(
+ source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None
+) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements
+ of the sequences as soon as they're received.
+
+ The sequences are awaited concurrently, although it's possible to limit
+ the amount of running sequences using the `task_limit` argument.
+
+ Errors raised in the source or an element sequence are propagated.
+ """
+ return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False)
+
+
+@pipable_operator
+def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]:
+ """Given an asynchronous sequence of sequences, generate the elements of
+ the most recent sequence.
+
+ Element sequences are generated eagerly, and closed once they are
+ superseded by a more recent sequence. Once the main sequence is finished,
+ the last subsequence will be exhausted completely.
+
+ Errors raised in the source or an element sequence (that was not already
+ closed) are propagated.
+ """
+ return base_combine.raw(source, switch=True)
+
+
+# Advanced *-map operators
+
+
+@pipable_operator
+def concatmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences, and generate the elements of the created
+ sequences in order.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. The returned sequences are awaited concurrently,
+ although it's possible to limit the amount of running sequences using
+ the `task_limit` argument.
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return concat.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def flatmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences, and generate the elements of the created
+ sequences as soon as they arrive.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. The returned sequences are awaited concurrently,
+ although it's possible to limit the amount of running sequences using
+ the `task_limit` argument.
+
+ Errors raised in a source or output sequence are propagated.
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return flatten.raw(mapped, task_limit=task_limit)
+
+
+@pipable_operator
+def switchmap(
+ source: AsyncIterable[T],
+ func: combine.SmapCallable[T, AsyncIterable[U]],
+ *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+ """Apply a given function that creates a sequence from the elements of one
+ or several asynchronous sequences and generate the elements of the most
+ recently created sequence.
+
+ The function is applied as described in `map`, and must return an
+ asynchronous sequence. Errors raised in a source or output sequence (that
+ was not already closed) are propagated.
+ """
+ mapped = combine.smap.raw(source, func, *more_sources)
+ return switch.raw(mapped)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
new file mode 100644
index 00000000..8ed7c0e1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
@@ -0,0 +1,90 @@
+"""Aggregation operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import operator as op
+from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
+
+
+from . import select
+from ..aiter_utils import anext
+from ..core import pipable_operator, streamcontext
+
+__all__ = ["accumulate", "reduce", "list"]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def accumulate(
+ source: AsyncIterable[T],
+ func: Callable[[T, T], Awaitable[T] | T] = op.add,
+ initializer: T | None = None,
+) -> AsyncIterator[T]:
+ """Generate a series of accumulated sums (or other binary function)
+ from an asynchronous sequence.
+
+ If ``initializer`` is present, it is placed before the items
+ of the sequence in the calculation, and serves as a default
+ when the sequence is empty.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ # Initialize
+ if initializer is None:
+ try:
+ value = await anext(streamer)
+ except StopAsyncIteration:
+ return
+ else:
+ value = initializer
+ # First value
+ yield value
+ # Iterate streamer
+ async for item in streamer:
+ returned = func(value, item)
+ if iscorofunc:
+ awaitable_value = cast("Awaitable[T]", returned)
+ value = await awaitable_value
+ else:
+ value = cast("T", returned)
+ yield value
+
+
+@pipable_operator
+def reduce(
+ source: AsyncIterable[T],
+ func: Callable[[T, T], Awaitable[T] | T],
+ initializer: T | None = None,
+) -> AsyncIterator[T]:
+ """Apply a function of two arguments cumulatively to the items
+ of an asynchronous sequence, reducing the sequence to a single value.
+
+ If ``initializer`` is present, it is placed before the items
+ of the sequence in the calculation, and serves as a default when the
+ sequence is empty.
+ """
+ acc = accumulate.raw(source, func, initializer)
+ return select.item.raw(acc, -1)
+
+
+@pipable_operator
+async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
+ """Build a list from an asynchronous sequence.
+
+ All the intermediate steps are generated, starting from the empty list.
+
+ This operator can be used to easily convert a stream into a list::
+
+ lst = await stream.list(x)
+
+ ..note:: The same list object is produced at each step in order to avoid
+ memory copies.
+ """
+ result: builtins.list[T] = []
+ yield result
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result.append(item)
+ yield result
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
new file mode 100644
index 00000000..a782a730
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py
@@ -0,0 +1,282 @@
+"""Combination operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import (
+ Awaitable,
+ Protocol,
+ TypeVar,
+ AsyncIterable,
+ AsyncIterator,
+ Callable,
+ cast,
+)
+from typing_extensions import ParamSpec
+
+from ..aiter_utils import AsyncExitStack, anext
+from ..core import streamcontext, pipable_operator
+
+from . import create
+from . import select
+from . import advanced
+from . import aggregate
+
+__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"]
+
+T = TypeVar("T")
+U = TypeVar("U")
+K = TypeVar("K")
+P = ParamSpec("P")
+
+
+@pipable_operator
+async def chain(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+ """Chain asynchronous sequences together, in the order they are given.
+
+ Note: the sequences are not iterated until it is required,
+ so if the operation is interrupted, the remaining sequences
+ will be left untouched.
+ """
+ sources = source, *more_sources
+ for source in sources:
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
+
+
+@pipable_operator
+async def zip(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[tuple[T, ...]]:
+ """Combine and forward the elements of several asynchronous sequences.
+
+ Each generated value is a tuple of elements, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted.
+
+ Note: the different sequences are awaited in parrallel, so that their
+ waiting times don't add up.
+ """
+ sources = source, *more_sources
+
+ # One sources
+ if len(sources) == 1:
+ (source,) = sources
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield (item,)
+ return
+
+ # N sources
+ async with AsyncExitStack() as stack:
+ # Handle resources
+ streamers = [
+ await stack.enter_async_context(streamcontext(source)) for source in sources
+ ]
+ # Loop over items
+ while True:
+ try:
+ coros = builtins.map(anext, streamers)
+ items = await asyncio.gather(*coros)
+ except StopAsyncIteration:
+ break
+ else:
+ yield tuple(items)
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class SmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+class AmapCallable(Protocol[X, Y]):
+ async def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+class MapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y:
+ ...
+
+
+@pipable_operator
+async def smap(
+ source: AsyncIterable[T],
+ func: SmapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+) -> AsyncIterator[U]:
+ """Apply a given function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted. The function is treated synchronously.
+
+ Note: if more than one sequence is provided, they're awaited concurrently
+ so that their waiting times don't add up.
+ """
+ stream = zip(source, *more_sources)
+ async with streamcontext(stream) as streamer:
+ async for item in streamer:
+ yield func(*item)
+
+
+@pipable_operator
+def amap(
+ source: AsyncIterable[T],
+ corofn: AmapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given coroutine function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument.
+
+ The coroutines run concurrently but their amount can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially.
+
+ If more than one sequence is provided, they're also awaited concurrently,
+ so that their waiting times don't add up.
+ """
+
+ async def func(arg: T, *args: T) -> AsyncIterable[U]:
+ yield await corofn(arg, *args)
+
+ if ordered:
+ return advanced.concatmap.raw(
+ source, func, *more_sources, task_limit=task_limit
+ )
+ return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit)
+
+
+@pipable_operator
+def map(
+ source: AsyncIterable[T],
+ func: MapCallable[T, U],
+ *more_sources: AsyncIterable[T],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function to the elements of one or several
+ asynchronous sequences.
+
+ Each element is used as a positional argument, using the same order as
+ their respective sources. The generation continues until the shortest
+ sequence is exhausted. The function can either be synchronous or
+ asynchronous (coroutine function).
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if the
+ provided function is synchronous.
+
+ The coroutines run concurrently but their amount can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
+
+ If more than one sequence is provided, they're also awaited concurrently,
+ so that their waiting times don't add up.
+
+ It might happen that the provided function returns a coroutine but is not
+ a coroutine function per se. In this case, one can wrap the function with
+ ``aiostream.async_`` in order to force ``map`` to await the resulting
+ coroutine. The following example illustrates the use ``async_`` with a
+ lambda function::
+
+ from aiostream import stream, async_
+ ...
+ ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000)))
+ """
+ if asyncio.iscoroutinefunction(func):
+ return amap.raw(
+ source, func, *more_sources, ordered=ordered, task_limit=task_limit
+ )
+ sync_func = cast("SmapCallable[T, U]", func)
+ return smap.raw(source, sync_func, *more_sources)
+
+
+@pipable_operator
+def merge(
+ source: AsyncIterable[T], *more_sources: AsyncIterable[T]
+) -> AsyncIterator[T]:
+ """Merge several asynchronous sequences together.
+
+ All the sequences are iterated simultaneously and their elements
+ are forwarded as soon as they're available. The generation continues
+ until all the sequences are exhausted.
+ """
+ sources = [source, *more_sources]
+ source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources)
+ return advanced.flatten.raw(source_stream)
+
+
+@pipable_operator
+def ziplatest(
+ source: AsyncIterable[T],
+ *more_sources: AsyncIterable[T],
+ partial: bool = True,
+ default: T | None = None,
+) -> AsyncIterator[tuple[T | None, ...]]:
+ """Combine several asynchronous sequences together, producing a tuple with
+ the lastest element of each sequence whenever a new element is received.
+
+ The value to use when a sequence has not procuded any element yet is given
+ by the ``default`` argument (defaulting to ``None``).
+
+ The producing of partial results can be disabled by setting the optional
+ argument ``partial`` to ``False``.
+
+ All the sequences are iterated simultaneously and their elements
+ are forwarded as soon as they're available. The generation continues
+ until all the sequences are exhausted.
+ """
+ sources = source, *more_sources
+ n = len(sources)
+
+ # Custom getter
+ def getter(dct: dict[int, T]) -> Callable[[int], T | None]:
+ return lambda key: dct.get(key, default)
+
+ # Add source index to the items
+ def make_func(i: int) -> SmapCallable[T, dict[int, T]]:
+ def func(x: T, *_: object) -> dict[int, T]:
+ return {i: x}
+
+ return func
+
+ new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)]
+
+ # Merge the sources
+ merged = merge.raw(*new_sources)
+
+ # Accumulate the current state in a dict
+ accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e})
+
+ # Filter partial result
+ filtered = (
+ accumulated
+ if partial
+ else select.filter.raw(accumulated, lambda x: len(x) == n)
+ )
+
+ # Convert the state dict to a tuple
+ def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]:
+ return tuple(builtins.map(getter(x), range(n)))
+
+ return smap.raw(filtered, dict_to_tuple)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
new file mode 100644
index 00000000..d7676b71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
@@ -0,0 +1,191 @@
+"""Non-pipable creation operators."""
+from __future__ import annotations
+
+import sys
+import asyncio
+import inspect
+import builtins
+import itertools
+
+from typing import (
+ AsyncIterable,
+ Awaitable,
+ Iterable,
+ Protocol,
+ TypeVar,
+ AsyncIterator,
+ cast,
+)
+from typing_extensions import ParamSpec
+
+from ..stream import time
+from ..core import operator, streamcontext
+
+__all__ = [
+ "iterate",
+ "preserve",
+ "just",
+ "call",
+ "throw",
+ "empty",
+ "never",
+ "repeat",
+ "range",
+ "count",
+]
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+ P = TypeVar("P")
+
+# Convert regular iterables
+
+
+@operator
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a regular iterable."""
+ for item in it:
+ await asyncio.sleep(0)
+ yield item
+
+
+@operator
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable.
+
+ Note: the corresponding iterator will be explicitely closed
+ when leaving the context manager."""
+ return streamcontext(ait)
+
+
+@operator
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a sychronous or asynchronous iterable."""
+ if isinstance(it, AsyncIterable):
+ return from_async_iterable.raw(it)
+ if isinstance(it, Iterable):
+ return from_iterable.raw(it)
+ raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
+
+
+@operator
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable without
+ explicitly closing the corresponding iterator."""
+ async for item in ait:
+ yield item
+
+
+# Simple operators
+
+
+@operator
+async def just(value: T) -> AsyncIterator[T]:
+ """Await if possible, and generate a single value."""
+ if inspect.isawaitable(value):
+ yield await value
+ else:
+ yield value
+
+
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+ ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+ ...
+
+
+@operator
+async def call(
+ func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
+ """Call the given function and generate a single value.
+
+ Await if the provided function is asynchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+ async_func = cast("AsyncCallable[P, T]", func)
+ yield await async_func(*args, **kwargs)
+ else:
+ sync_func = cast("SyncCallable[P, T]", func)
+ yield sync_func(*args, **kwargs)
+
+
+@operator
+async def throw(exc: Exception) -> AsyncIterator[None]:
+ """Throw an exception without generating any value."""
+ if False:
+ yield
+ raise exc
+
+
+@operator
+async def empty() -> AsyncIterator[None]:
+ """Terminate without generating any value."""
+ if False:
+ yield
+
+
+@operator
+async def never() -> AsyncIterator[None]:
+ """Hang forever without generating any value."""
+ if False:
+ yield
+ future: asyncio.Future[None] = asyncio.Future()
+ try:
+ await future
+ finally:
+ future.cancel()
+
+
+@operator
+def repeat(
+ value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
+ """Generate the same value a given number of times.
+
+ If ``times`` is ``None``, the value is repeated indefinitely.
+ An optional interval can be given to space the values out.
+ """
+ args = () if times is None else (times,)
+ it = itertools.repeat(value, *args)
+ agen = from_iterable.raw(it)
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+# Counting operators
+
+
+@operator
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
+ """Generate a given range of numbers.
+
+ It supports the same arguments as the builtin function.
+ An optional interval can be given to space the values out.
+ """
+ agen = from_iterable.raw(builtins.range(*args))
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+@operator
+def count(
+ start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
+ """Generate consecutive numbers indefinitely.
+
+ Optional starting point and increment can be defined,
+ respectively defaulting to ``0`` and ``1``.
+
+ An optional interval can be given to space the values out.
+ """
+ agen = from_iterable.raw(itertools.count(start, step))
+ return time.spaceout.raw(agen, interval) if interval else agen
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
new file mode 100644
index 00000000..1be834e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
@@ -0,0 +1,83 @@
+"""Extra operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
+
+__all__ = ["action", "print"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+ source: AsyncIterable[T],
+ func: Callable[[T], Awaitable[Any] | Any],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[T]:
+ """Perform an action for each element of an asynchronous sequence
+ without modifying it.
+
+ The given function can be synchronous or asynchronous.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if the
+ provided function is synchronous.
+
+ The coroutines run concurrently but their amount can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+
+ async def ainnerfunc(arg: T, *_: object) -> T:
+ awaitable = func(arg)
+ assert isinstance(awaitable, Awaitable)
+ await awaitable
+ return arg
+
+ return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
+ else:
+
+ def innerfunc(arg: T, *_: object) -> T:
+ func(arg)
+ return arg
+
+ return smap.raw(source, innerfunc)
+
+
+@pipable_operator
+def print(
+ source: AsyncIterable[T],
+ template: str = "{}",
+ sep: str = " ",
+ end: str = "\n",
+ file: Any | None = None,
+ flush: bool = False,
+) -> AsyncIterator[T]:
+ """Print each element of an asynchronous sequence without modifying it.
+
+ An optional template can be provided to be formatted with the elements.
+ All the keyword arguments are forwarded to the builtin function print.
+ """
+
+ def func(value: T) -> None:
+ string = template.format(value)
+ builtins.print(
+ string,
+ sep=sep,
+ end=end,
+ file=file,
+ flush=flush,
+ )
+
+ return action.raw(source, func)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
new file mode 100644
index 00000000..9390f464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
@@ -0,0 +1,284 @@
+"""Selection operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import collections
+
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
+from . import transform
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
+
+__all__ = [
+ "take",
+ "takelast",
+ "skip",
+ "skiplast",
+ "getitem",
+ "filter",
+ "until",
+ "dropwhile",
+ "takewhile",
+]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward the first ``n`` elements from an asynchronous sequence.
+
+ If ``n`` is negative, it simply terminates before iterating the source.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ if n <= 0:
+ return
+ async for i, item in streamer:
+ yield item
+ if i >= n - 1:
+ return
+
+
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward the last ``n`` elements from an asynchronous sequence.
+
+ If ``n`` is negative, it simply terminates after iterating the source.
+
+ Note: it is required to reach the end of the source before the first
+ element is generated.
+ """
+ queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ queue.append(item)
+ for item in queue:
+ yield item
+
+
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence, skipping the first ``n`` elements.
+
+ If ``n`` is negative, no elements are skipped.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ async for i, item in streamer:
+ if i >= n:
+ yield item
+
+
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence, skipping the last ``n`` elements.
+
+ If ``n`` is negative, no elements are skipped.
+
+ Note: it is required to reach the ``n+1`` th element of the source
+ before the first element is generated.
+ """
+ queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ if n <= 0:
+ yield item
+ continue
+ if len(queue) == n:
+ yield queue[0]
+ queue.append(item)
+
+
+@pipable_operator
+async def filterindex(
+ source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
+ """Filter an asynchronous sequence using the index of the elements.
+
+ The given function is synchronous, takes the index as an argument,
+ and returns ``True`` if the corresponding should be forwarded,
+ ``False`` otherwise.
+ """
+ enumerated = transform.enumerate.raw(source)
+ async with streamcontext(enumerated) as streamer:
+ async for i, item in streamer:
+ if func(i):
+ yield item
+
+
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
+ """Slice an asynchronous sequence.
+
+ The arguments are the same as the builtin type slice.
+
+ There are two limitations compare to regular slices:
+ - Positive stop index with negative start index is not supported
+ - Negative step is not supported
+ """
+ s = builtins.slice(*args)
+ start, stop, step = s.start or 0, s.stop, s.step or 1
+ aiterator = aiter(source)
+ # Filter the first items
+ if start < 0:
+ aiterator = takelast.raw(aiterator, abs(start))
+ elif start > 0:
+ aiterator = skip.raw(aiterator, start)
+ # Filter the last items
+ if stop is not None:
+ if stop >= 0 and start < 0:
+ raise ValueError("Positive stop with negative start is not supported")
+ elif stop >= 0:
+ aiterator = take.raw(aiterator, stop - start)
+ else:
+ aiterator = skiplast.raw(aiterator, abs(stop))
+ # Filter step items
+ if step is not None:
+ if step > 1:
+ aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
+ elif step < 0:
+ raise ValueError("Negative step not supported")
+ # Return
+ return aiterator
+
+
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
+ """Forward the ``n``th element of an asynchronous sequence.
+
+ The index can be negative and works like regular indexing.
+ If the index is out of range, and ``IndexError`` is raised.
+ """
+ # Prepare
+ if index >= 0:
+ source = skip.raw(source, index)
+ else:
+ source = takelast(source, abs(index))
+ async with streamcontext(source) as streamer:
+ # Get first item
+ try:
+ result = await anext(streamer)
+ except StopAsyncIteration:
+ raise IndexError("Index out of range")
+ # Check length
+ if index < 0:
+ count = 1
+ async for _ in streamer:
+ count += 1
+ if count != abs(index):
+ raise IndexError("Index out of range")
+ # Yield result
+ yield result
+
+
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
+ """Forward one or several items from an asynchronous sequence.
+
+ The argument can either be a slice or an integer.
+ See the slice and item operators for more information.
+ """
+ if isinstance(index, builtins.slice):
+ return slice.raw(source, index.start, index.stop, index.step)
+ if isinstance(index, int):
+ return item.raw(source, index)
+ raise TypeError("Not a valid index (int or slice)")
+
+
+@pipable_operator
+async def filter(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Filter an asynchronous sequence using an arbitrary function.
+
+ The function takes the item as an argument and returns ``True``
+ if it should be forwarded, ``False`` otherwise.
+ The function can either be synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if result:
+ yield item
+
+
+@pipable_operator
+async def until(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence until a condition is met.
+
+ Contrary to the ``takewhile`` operator, the last tested element is included
+ in the sequence.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ yield item
+ if result:
+ return
+
+
+@pipable_operator
+async def takewhile(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Forward an asynchronous sequence while a condition is met.
+
+ Contrary to the ``until`` operator, the last tested element is not included
+ in the sequence.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if not result:
+ return
+ yield item
+
+
+@pipable_operator
+async def dropwhile(
+ source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+ """Discard the elements from an asynchronous sequence
+ while a condition is met.
+
+ The given function takes the item as an argument and returns a boolean
+ corresponding to the condition to meet. The function can either be
+ synchronous or asynchronous.
+ """
+ iscorofunc = asyncio.iscoroutinefunction(func)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ result = func(item)
+ if iscorofunc:
+ assert isinstance(result, Awaitable)
+ result = await result
+ if not result:
+ yield item
+ break
+ async for item in streamer:
+ yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+ """Make sure the elements of an asynchronous sequence are separated
+ in time by the given interval.
+ """
+ timeout = 0.0
+ loop = asyncio.get_event_loop()
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ delta = timeout - loop.time()
+ delay = delta if delta > 0 else 0.0
+ await asyncio.sleep(delay)
+ yield item
+ timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+ """Raise a time-out if an element of the asynchronous sequence
+ takes too long to arrive.
+
+ Note: the timeout is not global but specific to each step of
+ the iteration.
+ """
+ async with streamcontext(source) as streamer:
+ while True:
+ try:
+ item = await asyncio.wait_for(anext(streamer), timeout)
+ except StopAsyncIteration:
+ break
+ else:
+ yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+ """Delay the iteration of an asynchronous sequence."""
+ await asyncio.sleep(delay)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
new file mode 100644
index 00000000..f11bffa6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
@@ -0,0 +1,128 @@
+"""Transformation operators."""
+
+from __future__ import annotations
+
+import asyncio
+import itertools
+from typing import (
+ Protocol,
+ TypeVar,
+ AsyncIterable,
+ AsyncIterator,
+ Awaitable,
+ cast,
+)
+
+from ..core import streamcontext, pipable_operator
+
+from . import select
+from . import create
+from . import aggregate
+from .combine import map, amap, smap
+
+__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
+
+# map, amap and smap are also transform operators
+map, amap, smap
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+@pipable_operator
+async def enumerate(
+ source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
+ """Generate ``(index, value)`` tuples from an asynchronous sequence.
+
+ This index is computed using a starting point and an increment,
+ respectively defaulting to ``0`` and ``1``.
+ """
+ count = itertools.count(start, step)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield next(count), item
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+ ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+ def __call__(self, arg: X, /, *args: X) -> Y:
+ ...
+
+
+@pipable_operator
+def starmap(
+ source: AsyncIterable[tuple[T, ...]],
+ func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[U]:
+ """Apply a given function to the unpacked elements of
+ an asynchronous sequence.
+
+ Each element is unpacked before applying the function.
+ The given function can either be synchronous or asynchronous.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if
+ the provided function is synchronous.
+
+ The coroutines run concurrently but their amount can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+ async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+ async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+ awaitable = async_func(*args)
+ return await awaitable
+
+ return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
+
+ else:
+ sync_func = cast("SyncStarmapCallable[T, U]", func)
+
+ def starfunc(args: tuple[T, ...], *_: object) -> U:
+ return sync_func(*args)
+
+ return smap.raw(source, starfunc)
+
+
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Iterate indefinitely over an asynchronous sequence.
+
+ Note: it does not perform any buffering, but re-iterate over
+ the same given sequence instead. If the sequence is not
+ re-iterable, the generator might end up looping indefinitely
+ without yielding any item.
+ """
+ while True:
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
+ # Prevent blocking while loop if the stream is empty
+ await asyncio.sleep(0)
+
+
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
+ """Generate chunks of size ``n`` from an asynchronous sequence.
+
+ The chunks are lists, and the last chunk might contain less than ``n``
+ elements.
+ """
+ async with streamcontext(source) as streamer:
+ async for first in streamer:
+ xs = select.take(create.preserve(streamer), n - 1)
+ yield [first] + await aggregate.list(xs)
diff --git a/.venv/lib/python3.12/site-packages/aiostream/test_utils.py b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
new file mode 100644
index 00000000..56761d2e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
@@ -0,0 +1,173 @@
+"""Utilities for testing stream operators."""
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import Mock
+from contextlib import contextmanager
+
+import pytest
+
+from .core import StreamEmpty, streamcontext, pipable_operator
+from typing import TYPE_CHECKING, Any, Callable, List
+
+if TYPE_CHECKING:
+ from _pytest.fixtures import SubRequest
+ from aiostream.core import Stream
+
+__all__ = ["add_resource", "assert_run", "event_loop"]
+
+
+@pipable_operator
+async def add_resource(source, cleanup_time):
+ """Simulate an open resource in a stream operator."""
+ try:
+ loop = asyncio.get_event_loop()
+ loop.open_resources += 1
+ loop.resources += 1
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item
+ finally:
+ try:
+ await asyncio.sleep(cleanup_time)
+ finally:
+ loop.open_resources -= 1
+
+
+def compare_exceptions(
+ exc1: Exception,
+ exc2: Exception,
+) -> bool:
+ """Compare two exceptions together."""
+ return exc1 == exc2 or exc1.__class__ == exc2.__class__ and exc1.args == exc2.args
+
+
+async def assert_aiter(
+ source: Stream,
+ values: List[Any],
+ exception: Exception | None = None,
+) -> None:
+ """Check the results of a stream using a streamcontext."""
+ results = []
+ exception_type = (type(exception),) if exception else ()
+ try:
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ results.append(item)
+ except exception_type as exc:
+ assert exception is not None
+ assert compare_exceptions(exc, exception)
+ else:
+ assert exception is None
+ assert results == values
+
+
+async def assert_await(
+ source: Stream,
+ values: List[Any],
+ exception: Exception | None = None,
+) -> None:
+ """Check the results of a stream using by awaiting it."""
+ exception_type = (type(exception),) if exception else ()
+ try:
+ result = await source
+ except StreamEmpty:
+ assert values == []
+ assert exception is None
+ except exception_type as exc:
+ assert exception is not None
+ assert compare_exceptions(exc, exception)
+ else:
+ assert result == values[-1]
+ assert exception is None
+
+
+@pytest.fixture(params=[assert_aiter, assert_await], ids=["aiter", "await"])
+def assert_run(request: SubRequest) -> Callable:
+ """Parametrized fixture returning a stream runner."""
+ return request.param
+
+
+@pytest.fixture
+def event_loop():
+ """Fixture providing a test event loop.
+
+ The event loop simulate and records the sleep operation,
+ available as event_loop.steps
+
+ It also tracks simulated resources and make sure they are
+ all released before the loop is closed.
+ """
+
+ class TimeTrackingTestLoop(asyncio.BaseEventLoop):
+ stuck_threshold = 100
+
+ def __init__(self):
+ super().__init__()
+ self._time = 0
+ self._timers = []
+ self._selector = Mock()
+ self.clear()
+
+ # Loop internals
+
+ def _run_once(self):
+ super()._run_once()
+ # Update internals
+ self.busy_count += 1
+ self._timers = sorted(when for when in self._timers if when > loop.time())
+ # Time advance
+ if self.time_to_go:
+ when = self._timers.pop(0)
+ step = when - loop.time()
+ self.steps.append(step)
+ self.advance_time(step)
+ self.busy_count = 0
+
+ def _process_events(self, event_list):
+ return
+
+ def _write_to_self(self):
+ return
+
+ # Time management
+
+ def time(self):
+ return self._time
+
+ def advance_time(self, advance):
+ if advance:
+ self._time += advance
+
+ def call_at(self, when, callback, *args, **kwargs):
+ self._timers.append(when)
+ return super().call_at(when, callback, *args, **kwargs)
+
+ @property
+ def stuck(self):
+ return self.busy_count > self.stuck_threshold
+
+ @property
+ def time_to_go(self):
+ return self._timers and (self.stuck or not self._ready)
+
+ # Resource management
+
+ def clear(self):
+ self.steps = []
+ self.open_resources = 0
+ self.resources = 0
+ self.busy_count = 0
+
+ @contextmanager
+ def assert_cleanup(self):
+ self.clear()
+ yield self
+ assert self.open_resources == 0
+ self.clear()
+
+ loop = TimeTrackingTestLoop()
+ asyncio.set_event_loop(loop)
+ with loop.assert_cleanup():
+ yield loop
+ loop.close()