1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
"""Aggregation operators."""
from __future__ import annotations
import asyncio
import builtins
import operator as op
from typing import AsyncIterator, Awaitable, Callable, TypeVar, AsyncIterable, cast
from . import select
from ..aiter_utils import anext
from ..core import pipable_operator, streamcontext
__all__ = ["accumulate", "reduce", "list"]
T = TypeVar("T")
@pipable_operator
async def accumulate(
source: AsyncIterable[T],
func: Callable[[T, T], Awaitable[T] | T] = op.add,
initializer: T | None = None,
) -> AsyncIterator[T]:
"""Generate a series of accumulated sums (or other binary function)
from an asynchronous sequence.
If ``initializer`` is present, it is placed before the items
of the sequence in the calculation, and serves as a default
when the sequence is empty.
"""
iscorofunc = asyncio.iscoroutinefunction(func)
async with streamcontext(source) as streamer:
# Initialize
if initializer is None:
try:
value = await anext(streamer)
except StopAsyncIteration:
return
else:
value = initializer
# First value
yield value
# Iterate streamer
async for item in streamer:
returned = func(value, item)
if iscorofunc:
awaitable_value = cast("Awaitable[T]", returned)
value = await awaitable_value
else:
value = cast("T", returned)
yield value
@pipable_operator
def reduce(
source: AsyncIterable[T],
func: Callable[[T, T], Awaitable[T] | T],
initializer: T | None = None,
) -> AsyncIterator[T]:
"""Apply a function of two arguments cumulatively to the items
of an asynchronous sequence, reducing the sequence to a single value.
If ``initializer`` is present, it is placed before the items
of the sequence in the calculation, and serves as a default when the
sequence is empty.
"""
acc = accumulate.raw(source, func, initializer)
return select.item.raw(acc, -1)
@pipable_operator
async def list(source: AsyncIterable[T]) -> AsyncIterator[builtins.list[T]]:
"""Build a list from an asynchronous sequence.
All the intermediate steps are generated, starting from the empty list.
This operator can be used to easily convert a stream into a list::
lst = await stream.list(x)
..note:: The same list object is produced at each step in order to avoid
memory copies.
"""
result: builtins.list[T] = []
yield result
async with streamcontext(source) as streamer:
async for item in streamer:
result.append(item)
yield result
|