diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream')
9 files changed, 1346 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py new file mode 100644 index 00000000..cd20d6c9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py @@ -0,0 +1,10 @@ +"""Gather all the stream operators.""" + +from .create import * +from .transform import * +from .select import * +from .combine import * +from .aggregate import * +from .time import * +from .misc import * +from .advanced import * diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py new file mode 100644 index 00000000..106f0f1d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py @@ -0,0 +1,222 @@ +"""Advanced operators (to deal with streams of higher order) .""" +from __future__ import annotations + +from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast +from typing_extensions import ParamSpec + +from . import combine + +from ..core import Streamer, pipable_operator +from ..manager import StreamerManager + + +__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"] + + +T = TypeVar("T") +U = TypeVar("U") +P = ParamSpec("P") + + +# Helper to manage stream of higher order + + +@pipable_operator +async def base_combine( + source: AsyncIterable[AsyncIterable[T]], + switch: bool = False, + ordered: bool = False, + task_limit: int | None = None, +) -> AsyncIterator[T]: + """Base operator for managing an asynchronous sequence of sequences. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + The ``switch`` argument enables the switch mecanism, which cause the + previous subsequence to be discarded when a new one is created. + + The items can either be generated in order or as soon as they're received, + depending on the ``ordered`` argument. + """ + + # Task limit + if task_limit is not None and not task_limit > 0: + raise ValueError("The task limit must be None or greater than 0") + + # Safe context + async with StreamerManager[Union[AsyncIterable[T], T]]() as manager: + main_streamer: Streamer[ + AsyncIterable[T] | T + ] | None = await manager.enter_and_create_task(source) + + # Loop over events + while manager.tasks: + # Extract streamer groups + substreamers = manager.streamers[1:] + mainstreamers = [main_streamer] if main_streamer in manager.tasks else [] + + # Switch - use the main streamer then the substreamer + if switch: + filters = mainstreamers + substreamers + # Concat - use the first substreamer then the main streamer + elif ordered: + filters = substreamers[:1] + mainstreamers + # Flat - use the substreamers then the main streamer + else: + filters = substreamers + mainstreamers + + # Wait for next event + streamer, task = await manager.wait_single_event(filters) + + # Get result + try: + result = task.result() + + # End of stream + except StopAsyncIteration: + # Main streamer is finished + if streamer is main_streamer: + main_streamer = None + + # A substreamer is finished + else: + await manager.clean_streamer(streamer) + + # Re-schedule the main streamer if necessary + if main_streamer is not None and main_streamer not in manager.tasks: + manager.create_task(main_streamer) + + # Process result + else: + # Switch mecanism + if switch and streamer is main_streamer: + await manager.clean_streamers(substreamers) + + # Setup a new source + if streamer is main_streamer: + assert isinstance(result, AsyncIterable) + await manager.enter_and_create_task(result) + + # Re-schedule the main streamer if task limit allows it + if task_limit is None or task_limit > len(manager.tasks): + manager.create_task(streamer) + + # Yield the result + else: + item = cast("T", result) + yield item + + # Re-schedule the streamer + manager.create_task(streamer) + + +# Advanced operators (for streams of higher order) + + +@pipable_operator +def concat( + source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None +) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements + of the sequences in order. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + Errors raised in the source or an element sequence are propagated. + """ + return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True) + + +@pipable_operator +def flatten( + source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None +) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements + of the sequences as soon as they're received. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + Errors raised in the source or an element sequence are propagated. + """ + return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False) + + +@pipable_operator +def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements of + the most recent sequence. + + Element sequences are generated eagerly, and closed once they are + superseded by a more recent sequence. Once the main sequence is finished, + the last subsequence will be exhausted completely. + + Errors raised in the source or an element sequence (that was not already + closed) are propagated. + """ + return base_combine.raw(source, switch=True) + + +# Advanced *-map operators + + +@pipable_operator +def concatmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences, and generate the elements of the created + sequences in order. + + The function is applied as described in `map`, and must return an + asynchronous sequence. The returned sequences are awaited concurrently, + although it's possible to limit the amount of running sequences using + the `task_limit` argument. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return concat.raw(mapped, task_limit=task_limit) + + +@pipable_operator +def flatmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences, and generate the elements of the created + sequences as soon as they arrive. + + The function is applied as described in `map`, and must return an + asynchronous sequence. The returned sequences are awaited concurrently, + although it's possible to limit the amount of running sequences using + the `task_limit` argument. + + Errors raised in a source or output sequence are propagated. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return flatten.raw(mapped, task_limit=task_limit) + + +@pipable_operator +def switchmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences and generate the elements of the most + recently created sequence. + + The function is applied as described in `map`, and must return an + asynchronous sequence. Errors raised in a source or output sequence (that + was not already closed) are propagated. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return switch.raw(mapped) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py new file mode 100644 index 00000000..8ed7c0e1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py @@ -0,0 +1,90 @@ +"""Aggregation operators.""" +from __future__ import annotations + +import asyncio +import builtins +import operator as op +from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast + + +from . import select +from ..aiter_utils import anext +from ..core import pipable_operator, streamcontext + +__all__ = ["accumulate", "reduce", "list"] + +T = TypeVar("T") + + +@pipable_operator +async def accumulate( + source: AsyncIterable[T], + func: Callable[[T, T], Awaitable[T] | T] = op.add, + initializer: T | None = None, +) -> AsyncIterator[T]: + """Generate a series of accumulated sums (or other binary function) + from an asynchronous sequence. + + If ``initializer`` is present, it is placed before the items + of the sequence in the calculation, and serves as a default + when the sequence is empty. + """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + # Initialize + if initializer is None: + try: + value = await anext(streamer) + except StopAsyncIteration: + return + else: + value = initializer + # First value + yield value + # Iterate streamer + async for item in streamer: + returned = func(value, item) + if iscorofunc: + awaitable_value = cast("Awaitable[T]", returned) + value = await awaitable_value + else: + value = cast("T", returned) + yield value + + +@pipable_operator +def reduce( + source: AsyncIterable[T], + func: Callable[[T, T], Awaitable[T] | T], + initializer: T | None = None, +) -> AsyncIterator[T]: + """Apply a function of two arguments cumulatively to the items + of an asynchronous sequence, reducing the sequence to a single value. + + If ``initializer`` is present, it is placed before the items + of the sequence in the calculation, and serves as a default when the + sequence is empty. + """ + acc = accumulate.raw(source, func, initializer) + return select.item.raw(acc, -1) + + +@pipable_operator +async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]: + """Build a list from an asynchronous sequence. + + All the intermediate steps are generated, starting from the empty list. + + This operator can be used to easily convert a stream into a list:: + + lst = await stream.list(x) + + ..note:: The same list object is produced at each step in order to avoid + memory copies. + """ + result: builtins.list[T] = [] + yield result + async with streamcontext(source) as streamer: + async for item in streamer: + result.append(item) + yield result diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py new file mode 100644 index 00000000..a782a730 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py @@ -0,0 +1,282 @@ +"""Combination operators.""" +from __future__ import annotations + +import asyncio +import builtins + +from typing import ( + Awaitable, + Protocol, + TypeVar, + AsyncIterable, + AsyncIterator, + Callable, + cast, +) +from typing_extensions import ParamSpec + +from ..aiter_utils import AsyncExitStack, anext +from ..core import streamcontext, pipable_operator + +from . import create +from . import select +from . import advanced +from . import aggregate + +__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"] + +T = TypeVar("T") +U = TypeVar("U") +K = TypeVar("K") +P = ParamSpec("P") + + +@pipable_operator +async def chain( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[T]: + """Chain asynchronous sequences together, in the order they are given. + + Note: the sequences are not iterated until it is required, + so if the operation is interrupted, the remaining sequences + will be left untouched. + """ + sources = source, *more_sources + for source in sources: + async with streamcontext(source) as streamer: + async for item in streamer: + yield item + + +@pipable_operator +async def zip( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[tuple[T, ...]]: + """Combine and forward the elements of several asynchronous sequences. + + Each generated value is a tuple of elements, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. + + Note: the different sequences are awaited in parrallel, so that their + waiting times don't add up. + """ + sources = source, *more_sources + + # One sources + if len(sources) == 1: + (source,) = sources + async with streamcontext(source) as streamer: + async for item in streamer: + yield (item,) + return + + # N sources + async with AsyncExitStack() as stack: + # Handle resources + streamers = [ + await stack.enter_async_context(streamcontext(source)) for source in sources + ] + # Loop over items + while True: + try: + coros = builtins.map(anext, streamers) + items = await asyncio.gather(*coros) + except StopAsyncIteration: + break + else: + yield tuple(items) + + +X = TypeVar("X", contravariant=True) +Y = TypeVar("Y", covariant=True) + + +class SmapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Y: + ... + + +class AmapCallable(Protocol[X, Y]): + async def __call__(self, arg: X, /, *args: X) -> Y: + ... + + +class MapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y: + ... + + +@pipable_operator +async def smap( + source: AsyncIterable[T], + func: SmapCallable[T, U], + *more_sources: AsyncIterable[T], +) -> AsyncIterator[U]: + """Apply a given function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. The function is treated synchronously. + + Note: if more than one sequence is provided, they're awaited concurrently + so that their waiting times don't add up. + """ + stream = zip(source, *more_sources) + async with streamcontext(stream) as streamer: + async for item in streamer: + yield func(*item) + + +@pipable_operator +def amap( + source: AsyncIterable[T], + corofn: AmapCallable[T, U], + *more_sources: AsyncIterable[T], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given coroutine function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. + + If more than one sequence is provided, they're also awaited concurrently, + so that their waiting times don't add up. + """ + + async def func(arg: T, *args: T) -> AsyncIterable[U]: + yield await corofn(arg, *args) + + if ordered: + return advanced.concatmap.raw( + source, func, *more_sources, task_limit=task_limit + ) + return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit) + + +@pipable_operator +def map( + source: AsyncIterable[T], + func: MapCallable[T, U], + *more_sources: AsyncIterable[T], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. The function can either be synchronous or + asynchronous (coroutine function). + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. This argument is ignored if the + provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. + + If more than one sequence is provided, they're also awaited concurrently, + so that their waiting times don't add up. + + It might happen that the provided function returns a coroutine but is not + a coroutine function per se. In this case, one can wrap the function with + ``aiostream.async_`` in order to force ``map`` to await the resulting + coroutine. The following example illustrates the use ``async_`` with a + lambda function:: + + from aiostream import stream, async_ + ... + ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000))) + """ + if asyncio.iscoroutinefunction(func): + return amap.raw( + source, func, *more_sources, ordered=ordered, task_limit=task_limit + ) + sync_func = cast("SmapCallable[T, U]", func) + return smap.raw(source, sync_func, *more_sources) + + +@pipable_operator +def merge( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[T]: + """Merge several asynchronous sequences together. + + All the sequences are iterated simultaneously and their elements + are forwarded as soon as they're available. The generation continues + until all the sequences are exhausted. + """ + sources = [source, *more_sources] + source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources) + return advanced.flatten.raw(source_stream) + + +@pipable_operator +def ziplatest( + source: AsyncIterable[T], + *more_sources: AsyncIterable[T], + partial: bool = True, + default: T | None = None, +) -> AsyncIterator[tuple[T | None, ...]]: + """Combine several asynchronous sequences together, producing a tuple with + the lastest element of each sequence whenever a new element is received. + + The value to use when a sequence has not procuded any element yet is given + by the ``default`` argument (defaulting to ``None``). + + The producing of partial results can be disabled by setting the optional + argument ``partial`` to ``False``. + + All the sequences are iterated simultaneously and their elements + are forwarded as soon as they're available. The generation continues + until all the sequences are exhausted. + """ + sources = source, *more_sources + n = len(sources) + + # Custom getter + def getter(dct: dict[int, T]) -> Callable[[int], T | None]: + return lambda key: dct.get(key, default) + + # Add source index to the items + def make_func(i: int) -> SmapCallable[T, dict[int, T]]: + def func(x: T, *_: object) -> dict[int, T]: + return {i: x} + + return func + + new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)] + + # Merge the sources + merged = merge.raw(*new_sources) + + # Accumulate the current state in a dict + accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e}) + + # Filter partial result + filtered = ( + accumulated + if partial + else select.filter.raw(accumulated, lambda x: len(x) == n) + ) + + # Convert the state dict to a tuple + def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]: + return tuple(builtins.map(getter(x), range(n))) + + return smap.raw(filtered, dict_to_tuple) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py new file mode 100644 index 00000000..d7676b71 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py @@ -0,0 +1,191 @@ +"""Non-pipable creation operators.""" +from __future__ import annotations + +import sys +import asyncio +import inspect +import builtins +import itertools + +from typing import ( + AsyncIterable, + Awaitable, + Iterable, + Protocol, + TypeVar, + AsyncIterator, + cast, +) +from typing_extensions import ParamSpec + +from ..stream import time +from ..core import operator, streamcontext + +__all__ = [ + "iterate", + "preserve", + "just", + "call", + "throw", + "empty", + "never", + "repeat", + "range", + "count", +] + +T = TypeVar("T") +P = ParamSpec("P") + +# Hack for python 3.8 compatibility +if sys.version_info < (3, 9): + P = TypeVar("P") + +# Convert regular iterables + + +@operator +async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]: + """Generate values from a regular iterable.""" + for item in it: + await asyncio.sleep(0) + yield item + + +@operator +def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]: + """Generate values from an asynchronous iterable. + + Note: the corresponding iterator will be explicitely closed + when leaving the context manager.""" + return streamcontext(ait) + + +@operator +def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]: + """Generate values from a sychronous or asynchronous iterable.""" + if isinstance(it, AsyncIterable): + return from_async_iterable.raw(it) + if isinstance(it, Iterable): + return from_iterable.raw(it) + raise TypeError(f"{type(it).__name__!r} object is not (async) iterable") + + +@operator +async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]: + """Generate values from an asynchronous iterable without + explicitly closing the corresponding iterator.""" + async for item in ait: + yield item + + +# Simple operators + + +@operator +async def just(value: T) -> AsyncIterator[T]: + """Await if possible, and generate a single value.""" + if inspect.isawaitable(value): + yield await value + else: + yield value + + +Y = TypeVar("Y", covariant=True) + + +class SyncCallable(Protocol[P, Y]): + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y: + ... + + +class AsyncCallable(Protocol[P, Y]): + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]: + ... + + +@operator +async def call( + func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs +) -> AsyncIterator[T]: + """Call the given function and generate a single value. + + Await if the provided function is asynchronous. + """ + if asyncio.iscoroutinefunction(func): + async_func = cast("AsyncCallable[P, T]", func) + yield await async_func(*args, **kwargs) + else: + sync_func = cast("SyncCallable[P, T]", func) + yield sync_func(*args, **kwargs) + + +@operator +async def throw(exc: Exception) -> AsyncIterator[None]: + """Throw an exception without generating any value.""" + if False: + yield + raise exc + + +@operator +async def empty() -> AsyncIterator[None]: + """Terminate without generating any value.""" + if False: + yield + + +@operator +async def never() -> AsyncIterator[None]: + """Hang forever without generating any value.""" + if False: + yield + future: asyncio.Future[None] = asyncio.Future() + try: + await future + finally: + future.cancel() + + +@operator +def repeat( + value: T, times: int | None = None, *, interval: float = 0.0 +) -> AsyncIterator[T]: + """Generate the same value a given number of times. + + If ``times`` is ``None``, the value is repeated indefinitely. + An optional interval can be given to space the values out. + """ + args = () if times is None else (times,) + it = itertools.repeat(value, *args) + agen = from_iterable.raw(it) + return time.spaceout.raw(agen, interval) if interval else agen + + +# Counting operators + + +@operator +def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]: + """Generate a given range of numbers. + + It supports the same arguments as the builtin function. + An optional interval can be given to space the values out. + """ + agen = from_iterable.raw(builtins.range(*args)) + return time.spaceout.raw(agen, interval) if interval else agen + + +@operator +def count( + start: int = 0, step: int = 1, *, interval: float = 0.0 +) -> AsyncIterator[int]: + """Generate consecutive numbers indefinitely. + + Optional starting point and increment can be defined, + respectively defaulting to ``0`` and ``1``. + + An optional interval can be given to space the values out. + """ + agen = from_iterable.raw(itertools.count(start, step)) + return time.spaceout.raw(agen, interval) if interval else agen diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py new file mode 100644 index 00000000..1be834e6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py @@ -0,0 +1,83 @@ +"""Extra operators.""" +from __future__ import annotations + +import asyncio +import builtins + +from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any + +from .combine import amap, smap +from ..core import pipable_operator + +__all__ = ["action", "print"] + + +T = TypeVar("T") + + +@pipable_operator +def action( + source: AsyncIterable[T], + func: Callable[[T], Awaitable[Any] | Any], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[T]: + """Perform an action for each element of an asynchronous sequence + without modifying it. + + The given function can be synchronous or asynchronous. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. This argument is ignored if the + provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. + """ + if asyncio.iscoroutinefunction(func): + + async def ainnerfunc(arg: T, *_: object) -> T: + awaitable = func(arg) + assert isinstance(awaitable, Awaitable) + await awaitable + return arg + + return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit) + + else: + + def innerfunc(arg: T, *_: object) -> T: + func(arg) + return arg + + return smap.raw(source, innerfunc) + + +@pipable_operator +def print( + source: AsyncIterable[T], + template: str = "{}", + sep: str = " ", + end: str = "\n", + file: Any | None = None, + flush: bool = False, +) -> AsyncIterator[T]: + """Print each element of an asynchronous sequence without modifying it. + + An optional template can be provided to be formatted with the elements. + All the keyword arguments are forwarded to the builtin function print. + """ + + def func(value: T) -> None: + string = template.format(value) + builtins.print( + string, + sep=sep, + end=end, + file=file, + flush=flush, + ) + + return action.raw(source, func) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py new file mode 100644 index 00000000..9390f464 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py @@ -0,0 +1,284 @@ +"""Selection operators.""" +from __future__ import annotations + +import asyncio +import builtins +import collections + +from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator + +from . import transform +from ..aiter_utils import aiter, anext +from ..core import streamcontext, pipable_operator + +__all__ = [ + "take", + "takelast", + "skip", + "skiplast", + "getitem", + "filter", + "until", + "dropwhile", + "takewhile", +] + +T = TypeVar("T") + + +@pipable_operator +async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward the first ``n`` elements from an asynchronous sequence. + + If ``n`` is negative, it simply terminates before iterating the source. + """ + enumerated = transform.enumerate.raw(source) + async with streamcontext(enumerated) as streamer: + if n <= 0: + return + async for i, item in streamer: + yield item + if i >= n - 1: + return + + +@pipable_operator +async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward the last ``n`` elements from an asynchronous sequence. + + If ``n`` is negative, it simply terminates after iterating the source. + + Note: it is required to reach the end of the source before the first + element is generated. + """ + queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0) + async with streamcontext(source) as streamer: + async for item in streamer: + queue.append(item) + for item in queue: + yield item + + +@pipable_operator +async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward an asynchronous sequence, skipping the first ``n`` elements. + + If ``n`` is negative, no elements are skipped. + """ + enumerated = transform.enumerate.raw(source) + async with streamcontext(enumerated) as streamer: + async for i, item in streamer: + if i >= n: + yield item + + +@pipable_operator +async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward an asynchronous sequence, skipping the last ``n`` elements. + + If ``n`` is negative, no elements are skipped. + + Note: it is required to reach the ``n+1`` th element of the source + before the first element is generated. + """ + queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0) + async with streamcontext(source) as streamer: + async for item in streamer: + if n <= 0: + yield item + continue + if len(queue) == n: + yield queue[0] + queue.append(item) + + +@pipable_operator +async def filterindex( + source: AsyncIterable[T], func: Callable[[int], bool] +) -> AsyncIterator[T]: + """Filter an asynchronous sequence using the index of the elements. + + The given function is synchronous, takes the index as an argument, + and returns ``True`` if the corresponding should be forwarded, + ``False`` otherwise. + """ + enumerated = transform.enumerate.raw(source) + async with streamcontext(enumerated) as streamer: + async for i, item in streamer: + if func(i): + yield item + + +@pipable_operator +def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]: + """Slice an asynchronous sequence. + + The arguments are the same as the builtin type slice. + + There are two limitations compare to regular slices: + - Positive stop index with negative start index is not supported + - Negative step is not supported + """ + s = builtins.slice(*args) + start, stop, step = s.start or 0, s.stop, s.step or 1 + aiterator = aiter(source) + # Filter the first items + if start < 0: + aiterator = takelast.raw(aiterator, abs(start)) + elif start > 0: + aiterator = skip.raw(aiterator, start) + # Filter the last items + if stop is not None: + if stop >= 0 and start < 0: + raise ValueError("Positive stop with negative start is not supported") + elif stop >= 0: + aiterator = take.raw(aiterator, stop - start) + else: + aiterator = skiplast.raw(aiterator, abs(stop)) + # Filter step items + if step is not None: + if step > 1: + aiterator = filterindex.raw(aiterator, lambda i: i % step == 0) + elif step < 0: + raise ValueError("Negative step not supported") + # Return + return aiterator + + +@pipable_operator +async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]: + """Forward the ``n``th element of an asynchronous sequence. + + The index can be negative and works like regular indexing. + If the index is out of range, and ``IndexError`` is raised. + """ + # Prepare + if index >= 0: + source = skip.raw(source, index) + else: + source = takelast(source, abs(index)) + async with streamcontext(source) as streamer: + # Get first item + try: + result = await anext(streamer) + except StopAsyncIteration: + raise IndexError("Index out of range") + # Check length + if index < 0: + count = 1 + async for _ in streamer: + count += 1 + if count != abs(index): + raise IndexError("Index out of range") + # Yield result + yield result + + +@pipable_operator +def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]: + """Forward one or several items from an asynchronous sequence. + + The argument can either be a slice or an integer. + See the slice and item operators for more information. + """ + if isinstance(index, builtins.slice): + return slice.raw(source, index.start, index.stop, index.step) + if isinstance(index, int): + return item.raw(source, index) + raise TypeError("Not a valid index (int or slice)") + + +@pipable_operator +async def filter( + source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]] +) -> AsyncIterator[T]: + """Filter an asynchronous sequence using an arbitrary function. + + The function takes the item as an argument and returns ``True`` + if it should be forwarded, ``False`` otherwise. + The function can either be synchronous or asynchronous. + """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + async for item in streamer: + result = func(item) + if iscorofunc: + assert isinstance(result, Awaitable) + result = await result + if result: + yield item + + +@pipable_operator +async def until( + source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]] +) -> AsyncIterator[T]: + """Forward an asynchronous sequence until a condition is met. + + Contrary to the ``takewhile`` operator, the last tested element is included + in the sequence. + + The given function takes the item as an argument and returns a boolean + corresponding to the condition to meet. The function can either be + synchronous or asynchronous. + """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + async for item in streamer: + result = func(item) + if iscorofunc: + assert isinstance(result, Awaitable) + result = await result + yield item + if result: + return + + +@pipable_operator +async def takewhile( + source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]] +) -> AsyncIterator[T]: + """Forward an asynchronous sequence while a condition is met. + + Contrary to the ``until`` operator, the last tested element is not included + in the sequence. + + The given function takes the item as an argument and returns a boolean + corresponding to the condition to meet. The function can either be + synchronous or asynchronous. + """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + async for item in streamer: + result = func(item) + if iscorofunc: + assert isinstance(result, Awaitable) + result = await result + if not result: + return + yield item + + +@pipable_operator +async def dropwhile( + source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]] +) -> AsyncIterator[T]: + """Discard the elements from an asynchronous sequence + while a condition is met. + + The given function takes the item as an argument and returns a boolean + corresponding to the condition to meet. The function can either be + synchronous or asynchronous. + """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + async for item in streamer: + result = func(item) + if iscorofunc: + assert isinstance(result, Awaitable) + result = await result + if not result: + yield item + break + async for item in streamer: + yield item diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py new file mode 100644 index 00000000..c9c0df88 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py @@ -0,0 +1,56 @@ +"""Time-specific operators.""" +from __future__ import annotations +import asyncio + +from ..aiter_utils import anext +from ..core import streamcontext, pipable_operator + +from typing import TypeVar, AsyncIterable, AsyncIterator + +__all__ = ["spaceout", "delay", "timeout"] + + +T = TypeVar("T") + + +@pipable_operator +async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]: + """Make sure the elements of an asynchronous sequence are separated + in time by the given interval. + """ + timeout = 0.0 + loop = asyncio.get_event_loop() + async with streamcontext(source) as streamer: + async for item in streamer: + delta = timeout - loop.time() + delay = delta if delta > 0 else 0.0 + await asyncio.sleep(delay) + yield item + timeout = loop.time() + interval + + +@pipable_operator +async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]: + """Raise a time-out if an element of the asynchronous sequence + takes too long to arrive. + + Note: the timeout is not global but specific to each step of + the iteration. + """ + async with streamcontext(source) as streamer: + while True: + try: + item = await asyncio.wait_for(anext(streamer), timeout) + except StopAsyncIteration: + break + else: + yield item + + +@pipable_operator +async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]: + """Delay the iteration of an asynchronous sequence.""" + await asyncio.sleep(delay) + async with streamcontext(source) as streamer: + async for item in streamer: + yield item diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py new file mode 100644 index 00000000..f11bffa6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py @@ -0,0 +1,128 @@ +"""Transformation operators.""" + +from __future__ import annotations + +import asyncio +import itertools +from typing import ( + Protocol, + TypeVar, + AsyncIterable, + AsyncIterator, + Awaitable, + cast, +) + +from ..core import streamcontext, pipable_operator + +from . import select +from . import create +from . import aggregate +from .combine import map, amap, smap + +__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"] + +# map, amap and smap are also transform operators +map, amap, smap + +T = TypeVar("T") +U = TypeVar("U") + + +@pipable_operator +async def enumerate( + source: AsyncIterable[T], start: int = 0, step: int = 1 +) -> AsyncIterator[tuple[int, T]]: + """Generate ``(index, value)`` tuples from an asynchronous sequence. + + This index is computed using a starting point and an increment, + respectively defaulting to ``0`` and ``1``. + """ + count = itertools.count(start, step) + async with streamcontext(source) as streamer: + async for item in streamer: + yield next(count), item + + +X = TypeVar("X", contravariant=True) +Y = TypeVar("Y", covariant=True) + + +class AsyncStarmapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]: + ... + + +class SyncStarmapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Y: + ... + + +@pipable_operator +def starmap( + source: AsyncIterable[tuple[T, ...]], + func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function to the unpacked elements of + an asynchronous sequence. + + Each element is unpacked before applying the function. + The given function can either be synchronous or asynchronous. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. This argument is ignored if + the provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. + """ + if asyncio.iscoroutinefunction(func): + async_func = cast("AsyncStarmapCallable[T, U]", func) + + async def astarfunc(args: tuple[T, ...], *_: object) -> U: + awaitable = async_func(*args) + return await awaitable + + return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit) + + else: + sync_func = cast("SyncStarmapCallable[T, U]", func) + + def starfunc(args: tuple[T, ...], *_: object) -> U: + return sync_func(*args) + + return smap.raw(source, starfunc) + + +@pipable_operator +async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]: + """Iterate indefinitely over an asynchronous sequence. + + Note: it does not perform any buffering, but re-iterate over + the same given sequence instead. If the sequence is not + re-iterable, the generator might end up looping indefinitely + without yielding any item. + """ + while True: + async with streamcontext(source) as streamer: + async for item in streamer: + yield item + # Prevent blocking while loop if the stream is empty + await asyncio.sleep(0) + + +@pipable_operator +async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]: + """Generate chunks of size ``n`` from an asynchronous sequence. + + The chunks are lists, and the last chunk might contain less than ``n`` + elements. + """ + async with streamcontext(source) as streamer: + async for first in streamer: + xs = select.take(create.preserve(streamer), n - 1) + yield [first] + await aggregate.list(xs) |