aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/aggregate.py
blob: 8ed7c0e15f0d3c6d1f9e34f81fe5a7760ffd87b9 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
"""Aggregation operators."""
from __future__ import annotations

import asyncio
import builtins
import operator as op
from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast


from . import select
from ..aiter_utils import anext
from ..core import pipable_operator, streamcontext

# Names exported by ``from ... import *``. Note that ``list`` shadows the
# builtin inside this module, which is why the builtin type is referred to
# as ``builtins.list`` below.
__all__ = ["accumulate", "reduce", "list"]

# Type of the items carried by the streams handled in this module.
T = TypeVar("T")


@pipable_operator
async def accumulate(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T] = op.add,
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Yield the running accumulation of an asynchronous sequence.

    Each produced value is ``func(previous, item)``, where ``previous`` is
    the value produced just before and ``item`` is the next element of
    ``source``. ``func`` defaults to addition, giving running sums. When
    ``func`` is a coroutine function (as reported by
    ``asyncio.iscoroutinefunction``), its result is awaited.

    If ``initializer`` is given, it is produced first and seeds the
    computation, so it is also the only value produced for an empty
    sequence. Otherwise the first element of ``source`` is the seed, and an
    empty sequence produces nothing.

    ``None`` means "no initializer" and therefore cannot be used as an
    explicit seed.
    """
    needs_await = asyncio.iscoroutinefunction(func)
    async with streamcontext(source) as streamer:
        if initializer is not None:
            total = initializer
        else:
            # No seed supplied: consume the first element as the seed, and
            # stop right away when there is none.
            try:
                total = await anext(streamer)
            except StopAsyncIteration:
                return
        # The seed itself is the first accumulated value
        yield total
        # Fold the remaining elements into the running total
        async for element in streamer:
            outcome = func(total, element)
            total = (
                (await cast("Awaitable[T]", outcome))
                if needs_await
                else cast("T", outcome)
            )
            yield total


@pipable_operator
def reduce(
    source: AsyncIterable[T],
    func: Callable[[T, T], Awaitable[T] | T],
    initializer: T | None = None,
) -> AsyncIterator[T]:
    """Reduce an asynchronous sequence to a single value.

    ``func`` is a function of two arguments applied cumulatively to the
    items of ``source``. This is built from ``accumulate`` by selecting the
    item at index ``-1`` of the running accumulation with ``select.item``.

    If ``initializer`` is given, it is placed before the items of the
    sequence in the calculation, and serves as a default when the sequence
    is empty.
    """
    return select.item.raw(accumulate.raw(source, func, initializer), -1)


@pipable_operator
async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
    """Collect the items of an asynchronous sequence into a list.

    Every intermediate state is produced: the empty list first, then the
    list again after each item of ``source`` has been appended.

    This makes it easy to turn a stream into a list::

        lst = await stream.list(x)

    .. note:: The very same list object is produced at every step, so that
       no memory copy is needed. Earlier results therefore keep growing as
       the iteration goes on.
    """
    collected: builtins.list[T] = []
    # Produce the empty list before the source is even entered
    yield collected
    async with streamcontext(source) as streamer:
        async for element in streamer:
            collected.append(element)
            yield collected