aboutsummaryrefslogtreecommitdiff
"""Aggregation operators."""
from __future__ import annotations

import asyncio
import builtins
import operator as op
from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast


from . import select
from ..aiter_utils import anext
from ..core import pipable_operator, streamcontext

__all__ = ["accumulate", "reduce", "list"]

T = TypeVar("T")


@pipable_operator
async def accumulate(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T] = op.add,
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Generate a series of accumulated sums (or other binary function)
    from an asynchronous sequence.

    If ``initializer`` is present, it is placed before the items
    of the sequence in the calculation, and serves as a default
    when the sequence is empty.
    """
    iscorofunc = asyncio.iscoroutinefunction(func)
    async with streamcontext(source) as streamer:
        # Initialize
        if initializer is None:
            try:
                value = await anext(streamer)
            except StopAsyncIteration:
                return
        else:
            value = initializer
        # First value
        yield value
        # Iterate streamer
        async for item in streamer:
            returned = func(value, item)
            if iscorofunc:
                awaitable_value = cast("Awaitable[T]", returned)
                value = await awaitable_value
            else:
                value = cast("T", returned)
            yield value


@pipable_operator
def reduce(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T],
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Apply a function of two arguments cumulatively to the items
    of an asynchronous sequence, reducing the sequence to a single value.

    If ``initializer`` is present, it is placed before the items
    of the sequence in the calculation, and serves as a default when the
    sequence is empty.
    """
    acc = accumulate.raw(source, func, initializer)
    return select.item.raw(acc, -1)


@pipable_operator
async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
    """Build a list from an asynchronous sequence.

    All the intermediate steps are generated, starting from the empty list.

    This operator can be used to easily convert a stream into a list::

        lst = await stream.list(x)

    ..note:: The same list object is produced at each step in order to avoid
    memory copies.
    """
    result: builtins.list[T] = []
    yield result
    async with streamcontext(source) as streamer:
        async for item in streamer:
            result.append(item)
            yield result