author     S. Solomon Darnell   2025-03-28 21:52:21 -0500
committer  S. Solomon Darnell   2025-03-28 21:52:21 -0500
commit     4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree       ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream
parent     cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream')
16 files changed, 2577 insertions, 0 deletions
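For reviewers: the docstrings in this diff double as the API documentation of the
vendored package. As a quick orientation, here is a minimal usage sketch assembled
from the `Stream` docstring in core.py below (a sketch only — it assumes the
vendored copy behaves as those docstrings describe)::

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        xs = stream.count() | pipe.skip(5)   # yields 5, 6, 7, ...
        zs = xs[5:10:2]                      # slice with start, stop, step

        # Safe iteration happens in an explicit streamer context
        async with zs.stream() as streamer:
            async for z in streamer:
                print(z)                     # prints 10, 12, 14

        # Awaiting a stream returns its last element
        print(await zs)                      # prints 14

    asyncio.run(main())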
diff --git a/.venv/lib/python3.12/site-packages/aiostream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
new file mode 100644
index 00000000..3ac80c16
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/__init__.py
@@ -0,0 +1,31 @@
+"""Generator-based operators for asynchronous iteration.
+
+The two main modules are:
+- stream: provides all the stream operators (to create new stream objects)
+- pipe: provides all the pipe operators (to combine operators using '|')
+
+Additionally, three core objects are exposed:
+- streamcontext: a context for safe stream iteration
+- StreamEmpty: the exception raised when an empty stream is awaited
+- operator: a decorator to create stream operators from async generators
+
+Some utility modules are also provided:
+- aiter_utils: utilities for asynchronous iteration
+- context_utils: utilities for asynchronous context
+- test_utils: utilities for testing stream operators (requires pytest)
+"""
+
+from . import stream, pipe
+from .aiter_utils import async_, await_
+from .core import StreamEmpty, operator, pipable_operator, streamcontext
+
+__all__ = [
+    "stream",
+    "pipe",
+    "async_",
+    "await_",
+    "operator",
+    "pipable_operator",
+    "streamcontext",
+    "StreamEmpty",
+]
diff --git a/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
new file mode 100644
index 00000000..f68ea846
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/aiter_utils.py
@@ -0,0 +1,262 @@
+"""Utilities for asynchronous iteration."""
+from __future__ import annotations
+from types import TracebackType
+
+import warnings
+import functools
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    AsyncGenerator,
+    AsyncIterable,
+    Awaitable,
+    Callable,
+    Type,
+    TypeVar,
+    AsyncIterator,
+    Any,
+)
+
+if TYPE_CHECKING:
+    from typing_extensions import ParamSpec
+
+    P = ParamSpec("P")
+
+from contextlib import AsyncExitStack
+
+__all__ = [
+    "aiter",
+    "anext",
+    "await_",
+    "async_",
+    "is_async_iterable",
+    "assert_async_iterable",
+    "is_async_iterator",
+    "assert_async_iterator",
+    "AsyncIteratorContext",
+    "aitercontext",
+    "AsyncExitStack",
+]
+
+
+# Magic method shortcuts
+
+
+def aiter(obj: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Access aiter magic method."""
+    assert_async_iterable(obj)
+    return obj.__aiter__()
+
+
+def anext(obj: AsyncIterator[T]) -> Awaitable[T]:
+    """Access anext magic method."""
+    assert_async_iterator(obj)
+    return obj.__anext__()
+
+
+# Async / await helper functions
+
+
+async def await_(obj: Awaitable[T]) -> T:
+    """Identity coroutine function."""
+    return await obj
+
+
+def async_(fn: Callable[P, Awaitable[T]]) -> Callable[P, Awaitable[T]]:
+    """Wrap the given function into a coroutine function."""
+
+    @functools.wraps(fn)
+    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
+        return await fn(*args, **kwargs)
+
+    return wrapper
+
+
+# Iterability helpers
+
+
+def is_async_iterable(obj: object) -> bool:
+    """Check if the given object is an asynchronous iterable."""
+    return hasattr(obj, "__aiter__")
+
+
+def assert_async_iterable(obj: object) -> None:
+    """Raise a TypeError if the given object is not an
+    asynchronous iterable.
+ """ + if not is_async_iterable(obj): + raise TypeError(f"{type(obj).__name__!r} object is not async iterable") + + +def is_async_iterator(obj: object) -> bool: + """Check if the given object is an asynchronous iterator.""" + return hasattr(obj, "__anext__") + + +def assert_async_iterator(obj: object) -> None: + """Raise a TypeError if the given object is not an + asynchronous iterator. + """ + if not is_async_iterator(obj): + raise TypeError(f"{type(obj).__name__!r} object is not an async iterator") + + +# Async iterator context + +T = TypeVar("T") +Self = TypeVar("Self", bound="AsyncIteratorContext[Any]") + + +class AsyncIteratorContext(AsyncIterator[T], AsyncContextManager[Any]): + """Asynchronous iterator with context management. + + The context management makes sure the aclose asynchronous method + of the corresponding iterator has run before it exits. It also issues + warnings and RuntimeError if it is used incorrectly. + + Correct usage:: + + ait = some_asynchronous_iterable() + async with AsyncIteratorContext(ait) as safe_ait: + async for item in safe_ait: + <block> + + It is nonetheless not meant to use directly. + Prefer aitercontext helper instead. + """ + + _STANDBY = "STANDBY" + _RUNNING = "RUNNING" + _FINISHED = "FINISHED" + + def __init__(self, aiterator: AsyncIterator[T]): + """Initialize with an asynchrnous iterator.""" + assert_async_iterator(aiterator) + if isinstance(aiterator, AsyncIteratorContext): + raise TypeError(f"{aiterator!r} is already an AsyncIteratorContext") + self._state = self._STANDBY + self._aiterator = aiterator + + def __aiter__(self: Self) -> Self: + return self + + def __anext__(self) -> Awaitable[T]: + if self._state == self._FINISHED: + raise RuntimeError( + f"{type(self).__name__} is closed and cannot be iterated" + ) + if self._state == self._STANDBY: + warnings.warn( + f"{type(self).__name__} is iterated outside of its context", + stacklevel=2, + ) + return anext(self._aiterator) + + async def __aenter__(self: Self) -> Self: + if self._state == self._RUNNING: + raise RuntimeError(f"{type(self).__name__} has already been entered") + if self._state == self._FINISHED: + raise RuntimeError( + f"{type(self).__name__} is closed and cannot be iterated" + ) + self._state = self._RUNNING + return self + + async def __aexit__( + self, + typ: Type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ) -> bool: + try: + if self._state == self._FINISHED: + return False + try: + # No exception to throw + if typ is None: + return False + + # Prevent GeneratorExit from being silenced + if typ is GeneratorExit: + return False + + # No method to throw + if not hasattr(self._aiterator, "athrow"): + return False + + # No frame to throw + if not getattr(self._aiterator, "ag_frame", True): + return False + + # Cannot throw at the moment + if getattr(self._aiterator, "ag_running", False): + return False + + # Throw + try: + assert isinstance(self._aiterator, AsyncGenerator) + await self._aiterator.athrow(typ, value, traceback) + raise RuntimeError("Async iterator didn't stop after athrow()") + + # Exception has been (most probably) silenced + except StopAsyncIteration as exc: + return exc is not value + + # A (possibly new) exception has been raised + except BaseException as exc: + if exc is value: + return False + raise + finally: + # Look for an aclose method + aclose = getattr(self._aiterator, "aclose", None) + + # The ag_running attribute has been introduced with python 3.8 + running = getattr(self._aiterator, "ag_running", 
False) + closed = not getattr(self._aiterator, "ag_frame", True) + + # A RuntimeError is raised if aiterator is running or closed + if aclose and not running and not closed: + try: + await aclose() + + # Work around bpo-35409 + except GeneratorExit: + pass # pragma: no cover + finally: + self._state = self._FINISHED + + async def aclose(self) -> None: + await self.__aexit__(None, None, None) + + async def athrow(self, exc: Exception) -> T: + if self._state == self._FINISHED: + raise RuntimeError(f"{type(self).__name__} is closed and cannot be used") + assert isinstance(self._aiterator, AsyncGenerator) + item: T = await self._aiterator.athrow(exc) + return item + + +def aitercontext( + aiterable: AsyncIterable[T], +) -> AsyncIteratorContext[T]: + """Return an asynchronous context manager from an asynchronous iterable. + + The context management makes sure the aclose asynchronous method + has run before it exits. It also issues warnings and RuntimeError + if it is used incorrectly. + + It is safe to use with any asynchronous iterable and prevent + asynchronous iterator context to be wrapped twice. + + Correct usage:: + + ait = some_asynchronous_iterable() + async with aitercontext(ait) as safe_ait: + async for item in safe_ait: + <block> + """ + aiterator = aiter(aiterable) + if isinstance(aiterator, AsyncIteratorContext): + return aiterator + return AsyncIteratorContext(aiterator) diff --git a/.venv/lib/python3.12/site-packages/aiostream/core.py b/.venv/lib/python3.12/site-packages/aiostream/core.py new file mode 100644 index 00000000..a11f0823 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/core.py @@ -0,0 +1,567 @@ +"""Core objects for stream operators.""" +from __future__ import annotations + +import inspect +import functools +import sys +import warnings + +from .aiter_utils import AsyncIteratorContext, aiter, assert_async_iterable +from typing import ( + Any, + AsyncIterator, + Callable, + Generator, + Iterator, + Protocol, + Union, + TypeVar, + cast, + AsyncIterable, + Awaitable, +) + +from typing_extensions import ParamSpec, Concatenate + + +__all__ = ["Stream", "Streamer", "StreamEmpty", "operator", "streamcontext"] + + +# Exception + + +class StreamEmpty(Exception): + """Exception raised when awaiting an empty stream.""" + + pass + + +# Helpers + +T = TypeVar("T") +X = TypeVar("X") +A = TypeVar("A", contravariant=True) +P = ParamSpec("P") +Q = ParamSpec("Q") + +# Hack for python 3.8 compatibility +if sys.version_info < (3, 9): + P = TypeVar("P") + + +async def wait_stream(aiterable: BaseStream[T]) -> T: + """Wait for an asynchronous iterable to finish and return the last item. + + The iterable is executed within a safe stream context. + A StreamEmpty exception is raised if the sequence is empty. + """ + + class Unassigned: + pass + + last_item: Unassigned | T = Unassigned() + + async with streamcontext(aiterable) as streamer: + async for item in streamer: + last_item = item + + if isinstance(last_item, Unassigned): + raise StreamEmpty() + return last_item + + +# Core objects + + +class BaseStream(AsyncIterable[T], Awaitable[T]): + """ + Base class for streams. + + See `Stream` and `Streamer` for more information. + """ + + def __init__(self, factory: Callable[[], AsyncIterable[T]]) -> None: + """Initialize the stream with an asynchronous iterable factory. + + The factory is a callable and takes no argument. + The factory return value is an asynchronous iterable. 
+ """ + aiter = factory() + assert_async_iterable(aiter) + self._generator = self._make_generator(aiter, factory) + + def _make_generator( + self, first: AsyncIterable[T], factory: Callable[[], AsyncIterable[T]] + ) -> Iterator[AsyncIterable[T]]: + """Generate asynchronous iterables when required. + + The first iterable is created beforehand for extra checking. + """ + yield first + del first + while True: + yield factory() + + def __await__(self) -> Generator[Any, None, T]: + """Await protocol. + + Safely iterate and return the last element. + """ + return wait_stream(self).__await__() + + def __or__(self, func: Callable[[BaseStream[T]], X]) -> X: + """Pipe protocol. + + Allow to pipe stream operators. + """ + return func(self) + + def __add__(self, value: AsyncIterable[X]) -> Stream[Union[X, T]]: + """Addition protocol. + + Concatenate with a given asynchronous sequence. + """ + from .stream import chain + + return chain(self, value) + + def __getitem__(self, value: Union[int, slice]) -> Stream[T]: + """Get item protocol. + + Accept index or slice to extract the corresponding item(s) + """ + from .stream import getitem + + return getitem(self, value) + + # Disable sync iteration + # This is necessary because __getitem__ is defined + # which is a valid fallback for for-loops in python + __iter__: None = None + + +class Stream(BaseStream[T]): + """Enhanced asynchronous iterable. + + It provides the following features: + + - **Operator pipe-lining** - using pipe symbol ``|`` + - **Repeatability** - every iteration creates a different iterator + - **Safe iteration context** - using ``async with`` and the ``stream`` + method + - **Simplified execution** - get the last element from a stream using + ``await`` + - **Slicing and indexing** - using square brackets ``[]`` + - **Concatenation** - using addition symbol ``+`` + + It is not meant to be instanciated directly. + Use the stream operators instead. + + Example:: + + xs = stream.count() # xs is a stream object + ys = xs | pipe.skip(5) # pipe xs and skip the first 5 elements + zs = ys[5:10:2] # slice ys using start, stop and step + + async with zs.stream() as streamer: # stream zs in a safe context + async for z in streamer: # iterate the zs streamer + print(z) # Prints 10, 12, 14 + + result = await zs # await zs and return its last element + print(result) # Prints 14 + result = await zs # zs can be used several times + print(result) # Prints 14 + """ + + def stream(self) -> Streamer[T]: + """Return a streamer context for safe iteration. + + Example:: + + xs = stream.count() + async with xs.stream() as streamer: + async for item in streamer: + <block> + + """ + return self.__aiter__() + + def __aiter__(self) -> Streamer[T]: + """Asynchronous iteration protocol. + + Return a streamer context for safe iteration. + """ + return streamcontext(next(self._generator)) + + # Advertise the proper synthax for entering a stream context + + __aexit__: None = None + + async def __aenter__(self) -> None: + raise TypeError( + "A stream object cannot be used as a context manager. " + "Use the `stream` method instead: " + "`async with xs.stream() as streamer`" + ) + + +class Streamer(AsyncIteratorContext[T], BaseStream[T]): + """Enhanced asynchronous iterator context. + + It is similar to AsyncIteratorContext but provides the stream + magic methods for concatenation, indexing and awaiting. + + It's not meant to be instanciated directly, use streamcontext instead. 
+ + Example:: + + ait = some_asynchronous_iterable() + async with streamcontext(ait) as streamer: + async for item in streamer: + await streamer[5] + """ + + pass + + +def streamcontext(aiterable: AsyncIterable[T]) -> Streamer[T]: + """Return a stream context manager from an asynchronous iterable. + + The context management makes sure the aclose asynchronous method + of the corresponding iterator has run before it exits. It also issues + warnings and RuntimeError if it is used incorrectly. + + It is safe to use with any asynchronous iterable and prevent + asynchronous iterator context to be wrapped twice. + + Correct usage:: + + ait = some_asynchronous_iterable() + async with streamcontext(ait) as streamer: + async for item in streamer: + <block> + + For streams objects, it is possible to use the stream method instead:: + + xs = stream.count() + async with xs.stream() as streamer: + async for item in streamer: + <block> + """ + aiterator = aiter(aiterable) + if isinstance(aiterator, Streamer): + return aiterator + return Streamer(aiterator) + + +# Operator type protocol + + +class OperatorType(Protocol[P, T]): + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Stream[T]: + ... + + def raw(self, *args: P.args, **kwargs: P.kwargs) -> AsyncIterator[T]: + ... + + +class PipableOperatorType(Protocol[A, P, T]): + def __call__( + self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs + ) -> Stream[T]: + ... + + def raw( + self, source: AsyncIterable[A], /, *args: P.args, **kwargs: P.kwargs + ) -> AsyncIterator[T]: + ... + + def pipe( + self, *args: P.args, **kwargs: P.kwargs + ) -> Callable[[AsyncIterable[A]], Stream[T]]: + ... + + +# Operator decorator + + +def operator( + func: Callable[P, AsyncIterator[T]] | None = None, + pipable: bool | None = None, +) -> OperatorType[P, T]: + """Create a stream operator from an asynchronous generator + (or any function returning an asynchronous iterable). + + Decorator usage:: + + @operator + async def random(offset=0., width=1.): + while True: + yield offset + width * random.random() + + The return value is a dynamically created class. + It has the same name, module and doc as the original function. + + A new stream is created by simply instanciating the operator:: + + xs = random() + + The original function is called at instanciation to check that + signature match. Other methods are available: + + - `original`: the original function as a static method + - `raw`: same as original but add extra checking + + The `pipable` argument is deprecated, use `pipable_operator` instead. + """ + + # Handle compatibility with legacy (aiostream <= 0.4) + if pipable is not None or func is None: + warnings.warn( + "The `pipable` argument is deprecated. Use either `@operator` or `@pipable_operator` directly.", + DeprecationWarning, + ) + if func is None: + return pipable_operator if pipable else operator # type: ignore + if pipable is True: + return pipable_operator(func) # type: ignore + + # First check for classmethod instance, to avoid more confusing errors later on + if isinstance(func, classmethod): + raise ValueError( + "An operator cannot be created from a class method, " + "since the decorated function becomes an operator class" + ) + + # Gather data + bases = (Stream,) + name = func.__name__ + module = func.__module__ + extra_doc = func.__doc__ + doc = extra_doc or f"Regular {name} stream operator." 
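+    # Note: the signature inspection below serves two purposes: rejecting
+    # plain methods (a leading "self" or "cls" parameter) and recording the
+    # position of an optional variadic "more_sources" parameter, which the
+    # generated __init__ uses to type-check every extra source argument.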
+ + # Extract signature + signature = inspect.signature(func) + parameters = list(signature.parameters.values()) + if parameters and parameters[0].name in ("self", "cls"): + raise ValueError( + "An operator cannot be created from a method, " + "since the decorated function becomes an operator class" + ) + + # Look for "more_sources" + for i, p in enumerate(parameters): + if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL: + more_sources_index = i + break + else: + more_sources_index = None + + # Injected parameters + self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) + inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD) + + # Wrapped static method + original = func + original.__qualname__ = name + ".original" + + # Raw static method + raw = func + raw.__qualname__ = name + ".raw" + + # Init method + def init(self: BaseStream[T], *args: P.args, **kwargs: P.kwargs) -> None: + if more_sources_index is not None: + for source in args[more_sources_index:]: + assert_async_iterable(source) + factory = functools.partial(raw, *args, **kwargs) + return BaseStream.__init__(self, factory) + + # Customize init signature + new_parameters = [self_parameter] + parameters + init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + + # Customize init method + init.__qualname__ = name + ".__init__" + init.__name__ = "__init__" + init.__module__ = module + init.__doc__ = f"Initialize the {name} stream." + + # Gather attributes + attrs = { + "__init__": init, + "__module__": module, + "__doc__": doc, + "raw": staticmethod(raw), + "original": staticmethod(original), + } + + # Create operator class + return cast("OperatorType[P, T]", type(name, bases, attrs)) + + +def pipable_operator( + func: Callable[Concatenate[AsyncIterable[X], P], AsyncIterator[T]], +) -> PipableOperatorType[X, P, T]: + """Create a pipable stream operator from an asynchronous generator + (or any function returning an asynchronous iterable). + + Decorator usage:: + + @pipable_operator + async def multiply(source, factor): + async with streamcontext(source) as streamer: + async for item in streamer: + yield factor * item + + The first argument is expected to be the asynchronous iteratable used + for piping. + + The return value is a dynamically created class. + It has the same name, module and doc as the original function. + + A new stream is created by simply instanciating the operator:: + + xs = random() + ys = multiply(xs, 2) + + The original function is called at instanciation to check that + signature match. The source is also checked for asynchronous iteration. + + The operator also have a pipe class method that can be used along + with the piping synthax:: + + xs = random() + ys = xs | multiply.pipe(2) + + This is strictly equivalent to the previous example. 
+ + Other methods are available: + + - `original`: the original function as a static method + - `raw`: same as original but add extra checking + + The raw method is useful to create new operators from existing ones:: + + @pipable_operator + def double(source): + return multiply.raw(source, 2) + """ + + # First check for classmethod instance, to avoid more confusing errors later on + if isinstance(func, classmethod): + raise ValueError( + "An operator cannot be created from a class method, " + "since the decorated function becomes an operator class" + ) + + # Gather data + bases = (Stream,) + name = func.__name__ + module = func.__module__ + extra_doc = func.__doc__ + doc = extra_doc or f"Regular {name} stream operator." + + # Extract signature + signature = inspect.signature(func) + parameters = list(signature.parameters.values()) + if parameters and parameters[0].name in ("self", "cls"): + raise ValueError( + "An operator cannot be created from a method, " + "since the decorated function becomes an operator class" + ) + + # Look for "more_sources" + for i, p in enumerate(parameters): + if p.name == "more_sources" and p.kind == inspect.Parameter.VAR_POSITIONAL: + more_sources_index = i + break + else: + more_sources_index = None + + # Injected parameters + self_parameter = inspect.Parameter("self", inspect.Parameter.POSITIONAL_OR_KEYWORD) + cls_parameter = inspect.Parameter("cls", inspect.Parameter.POSITIONAL_OR_KEYWORD) + + # Wrapped static method + original = func + original.__qualname__ = name + ".original" + + # Raw static method + def raw( + arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs + ) -> AsyncIterator[T]: + assert_async_iterable(arg) + if more_sources_index is not None: + for source in args[more_sources_index - 1 :]: + assert_async_iterable(source) + return func(arg, *args, **kwargs) + + # Custonize raw method + raw.__signature__ = signature # type: ignore[attr-defined] + raw.__qualname__ = name + ".raw" + raw.__module__ = module + raw.__doc__ = doc + + # Init method + def init( + self: BaseStream[T], arg: AsyncIterable[X], *args: P.args, **kwargs: P.kwargs + ) -> None: + assert_async_iterable(arg) + if more_sources_index is not None: + for source in args[more_sources_index - 1 :]: + assert_async_iterable(source) + factory = functools.partial(raw, arg, *args, **kwargs) + return BaseStream.__init__(self, factory) + + # Customize init signature + new_parameters = [self_parameter] + parameters + init.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + + # Customize init method + init.__qualname__ = name + ".__init__" + init.__name__ = "__init__" + init.__module__ = module + init.__doc__ = f"Initialize the {name} stream." + + # Pipe class method + def pipe( + cls: PipableOperatorType[X, P, T], + /, + *args: P.args, + **kwargs: P.kwargs, + ) -> Callable[[AsyncIterable[X]], Stream[T]]: + return lambda source: cls(source, *args, **kwargs) + + # Customize pipe signature + if parameters and parameters[0].kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ): + new_parameters = [cls_parameter] + parameters[1:] + else: + new_parameters = [cls_parameter] + parameters + pipe.__signature__ = signature.replace(parameters=new_parameters) # type: ignore[attr-defined] + + # Customize pipe method + pipe.__qualname__ = name + ".pipe" + pipe.__module__ = module + pipe.__doc__ = f'Pipable "{name}" stream operator.' 
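+    # The decorated function's own docstring (if any) is appended below, so
+    # that help() on the generated pipe method shows both the one-line
+    # summary and the full documentation of the original function.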
+ if extra_doc: + pipe.__doc__ += "\n\n " + extra_doc + + # Gather attributes + attrs = { + "__init__": init, + "__module__": module, + "__doc__": doc, + "raw": staticmethod(raw), + "original": staticmethod(original), + "pipe": classmethod(pipe), # type: ignore[arg-type] + } + + # Create operator class + return cast( + "PipableOperatorType[X, P, T]", + type(name, bases, attrs), + ) diff --git a/.venv/lib/python3.12/site-packages/aiostream/manager.py b/.venv/lib/python3.12/site-packages/aiostream/manager.py new file mode 100644 index 00000000..bab224a5 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/manager.py @@ -0,0 +1,159 @@ +"""Provide a context to easily manage several streamers running +concurrently. +""" +from __future__ import annotations + +import asyncio +from .aiter_utils import AsyncExitStack + +from .aiter_utils import anext +from .core import streamcontext +from typing import ( + TYPE_CHECKING, + Awaitable, + List, + Set, + Tuple, + Generic, + TypeVar, + Any, + Type, + AsyncIterable, +) +from types import TracebackType + +if TYPE_CHECKING: + from asyncio import Task + from aiostream.core import Streamer + +T = TypeVar("T") + + +class TaskGroup: + def __init__(self) -> None: + self._pending: set[Task[Any]] = set() + + async def __aenter__(self) -> TaskGroup: + return self + + async def __aexit__( + self, + typ: Type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ) -> None: + while self._pending: + task = self._pending.pop() + await self.cancel_task(task) + + def create_task(self, coro: Awaitable[T]) -> Task[T]: + task = asyncio.ensure_future(coro) + self._pending.add(task) + return task + + async def wait_any(self, tasks: List[Task[T]]) -> Set[Task[T]]: + done, _ = await asyncio.wait(tasks, return_when="FIRST_COMPLETED") + self._pending -= done + return done + + async def wait_all(self, tasks: List[Task[T]]) -> Set[Task[T]]: + if not tasks: + return set() + done, _ = await asyncio.wait(tasks) + self._pending -= done + return done + + async def cancel_task(self, task: Task[Any]) -> None: + try: + # The task is already cancelled + if task.cancelled(): + pass + # The task is already finished + elif task.done(): + # Discard the pending exception (if any). + # This makes sense since we don't know in which context the exception + # was meant to be processed. For instance, a `StopAsyncIteration` + # might be raised to notify that the end of a streamer has been reached. + task.exception() + # The task needs to be cancelled and awaited + else: + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + # Silence any exception raised while cancelling the task. + # This might happen if the `CancelledError` is silenced, and the + # corresponding async generator returns, causing the `anext` call + # to raise a `StopAsyncIteration`. 
+ except Exception: + pass + finally: + self._pending.discard(task) + + +class StreamerManager(Generic[T]): + def __init__(self) -> None: + self.tasks: dict[Streamer[T], Task[T]] = {} + self.streamers: list[Streamer[T]] = [] + self.group: TaskGroup = TaskGroup() + self.stack = AsyncExitStack() + + async def __aenter__(self) -> StreamerManager[T]: + await self.stack.__aenter__() + await self.stack.enter_async_context(self.group) + return self + + async def __aexit__( + self, + typ: Type[BaseException] | None, + value: BaseException | None, + traceback: TracebackType | None, + ) -> bool: + for streamer in self.streamers: + task = self.tasks.pop(streamer, None) + if task is not None: + self.stack.push_async_callback(self.group.cancel_task, task) + self.stack.push_async_exit(streamer) + self.tasks.clear() + self.streamers.clear() + return await self.stack.__aexit__(typ, value, traceback) + + async def enter_and_create_task(self, aiter: AsyncIterable[T]) -> Streamer[T]: + streamer = streamcontext(aiter) + await streamer.__aenter__() + self.streamers.append(streamer) + self.create_task(streamer) + return streamer + + def create_task(self, streamer: Streamer[T]) -> None: + assert streamer in self.streamers + assert streamer not in self.tasks + self.tasks[streamer] = self.group.create_task(anext(streamer)) + + async def wait_single_event( + self, filters: list[Streamer[T]] + ) -> Tuple[Streamer[T], Task[T]]: + tasks = [self.tasks[streamer] for streamer in filters] + done = await self.group.wait_any(tasks) + for streamer in filters: + if self.tasks.get(streamer) in done: + return streamer, self.tasks.pop(streamer) + assert False + + async def clean_streamer(self, streamer: Streamer[T]) -> None: + task = self.tasks.pop(streamer, None) + if task is not None: + await self.group.cancel_task(task) + await streamer.aclose() + self.streamers.remove(streamer) + + async def clean_streamers(self, streamers: list[Streamer[T]]) -> None: + tasks = [ + self.group.create_task(self.clean_streamer(streamer)) + for streamer in streamers + ] + done = await self.group.wait_all(tasks) + # Raise exception if any + for task in done: + task.result() diff --git a/.venv/lib/python3.12/site-packages/aiostream/pipe.py b/.venv/lib/python3.12/site-packages/aiostream/pipe.py new file mode 100644 index 00000000..c27e101a --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/pipe.py @@ -0,0 +1,39 @@ +"""Gather the pipe operators.""" +from __future__ import annotations + +from . 
import stream + +accumulate = stream.accumulate.pipe +action = stream.action.pipe +amap = stream.amap.pipe +chain = stream.chain.pipe +chunks = stream.chunks.pipe +concat = stream.concat.pipe +concatmap = stream.concatmap.pipe +cycle = stream.cycle.pipe +delay = stream.delay.pipe +dropwhile = stream.dropwhile.pipe +enumerate = stream.enumerate.pipe +filter = stream.filter.pipe +flatmap = stream.flatmap.pipe +flatten = stream.flatten.pipe +getitem = stream.getitem.pipe +list = stream.list.pipe +map = stream.map.pipe +merge = stream.merge.pipe +print = stream.print.pipe +reduce = stream.reduce.pipe +skip = stream.skip.pipe +skiplast = stream.skiplast.pipe +smap = stream.smap.pipe +spaceout = stream.spaceout.pipe +starmap = stream.starmap.pipe +switch = stream.switch.pipe +switchmap = stream.switchmap.pipe +take = stream.take.pipe +takelast = stream.takelast.pipe +takewhile = stream.takewhile.pipe +timeout = stream.timeout.pipe +until = stream.until.pipe +zip = stream.zip.pipe +ziplatest = stream.ziplatest.pipe diff --git a/.venv/lib/python3.12/site-packages/aiostream/py.typed b/.venv/lib/python3.12/site-packages/aiostream/py.typed new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/py.typed diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py new file mode 100644 index 00000000..cd20d6c9 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/__init__.py @@ -0,0 +1,10 @@ +"""Gather all the stream operators.""" + +from .create import * +from .transform import * +from .select import * +from .combine import * +from .aggregate import * +from .time import * +from .misc import * +from .advanced import * diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py new file mode 100644 index 00000000..106f0f1d --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/advanced.py @@ -0,0 +1,222 @@ +"""Advanced operators (to deal with streams of higher order) .""" +from __future__ import annotations + +from typing import AsyncIterator, AsyncIterable, TypeVar, Union, cast +from typing_extensions import ParamSpec + +from . import combine + +from ..core import Streamer, pipable_operator +from ..manager import StreamerManager + + +__all__ = ["concat", "flatten", "switch", "concatmap", "flatmap", "switchmap"] + + +T = TypeVar("T") +U = TypeVar("U") +P = ParamSpec("P") + + +# Helper to manage stream of higher order + + +@pipable_operator +async def base_combine( + source: AsyncIterable[AsyncIterable[T]], + switch: bool = False, + ordered: bool = False, + task_limit: int | None = None, +) -> AsyncIterator[T]: + """Base operator for managing an asynchronous sequence of sequences. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + The ``switch`` argument enables the switch mecanism, which cause the + previous subsequence to be discarded when a new one is created. + + The items can either be generated in order or as soon as they're received, + depending on the ``ordered`` argument. 
+ """ + + # Task limit + if task_limit is not None and not task_limit > 0: + raise ValueError("The task limit must be None or greater than 0") + + # Safe context + async with StreamerManager[Union[AsyncIterable[T], T]]() as manager: + main_streamer: Streamer[ + AsyncIterable[T] | T + ] | None = await manager.enter_and_create_task(source) + + # Loop over events + while manager.tasks: + # Extract streamer groups + substreamers = manager.streamers[1:] + mainstreamers = [main_streamer] if main_streamer in manager.tasks else [] + + # Switch - use the main streamer then the substreamer + if switch: + filters = mainstreamers + substreamers + # Concat - use the first substreamer then the main streamer + elif ordered: + filters = substreamers[:1] + mainstreamers + # Flat - use the substreamers then the main streamer + else: + filters = substreamers + mainstreamers + + # Wait for next event + streamer, task = await manager.wait_single_event(filters) + + # Get result + try: + result = task.result() + + # End of stream + except StopAsyncIteration: + # Main streamer is finished + if streamer is main_streamer: + main_streamer = None + + # A substreamer is finished + else: + await manager.clean_streamer(streamer) + + # Re-schedule the main streamer if necessary + if main_streamer is not None and main_streamer not in manager.tasks: + manager.create_task(main_streamer) + + # Process result + else: + # Switch mecanism + if switch and streamer is main_streamer: + await manager.clean_streamers(substreamers) + + # Setup a new source + if streamer is main_streamer: + assert isinstance(result, AsyncIterable) + await manager.enter_and_create_task(result) + + # Re-schedule the main streamer if task limit allows it + if task_limit is None or task_limit > len(manager.tasks): + manager.create_task(streamer) + + # Yield the result + else: + item = cast("T", result) + yield item + + # Re-schedule the streamer + manager.create_task(streamer) + + +# Advanced operators (for streams of higher order) + + +@pipable_operator +def concat( + source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None +) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements + of the sequences in order. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + Errors raised in the source or an element sequence are propagated. + """ + return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=True) + + +@pipable_operator +def flatten( + source: AsyncIterable[AsyncIterable[T]], task_limit: int | None = None +) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements + of the sequences as soon as they're received. + + The sequences are awaited concurrently, although it's possible to limit + the amount of running sequences using the `task_limit` argument. + + Errors raised in the source or an element sequence are propagated. + """ + return base_combine.raw(source, task_limit=task_limit, switch=False, ordered=False) + + +@pipable_operator +def switch(source: AsyncIterable[AsyncIterable[T]]) -> AsyncIterator[T]: + """Given an asynchronous sequence of sequences, generate the elements of + the most recent sequence. + + Element sequences are generated eagerly, and closed once they are + superseded by a more recent sequence. Once the main sequence is finished, + the last subsequence will be exhausted completely. 
+ + Errors raised in the source or an element sequence (that was not already + closed) are propagated. + """ + return base_combine.raw(source, switch=True) + + +# Advanced *-map operators + + +@pipable_operator +def concatmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences, and generate the elements of the created + sequences in order. + + The function is applied as described in `map`, and must return an + asynchronous sequence. The returned sequences are awaited concurrently, + although it's possible to limit the amount of running sequences using + the `task_limit` argument. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return concat.raw(mapped, task_limit=task_limit) + + +@pipable_operator +def flatmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences, and generate the elements of the created + sequences as soon as they arrive. + + The function is applied as described in `map`, and must return an + asynchronous sequence. The returned sequences are awaited concurrently, + although it's possible to limit the amount of running sequences using + the `task_limit` argument. + + Errors raised in a source or output sequence are propagated. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return flatten.raw(mapped, task_limit=task_limit) + + +@pipable_operator +def switchmap( + source: AsyncIterable[T], + func: combine.SmapCallable[T, AsyncIterable[U]], + *more_sources: AsyncIterable[T], +) -> AsyncIterator[U]: + """Apply a given function that creates a sequence from the elements of one + or several asynchronous sequences and generate the elements of the most + recently created sequence. + + The function is applied as described in `map`, and must return an + asynchronous sequence. Errors raised in a source or output sequence (that + was not already closed) are propagated. + """ + mapped = combine.smap.raw(source, func, *more_sources) + return switch.raw(mapped) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py new file mode 100644 index 00000000..8ed7c0e1 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py @@ -0,0 +1,90 @@ +"""Aggregation operators.""" +from __future__ import annotations + +import asyncio +import builtins +import operator as op +from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast + + +from . import select +from ..aiter_utils import anext +from ..core import pipable_operator, streamcontext + +__all__ = ["accumulate", "reduce", "list"] + +T = TypeVar("T") + + +@pipable_operator +async def accumulate( + source: AsyncIterable[T], + func: Callable[[T, T], Awaitable[T] | T] = op.add, + initializer: T | None = None, +) -> AsyncIterator[T]: + """Generate a series of accumulated sums (or other binary function) + from an asynchronous sequence. + + If ``initializer`` is present, it is placed before the items + of the sequence in the calculation, and serves as a default + when the sequence is empty. 
+ """ + iscorofunc = asyncio.iscoroutinefunction(func) + async with streamcontext(source) as streamer: + # Initialize + if initializer is None: + try: + value = await anext(streamer) + except StopAsyncIteration: + return + else: + value = initializer + # First value + yield value + # Iterate streamer + async for item in streamer: + returned = func(value, item) + if iscorofunc: + awaitable_value = cast("Awaitable[T]", returned) + value = await awaitable_value + else: + value = cast("T", returned) + yield value + + +@pipable_operator +def reduce( + source: AsyncIterable[T], + func: Callable[[T, T], Awaitable[T] | T], + initializer: T | None = None, +) -> AsyncIterator[T]: + """Apply a function of two arguments cumulatively to the items + of an asynchronous sequence, reducing the sequence to a single value. + + If ``initializer`` is present, it is placed before the items + of the sequence in the calculation, and serves as a default when the + sequence is empty. + """ + acc = accumulate.raw(source, func, initializer) + return select.item.raw(acc, -1) + + +@pipable_operator +async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]: + """Build a list from an asynchronous sequence. + + All the intermediate steps are generated, starting from the empty list. + + This operator can be used to easily convert a stream into a list:: + + lst = await stream.list(x) + + ..note:: The same list object is produced at each step in order to avoid + memory copies. + """ + result: builtins.list[T] = [] + yield result + async with streamcontext(source) as streamer: + async for item in streamer: + result.append(item) + yield result diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py new file mode 100644 index 00000000..a782a730 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/combine.py @@ -0,0 +1,282 @@ +"""Combination operators.""" +from __future__ import annotations + +import asyncio +import builtins + +from typing import ( + Awaitable, + Protocol, + TypeVar, + AsyncIterable, + AsyncIterator, + Callable, + cast, +) +from typing_extensions import ParamSpec + +from ..aiter_utils import AsyncExitStack, anext +from ..core import streamcontext, pipable_operator + +from . import create +from . import select +from . import advanced +from . import aggregate + +__all__ = ["chain", "zip", "map", "merge", "ziplatest", "amap", "smap"] + +T = TypeVar("T") +U = TypeVar("U") +K = TypeVar("K") +P = ParamSpec("P") + + +@pipable_operator +async def chain( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[T]: + """Chain asynchronous sequences together, in the order they are given. + + Note: the sequences are not iterated until it is required, + so if the operation is interrupted, the remaining sequences + will be left untouched. + """ + sources = source, *more_sources + for source in sources: + async with streamcontext(source) as streamer: + async for item in streamer: + yield item + + +@pipable_operator +async def zip( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[tuple[T, ...]]: + """Combine and forward the elements of several asynchronous sequences. + + Each generated value is a tuple of elements, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. + + Note: the different sequences are awaited in parrallel, so that their + waiting times don't add up. 
+ """ + sources = source, *more_sources + + # One sources + if len(sources) == 1: + (source,) = sources + async with streamcontext(source) as streamer: + async for item in streamer: + yield (item,) + return + + # N sources + async with AsyncExitStack() as stack: + # Handle resources + streamers = [ + await stack.enter_async_context(streamcontext(source)) for source in sources + ] + # Loop over items + while True: + try: + coros = builtins.map(anext, streamers) + items = await asyncio.gather(*coros) + except StopAsyncIteration: + break + else: + yield tuple(items) + + +X = TypeVar("X", contravariant=True) +Y = TypeVar("Y", covariant=True) + + +class SmapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Y: + ... + + +class AmapCallable(Protocol[X, Y]): + async def __call__(self, arg: X, /, *args: X) -> Y: + ... + + +class MapCallable(Protocol[X, Y]): + def __call__(self, arg: X, /, *args: X) -> Awaitable[Y] | Y: + ... + + +@pipable_operator +async def smap( + source: AsyncIterable[T], + func: SmapCallable[T, U], + *more_sources: AsyncIterable[T], +) -> AsyncIterator[U]: + """Apply a given function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. The function is treated synchronously. + + Note: if more than one sequence is provided, they're awaited concurrently + so that their waiting times don't add up. + """ + stream = zip(source, *more_sources) + async with streamcontext(stream) as streamer: + async for item in streamer: + yield func(*item) + + +@pipable_operator +def amap( + source: AsyncIterable[T], + corofn: AmapCallable[T, U], + *more_sources: AsyncIterable[T], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given coroutine function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. + + If more than one sequence is provided, they're also awaited concurrently, + so that their waiting times don't add up. + """ + + async def func(arg: T, *args: T) -> AsyncIterable[U]: + yield await corofn(arg, *args) + + if ordered: + return advanced.concatmap.raw( + source, func, *more_sources, task_limit=task_limit + ) + return advanced.flatmap.raw(source, func, *more_sources, task_limit=task_limit) + + +@pipable_operator +def map( + source: AsyncIterable[T], + func: MapCallable[T, U], + *more_sources: AsyncIterable[T], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[U]: + """Apply a given function to the elements of one or several + asynchronous sequences. + + Each element is used as a positional argument, using the same order as + their respective sources. The generation continues until the shortest + sequence is exhausted. The function can either be synchronous or + asynchronous (coroutine function). + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. 
This argument is ignored if the + provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. + + If more than one sequence is provided, they're also awaited concurrently, + so that their waiting times don't add up. + + It might happen that the provided function returns a coroutine but is not + a coroutine function per se. In this case, one can wrap the function with + ``aiostream.async_`` in order to force ``map`` to await the resulting + coroutine. The following example illustrates the use ``async_`` with a + lambda function:: + + from aiostream import stream, async_ + ... + ys = stream.map(xs, async_(lambda ms: asyncio.sleep(ms / 1000))) + """ + if asyncio.iscoroutinefunction(func): + return amap.raw( + source, func, *more_sources, ordered=ordered, task_limit=task_limit + ) + sync_func = cast("SmapCallable[T, U]", func) + return smap.raw(source, sync_func, *more_sources) + + +@pipable_operator +def merge( + source: AsyncIterable[T], *more_sources: AsyncIterable[T] +) -> AsyncIterator[T]: + """Merge several asynchronous sequences together. + + All the sequences are iterated simultaneously and their elements + are forwarded as soon as they're available. The generation continues + until all the sequences are exhausted. + """ + sources = [source, *more_sources] + source_stream: AsyncIterable[AsyncIterable[T]] = create.iterate.raw(sources) + return advanced.flatten.raw(source_stream) + + +@pipable_operator +def ziplatest( + source: AsyncIterable[T], + *more_sources: AsyncIterable[T], + partial: bool = True, + default: T | None = None, +) -> AsyncIterator[tuple[T | None, ...]]: + """Combine several asynchronous sequences together, producing a tuple with + the lastest element of each sequence whenever a new element is received. + + The value to use when a sequence has not procuded any element yet is given + by the ``default`` argument (defaulting to ``None``). + + The producing of partial results can be disabled by setting the optional + argument ``partial`` to ``False``. + + All the sequences are iterated simultaneously and their elements + are forwarded as soon as they're available. The generation continues + until all the sequences are exhausted. 
+ """ + sources = source, *more_sources + n = len(sources) + + # Custom getter + def getter(dct: dict[int, T]) -> Callable[[int], T | None]: + return lambda key: dct.get(key, default) + + # Add source index to the items + def make_func(i: int) -> SmapCallable[T, dict[int, T]]: + def func(x: T, *_: object) -> dict[int, T]: + return {i: x} + + return func + + new_sources = [smap.raw(source, make_func(i)) for i, source in enumerate(sources)] + + # Merge the sources + merged = merge.raw(*new_sources) + + # Accumulate the current state in a dict + accumulated = aggregate.accumulate.raw(merged, lambda x, e: {**x, **e}) + + # Filter partial result + filtered = ( + accumulated + if partial + else select.filter.raw(accumulated, lambda x: len(x) == n) + ) + + # Convert the state dict to a tuple + def dict_to_tuple(x: dict[int, T], *_: object) -> tuple[T | None, ...]: + return tuple(builtins.map(getter(x), range(n))) + + return smap.raw(filtered, dict_to_tuple) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py new file mode 100644 index 00000000..d7676b71 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py @@ -0,0 +1,191 @@ +"""Non-pipable creation operators.""" +from __future__ import annotations + +import sys +import asyncio +import inspect +import builtins +import itertools + +from typing import ( + AsyncIterable, + Awaitable, + Iterable, + Protocol, + TypeVar, + AsyncIterator, + cast, +) +from typing_extensions import ParamSpec + +from ..stream import time +from ..core import operator, streamcontext + +__all__ = [ + "iterate", + "preserve", + "just", + "call", + "throw", + "empty", + "never", + "repeat", + "range", + "count", +] + +T = TypeVar("T") +P = ParamSpec("P") + +# Hack for python 3.8 compatibility +if sys.version_info < (3, 9): + P = TypeVar("P") + +# Convert regular iterables + + +@operator +async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]: + """Generate values from a regular iterable.""" + for item in it: + await asyncio.sleep(0) + yield item + + +@operator +def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]: + """Generate values from an asynchronous iterable. + + Note: the corresponding iterator will be explicitely closed + when leaving the context manager.""" + return streamcontext(ait) + + +@operator +def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]: + """Generate values from a sychronous or asynchronous iterable.""" + if isinstance(it, AsyncIterable): + return from_async_iterable.raw(it) + if isinstance(it, Iterable): + return from_iterable.raw(it) + raise TypeError(f"{type(it).__name__!r} object is not (async) iterable") + + +@operator +async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]: + """Generate values from an asynchronous iterable without + explicitly closing the corresponding iterator.""" + async for item in ait: + yield item + + +# Simple operators + + +@operator +async def just(value: T) -> AsyncIterator[T]: + """Await if possible, and generate a single value.""" + if inspect.isawaitable(value): + yield await value + else: + yield value + + +Y = TypeVar("Y", covariant=True) + + +class SyncCallable(Protocol[P, Y]): + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y: + ... + + +class AsyncCallable(Protocol[P, Y]): + def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]: + ... 
+ + +@operator +async def call( + func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs +) -> AsyncIterator[T]: + """Call the given function and generate a single value. + + Await if the provided function is asynchronous. + """ + if asyncio.iscoroutinefunction(func): + async_func = cast("AsyncCallable[P, T]", func) + yield await async_func(*args, **kwargs) + else: + sync_func = cast("SyncCallable[P, T]", func) + yield sync_func(*args, **kwargs) + + +@operator +async def throw(exc: Exception) -> AsyncIterator[None]: + """Throw an exception without generating any value.""" + if False: + yield + raise exc + + +@operator +async def empty() -> AsyncIterator[None]: + """Terminate without generating any value.""" + if False: + yield + + +@operator +async def never() -> AsyncIterator[None]: + """Hang forever without generating any value.""" + if False: + yield + future: asyncio.Future[None] = asyncio.Future() + try: + await future + finally: + future.cancel() + + +@operator +def repeat( + value: T, times: int | None = None, *, interval: float = 0.0 +) -> AsyncIterator[T]: + """Generate the same value a given number of times. + + If ``times`` is ``None``, the value is repeated indefinitely. + An optional interval can be given to space the values out. + """ + args = () if times is None else (times,) + it = itertools.repeat(value, *args) + agen = from_iterable.raw(it) + return time.spaceout.raw(agen, interval) if interval else agen + + +# Counting operators + + +@operator +def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]: + """Generate a given range of numbers. + + It supports the same arguments as the builtin function. + An optional interval can be given to space the values out. + """ + agen = from_iterable.raw(builtins.range(*args)) + return time.spaceout.raw(agen, interval) if interval else agen + + +@operator +def count( + start: int = 0, step: int = 1, *, interval: float = 0.0 +) -> AsyncIterator[int]: + """Generate consecutive numbers indefinitely. + + Optional starting point and increment can be defined, + respectively defaulting to ``0`` and ``1``. + + An optional interval can be given to space the values out. + """ + agen = from_iterable.raw(itertools.count(start, step)) + return time.spaceout.raw(agen, interval) if interval else agen diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py new file mode 100644 index 00000000..1be834e6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py @@ -0,0 +1,83 @@ +"""Extra operators.""" +from __future__ import annotations + +import asyncio +import builtins + +from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any + +from .combine import amap, smap +from ..core import pipable_operator + +__all__ = ["action", "print"] + + +T = TypeVar("T") + + +@pipable_operator +def action( + source: AsyncIterable[T], + func: Callable[[T], Awaitable[Any] | Any], + ordered: bool = True, + task_limit: int | None = None, +) -> AsyncIterator[T]: + """Perform an action for each element of an asynchronous sequence + without modifying it. + + The given function can be synchronous or asynchronous. + + The results can either be returned in or out of order, depending on + the corresponding ``ordered`` argument. This argument is ignored if the + provided function is synchronous. + + The coroutines run concurrently but their amount can be limited using + the ``task_limit`` argument. 
A value of ``1`` will cause the coroutines + to run sequentially. This argument is ignored if the provided function + is synchronous. + """ + if asyncio.iscoroutinefunction(func): + + async def ainnerfunc(arg: T, *_: object) -> T: + awaitable = func(arg) + assert isinstance(awaitable, Awaitable) + await awaitable + return arg + + return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit) + + else: + + def innerfunc(arg: T, *_: object) -> T: + func(arg) + return arg + + return smap.raw(source, innerfunc) + + +@pipable_operator +def print( + source: AsyncIterable[T], + template: str = "{}", + sep: str = " ", + end: str = "\n", + file: Any | None = None, + flush: bool = False, +) -> AsyncIterator[T]: + """Print each element of an asynchronous sequence without modifying it. + + An optional template can be provided to be formatted with the elements. + All the keyword arguments are forwarded to the builtin function print. + """ + + def func(value: T) -> None: + string = template.format(value) + builtins.print( + string, + sep=sep, + end=end, + file=file, + flush=flush, + ) + + return action.raw(source, func) diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py new file mode 100644 index 00000000..9390f464 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py @@ -0,0 +1,284 @@ +"""Selection operators.""" +from __future__ import annotations + +import asyncio +import builtins +import collections + +from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator + +from . import transform +from ..aiter_utils import aiter, anext +from ..core import streamcontext, pipable_operator + +__all__ = [ + "take", + "takelast", + "skip", + "skiplast", + "getitem", + "filter", + "until", + "dropwhile", + "takewhile", +] + +T = TypeVar("T") + + +@pipable_operator +async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward the first ``n`` elements from an asynchronous sequence. + + If ``n`` is negative, it simply terminates before iterating the source. + """ + enumerated = transform.enumerate.raw(source) + async with streamcontext(enumerated) as streamer: + if n <= 0: + return + async for i, item in streamer: + yield item + if i >= n - 1: + return + + +@pipable_operator +async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward the last ``n`` elements from an asynchronous sequence. + + If ``n`` is negative, it simply terminates after iterating the source. + + Note: it is required to reach the end of the source before the first + element is generated. + """ + queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0) + async with streamcontext(source) as streamer: + async for item in streamer: + queue.append(item) + for item in queue: + yield item + + +@pipable_operator +async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward an asynchronous sequence, skipping the first ``n`` elements. + + If ``n`` is negative, no elements are skipped. + """ + enumerated = transform.enumerate.raw(source) + async with streamcontext(enumerated) as streamer: + async for i, item in streamer: + if i >= n: + yield item + + +@pipable_operator +async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]: + """Forward an asynchronous sequence, skipping the last ``n`` elements. + + If ``n`` is negative, no elements are skipped. 
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/select.py b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
new file mode 100644
index 00000000..9390f464
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/select.py
@@ -0,0 +1,284 @@
+"""Selection operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+import collections
+
+from typing import Awaitable, Callable, TypeVar, AsyncIterable, AsyncIterator
+
+from . import transform
+from ..aiter_utils import aiter, anext
+from ..core import streamcontext, pipable_operator
+
+__all__ = [
+    "take",
+    "takelast",
+    "skip",
+    "skiplast",
+    "getitem",
+    "filter",
+    "until",
+    "dropwhile",
+    "takewhile",
+]
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def take(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the first ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates before iterating the source.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        if n <= 0:
+            return
+        async for i, item in streamer:
+            yield item
+            if i >= n - 1:
+                return
+
+
+@pipable_operator
+async def takelast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward the last ``n`` elements from an asynchronous sequence.
+
+    If ``n`` is negative, it simply terminates after iterating the source.
+
+    Note: it is required to reach the end of the source before the first
+    element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            queue.append(item)
+        for item in queue:
+            yield item
+
+
+@pipable_operator
+async def skip(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the first ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if i >= n:
+                yield item
+
+
+@pipable_operator
+async def skiplast(source: AsyncIterable[T], n: int) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence, skipping the last ``n`` elements.
+
+    If ``n`` is negative, no elements are skipped.
+
+    Note: it is required to reach the ``n+1``-th element of the source
+    before the first element is generated.
+    """
+    queue: collections.deque[T] = collections.deque(maxlen=n if n > 0 else 0)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            if n <= 0:
+                yield item
+                continue
+            if len(queue) == n:
+                yield queue[0]
+            queue.append(item)
+
+
+@pipable_operator
+async def filterindex(
+    source: AsyncIterable[T], func: Callable[[int], bool]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using the index of the elements.
+
+    The given function is synchronous, takes the index as an argument,
+    and returns ``True`` if the corresponding element should be forwarded,
+    ``False`` otherwise.
+    """
+    enumerated = transform.enumerate.raw(source)
+    async with streamcontext(enumerated) as streamer:
+        async for i, item in streamer:
+            if func(i):
+                yield item
+
+
+@pipable_operator
+def slice(source: AsyncIterable[T], *args: int) -> AsyncIterator[T]:
+    """Slice an asynchronous sequence.
+
+    The arguments are the same as for the builtin ``slice`` type.
+
+    There are two limitations compared to regular slices:
+    - Positive stop index with negative start index is not supported
+    - Negative step is not supported
+    """
+    s = builtins.slice(*args)
+    start, stop, step = s.start or 0, s.stop, s.step or 1
+    aiterator = aiter(source)
+    # Filter the first items
+    if start < 0:
+        aiterator = takelast.raw(aiterator, abs(start))
+    elif start > 0:
+        aiterator = skip.raw(aiterator, start)
+    # Filter the last items
+    if stop is not None:
+        if stop >= 0 and start < 0:
+            raise ValueError("Positive stop with negative start is not supported")
+        elif stop >= 0:
+            aiterator = take.raw(aiterator, stop - start)
+        else:
+            aiterator = skiplast.raw(aiterator, abs(stop))
+    # Filter step items
+    if step is not None:
+        if step > 1:
+            aiterator = filterindex.raw(aiterator, lambda i: i % step == 0)
+        elif step < 0:
+            raise ValueError("Negative step not supported")
+    # Return
+    return aiterator
+
+
+@pipable_operator
+async def item(source: AsyncIterable[T], index: int) -> AsyncIterator[T]:
+    """Forward the ``n``th element of an asynchronous sequence.
+
+    The index can be negative and works like regular indexing.
+    If the index is out of range, an ``IndexError`` is raised.
+    """
+    # Prepare
+    if index >= 0:
+        source = skip.raw(source, index)
+    else:
+        source = takelast.raw(source, abs(index))
+    async with streamcontext(source) as streamer:
+        # Get first item
+        try:
+            result = await anext(streamer)
+        except StopAsyncIteration:
+            raise IndexError("Index out of range")
+        # Check length
+        if index < 0:
+            count = 1
+            async for _ in streamer:
+                count += 1
+            if count != abs(index):
+                raise IndexError("Index out of range")
+        # Yield result
+        yield result
+
+
+@pipable_operator
+def getitem(source: AsyncIterable[T], index: int | builtins.slice) -> AsyncIterator[T]:
+    """Forward one or several items from an asynchronous sequence.
+
+    The argument can either be a slice or an integer.
+    See the slice and item operators for more information.
+    """
+    if isinstance(index, builtins.slice):
+        return slice.raw(source, index.start, index.stop, index.step)
+    if isinstance(index, int):
+        return item.raw(source, index)
+    raise TypeError("Not a valid index (int or slice)")
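Illustrative sketch of chaining the selection operators above through the ``pipe`` module (editorial aside, not part of the patch):

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        # Keep elements 2 through 7 of an infinite counter,
        # equivalent to the slice [2:8].
        xs = stream.count() | pipe.skip(2) | pipe.take(6)
        print(await stream.list(xs))  # [2, 3, 4, 5, 6, 7]

    asyncio.run(main())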
+
+
+@pipable_operator
+async def filter(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Filter an asynchronous sequence using an arbitrary function.
+
+    The function takes the item as an argument and returns ``True``
+    if it should be forwarded, ``False`` otherwise.
+    The function can either be synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if result:
+                yield item
+
+
+@pipable_operator
+async def until(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence until a condition is met.
+
+    Contrary to the ``takewhile`` operator, the last tested element is included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            yield item
+            if result:
+                return
+
+
+@pipable_operator
+async def takewhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Forward an asynchronous sequence while a condition is met.
+
+    Contrary to the ``until`` operator, the last tested element is not included
+    in the sequence.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                return
+            yield item
+
+
+@pipable_operator
+async def dropwhile(
+    source: AsyncIterable[T], func: Callable[[T], bool | Awaitable[bool]]
+) -> AsyncIterator[T]:
+    """Discard the elements from an asynchronous sequence
+    while a condition is met.
+
+    The given function takes the item as an argument and returns a boolean
+    corresponding to the condition to meet. The function can either be
+    synchronous or asynchronous.
+    """
+    iscorofunc = asyncio.iscoroutinefunction(func)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            result = func(item)
+            if iscorofunc:
+                assert isinstance(result, Awaitable)
+                result = await result
+            if not result:
+                yield item
+                break
+        async for item in streamer:
+            yield item
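A short sketch showing an asynchronous predicate with ``takewhile`` (illustrative, not part of the patch; ``is_small`` is a made-up helper):

    import asyncio
    from aiostream import stream, pipe

    async def is_small(x: int) -> bool:
        await asyncio.sleep(0.01)  # e.g. an asynchronous lookup
        return x < 5

    async def main() -> None:
        xs = stream.count() | pipe.takewhile(is_small)
        print(await stream.list(xs))  # [0, 1, 2, 3, 4]

    asyncio.run(main())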
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+    """Make sure the elements of an asynchronous sequence are separated
+    in time by the given interval.
+    """
+    timeout = 0.0
+    loop = asyncio.get_event_loop()
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            delta = timeout - loop.time()
+            delay = delta if delta > 0 else 0.0
+            await asyncio.sleep(delay)
+            yield item
+            timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+    """Raise a time-out if an element of the asynchronous sequence
+    takes too long to arrive.
+
+    Note: the timeout is not global but specific to each step of
+    the iteration.
+    """
+    async with streamcontext(source) as streamer:
+        while True:
+            try:
+                item = await asyncio.wait_for(anext(streamer), timeout)
+            except StopAsyncIteration:
+                break
+            else:
+                yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+    """Delay the iteration of an asynchronous sequence."""
+    await asyncio.sleep(delay)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield item
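Usage sketch for the per-step timeout (illustrative, not part of the patch):

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        # Each element must arrive within 0.5 second of the previous one;
        # the 0.1 second production interval keeps us well within bounds.
        xs = stream.range(3, interval=0.1) | pipe.timeout(0.5)
        async with xs.stream() as streamer:
            async for value in streamer:
                print(value)

    asyncio.run(main())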
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
new file mode 100644
index 00000000..f11bffa6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/transform.py
@@ -0,0 +1,128 @@
+"""Transformation operators."""
+
+from __future__ import annotations
+
+import asyncio
+import itertools
+from typing import (
+    Protocol,
+    TypeVar,
+    AsyncIterable,
+    AsyncIterator,
+    Awaitable,
+    cast,
+)
+
+from ..core import streamcontext, pipable_operator
+
+from . import select
+from . import create
+from . import aggregate
+from .combine import map, amap, smap
+
+__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]
+
+# map, amap and smap are also transform operators
+map, amap, smap
+
+T = TypeVar("T")
+U = TypeVar("U")
+
+
+@pipable_operator
+async def enumerate(
+    source: AsyncIterable[T], start: int = 0, step: int = 1
+) -> AsyncIterator[tuple[int, T]]:
+    """Generate ``(index, value)`` tuples from an asynchronous sequence.
+
+    This index is computed using a starting point and an increment,
+    respectively defaulting to ``0`` and ``1``.
+    """
+    count = itertools.count(start, step)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield next(count), item
+
+
+X = TypeVar("X", contravariant=True)
+Y = TypeVar("Y", covariant=True)
+
+
+class AsyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Awaitable[Y]:
+        ...
+
+
+class SyncStarmapCallable(Protocol[X, Y]):
+    def __call__(self, arg: X, /, *args: X) -> Y:
+        ...
+
+
+@pipable_operator
+def starmap(
+    source: AsyncIterable[tuple[T, ...]],
+    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
+    ordered: bool = True,
+    task_limit: int | None = None,
+) -> AsyncIterator[U]:
+    """Apply a given function to the unpacked elements of
+    an asynchronous sequence.
+
+    Each element is unpacked before applying the function.
+    The given function can either be synchronous or asynchronous.
+
+    The results can either be returned in or out of order, depending on
+    the corresponding ``ordered`` argument. This argument is ignored if
+    the provided function is synchronous.
+
+    The coroutines run concurrently but their number can be limited using
+    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+    to run sequentially. This argument is ignored if the provided function
+    is synchronous.
+    """
+    if asyncio.iscoroutinefunction(func):
+        async_func = cast("AsyncStarmapCallable[T, U]", func)
+
+        async def astarfunc(args: tuple[T, ...], *_: object) -> U:
+            awaitable = async_func(*args)
+            return await awaitable
+
+        return amap.raw(source, astarfunc, ordered=ordered, task_limit=task_limit)
+
+    else:
+        sync_func = cast("SyncStarmapCallable[T, U]", func)
+
+        def starfunc(args: tuple[T, ...], *_: object) -> U:
+            return sync_func(*args)
+
+        return smap.raw(source, starfunc)
+
+
+@pipable_operator
+async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
+    """Iterate indefinitely over an asynchronous sequence.
+
+    Note: it does not perform any buffering, but re-iterates over
+    the same given sequence instead. If the sequence is not
+    re-iterable, the generator might end up looping indefinitely
+    without yielding any item.
+    """
+    while True:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+            # Prevent blocking while loop if the stream is empty
+            await asyncio.sleep(0)
+
+
+@pipable_operator
+async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
+    """Generate chunks of size ``n`` from an asynchronous sequence.
+
+    The chunks are lists, and the last chunk might contain fewer than ``n``
+    elements.
+    """
+    async with streamcontext(source) as streamer:
+        async for first in streamer:
+            xs = select.take(create.preserve(streamer), n - 1)
+            yield [first] + await aggregate.list(xs)
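Illustrative sketch for ``chunks`` (editorial aside, not part of the patch):

    import asyncio
    from aiostream import stream, pipe

    async def main() -> None:
        # Group a sequence of ten numbers into chunks of three.
        xs = stream.range(10) | pipe.chunks(3)
        print(await stream.list(xs))
        # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]

    asyncio.run(main())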
diff --git a/.venv/lib/python3.12/site-packages/aiostream/test_utils.py b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
new file mode 100644
index 00000000..56761d2e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/test_utils.py
@@ -0,0 +1,173 @@
+"""Utilities for testing stream operators."""
+from __future__ import annotations
+
+import asyncio
+from unittest.mock import Mock
+from contextlib import contextmanager
+
+import pytest
+
+from .core import StreamEmpty, streamcontext, pipable_operator
+from typing import TYPE_CHECKING, Any, Callable, List
+
+if TYPE_CHECKING:
+    from _pytest.fixtures import SubRequest
+    from aiostream.core import Stream
+
+__all__ = ["add_resource", "assert_run", "event_loop"]
+
+
+@pipable_operator
+async def add_resource(source, cleanup_time):
+    """Simulate an open resource in a stream operator."""
+    try:
+        loop = asyncio.get_event_loop()
+        loop.open_resources += 1
+        loop.resources += 1
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                yield item
+    finally:
+        try:
+            await asyncio.sleep(cleanup_time)
+        finally:
+            loop.open_resources -= 1
+
+
+def compare_exceptions(
+    exc1: Exception,
+    exc2: Exception,
+) -> bool:
+    """Compare two exceptions together."""
+    return exc1 == exc2 or exc1.__class__ == exc2.__class__ and exc1.args == exc2.args
+
+
+async def assert_aiter(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
+    """Check the results of a stream using a streamcontext."""
+    results = []
+    exception_type = (type(exception),) if exception else ()
+    try:
+        async with streamcontext(source) as streamer:
+            async for item in streamer:
+                results.append(item)
+    except exception_type as exc:
+        assert exception is not None
+        assert compare_exceptions(exc, exception)
+    else:
+        assert exception is None
+    assert results == values
+
+
+async def assert_await(
+    source: Stream,
+    values: List[Any],
+    exception: Exception | None = None,
+) -> None:
+    """Check the results of a stream by awaiting it."""
+    exception_type = (type(exception),) if exception else ()
+    try:
+        result = await source
+    except StreamEmpty:
+        assert values == []
+        assert exception is None
+    except exception_type as exc:
+        assert exception is not None
+        assert compare_exceptions(exc, exception)
+    else:
+        assert result == values[-1]
+        assert exception is None
+
+
+@pytest.fixture(params=[assert_aiter, assert_await], ids=["aiter", "await"])
+def assert_run(request: SubRequest) -> Callable:
+    """Parametrized fixture returning a stream runner."""
+    return request.param
+
+
+@pytest.fixture
+def event_loop():
+    """Fixture providing a test event loop.
+
+    The event loop simulates and records the sleep operations,
+    available as ``event_loop.steps``.
+
+    It also tracks simulated resources and makes sure they are
+    all released before the loop is closed.
+    """
+
+    class TimeTrackingTestLoop(asyncio.BaseEventLoop):
+        stuck_threshold = 100
+
+        def __init__(self):
+            super().__init__()
+            self._time = 0
+            self._timers = []
+            self._selector = Mock()
+            self.clear()
+
+        # Loop internals
+
+        def _run_once(self):
+            super()._run_once()
+            # Update internals
+            self.busy_count += 1
+            self._timers = sorted(when for when in self._timers if when > loop.time())
+            # Time advance
+            if self.time_to_go:
+                when = self._timers.pop(0)
+                step = when - loop.time()
+                self.steps.append(step)
+                self.advance_time(step)
+                self.busy_count = 0
+
+        def _process_events(self, event_list):
+            return
+
+        def _write_to_self(self):
+            return
+
+        # Time management
+
+        def time(self):
+            return self._time
+
+        def advance_time(self, advance):
+            if advance:
+                self._time += advance
+
+        def call_at(self, when, callback, *args, **kwargs):
+            self._timers.append(when)
+            return super().call_at(when, callback, *args, **kwargs)
+
+        @property
+        def stuck(self):
+            return self.busy_count > self.stuck_threshold
+
+        @property
+        def time_to_go(self):
+            return self._timers and (self.stuck or not self._ready)
+
+        # Resource management
+
+        def clear(self):
+            self.steps = []
+            self.open_resources = 0
+            self.resources = 0
+            self.busy_count = 0
+
+        @contextmanager
+        def assert_cleanup(self):
+            self.clear()
+            yield self
+            assert self.open_resources == 0
+            self.clear()
+
+    loop = TimeTrackingTestLoop()
+    asyncio.set_event_loop(loop)
+    with loop.assert_cleanup():
+        yield loop
+    loop.close()
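A sketch of how these helpers are meant to be combined in a test module, modeled on aiostream's own test suite (editorial aside, not part of the patch; it assumes the pytest-asyncio plugin is installed and the test name is hypothetical):

    import pytest
    from aiostream import stream, pipe
    from aiostream.test_utils import add_resource

    @pytest.mark.asyncio
    async def test_take_with_resource(assert_run, event_loop):
        # The assert_cleanup context checks that the simulated
        # resource opened by add_resource is released on exit.
        with event_loop.assert_cleanup():
            xs = stream.range(5) | add_resource.pipe(1) | pipe.take(3)
            await assert_run(xs, [0, 1, 2])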