1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
"""Transformation operators."""
from __future__ import annotations
import asyncio
import itertools
from typing import (
Protocol,
TypeVar,
AsyncIterable,
AsyncIterator,
Awaitable,
cast,
)
from ..core import streamcontext, pipable_operator
from . import select
from . import create
from . import aggregate
from .combine import map, amap, smap
# Names exported by a star-import of this module. ``amap`` and ``smap`` are
# imported above but are not part of this list.
__all__ = ["map", "enumerate", "starmap", "cycle", "chunks"]

# map, amap and smap are also transform operators
# The bare expression below only references the imported names and discards
# the resulting tuple -- presumably to flag the imports as intentional
# re-exports for linters (TODO confirm).
# NOTE: ``map`` (imported) and ``enumerate`` (defined below) shadow the
# builtins of the same name inside this module.
map, amap, smap

# Generic type variables used by the operator signatures in this module:
# ``T`` is the item type of a source sequence, ``U`` the result type produced
# by the function given to ``starmap``.
T = TypeVar("T")
U = TypeVar("U")
@pipable_operator
async def enumerate(
    source: AsyncIterable[T], start: int = 0, step: int = 1
) -> AsyncIterator[tuple[int, T]]:
    """Pair every item of an asynchronous sequence with a running index.

    Yields ``(index, value)`` tuples. The first index is ``start``
    (``0`` by default) and each subsequent index is increased by
    ``step`` (``1`` by default).
    """
    indices = itertools.count(start, step)
    async with streamcontext(source) as streamer:
        async for value in streamer:
            # The counter only advances once an item has been received.
            index = next(indices)
            yield index, value
# Variance follows the usual rule for callables: the argument type is
# contravariant, the result type is covariant.
X = TypeVar("X", contravariant=True)
Y = TypeVar("Y", covariant=True)


class AsyncStarmapCallable(Protocol[X, Y]):
    """Structural type of an asynchronous function accepted by ``starmap``.

    The callable takes at least one positional argument of type ``X``
    (followed by any number of additional ``X`` arguments) and returns an
    awaitable resolving to ``Y``.
    """

    def __call__(
        self,
        arg: X,
        /,
        *args: X,
    ) -> Awaitable[Y]:
        """Call with unpacked positional arguments; the result is awaitable."""
        ...


class SyncStarmapCallable(Protocol[X, Y]):
    """Structural type of a synchronous function accepted by ``starmap``.

    The callable takes at least one positional argument of type ``X``
    (followed by any number of additional ``X`` arguments) and directly
    returns a ``Y``.
    """

    def __call__(
        self,
        arg: X,
        /,
        *args: X,
    ) -> Y:
        """Call with unpacked positional arguments; the result is returned."""
        ...
@pipable_operator
def starmap(
    source: AsyncIterable[tuple[T, ...]],
    func: SyncStarmapCallable[T, U] | AsyncStarmapCallable[T, U],
    ordered: bool = True,
    task_limit: int | None = None,
) -> AsyncIterator[U]:
    """Map a function over an asynchronous sequence of argument tuples.

    Every element of the sequence is unpacked and passed to the function
    as positional arguments. The function may be either synchronous or
    asynchronous.

    For an asynchronous function, the ``ordered`` argument selects whether
    results are produced in or out of order, and ``task_limit`` bounds the
    amount of coroutines running concurrently (a value of ``1`` makes them
    run sequentially).

    Both ``ordered`` and ``task_limit`` are ignored when the provided
    function is synchronous.
    """
    # Synchronous path first: ``ordered`` and ``task_limit`` are not forwarded.
    if not asyncio.iscoroutinefunction(func):
        plain_func = cast("SyncStarmapCallable[T, U]", func)

        # Any extra positional arguments are accepted and ignored.
        def call_unpacked(args: tuple[T, ...], *_: object) -> U:
            return plain_func(*args)

        return smap.raw(source, call_unpacked)

    coro_func = cast("AsyncStarmapCallable[T, U]", func)

    # Any extra positional arguments are accepted and ignored.
    async def await_unpacked(args: tuple[T, ...], *_: object) -> U:
        return await coro_func(*args)

    return amap.raw(source, await_unpacked, ordered=ordered, task_limit=task_limit)
@pipable_operator
async def cycle(source: AsyncIterable[T]) -> AsyncIterator[T]:
    """Repeat an asynchronous sequence forever.

    Note: nothing is buffered; the given sequence is simply iterated
    again on every pass. If the sequence cannot be iterated more than
    once, the generator might end up looping indefinitely without
    yielding any item.
    """
    while True:
        async with streamcontext(source) as streamer:
            async for element in streamer:
                yield element
            # Hand control back to the event loop once per pass so that an
            # empty stream does not turn this into a blocking busy loop.
            await asyncio.sleep(0)
@pipable_operator
async def chunks(source: AsyncIterable[T], n: int) -> AsyncIterator[list[T]]:
    """Group the items of an asynchronous sequence into lists of size ``n``.

    Each chunk is a list; the final chunk might hold fewer than ``n``
    elements.
    """
    # NOTE(review): for ``n < 1`` the result depends on how ``select.take``
    # treats a non-positive count -- confirm against its implementation.
    async with streamcontext(source) as streamer:
        async for head in streamer:
            # The remaining ``n - 1`` items are pulled from the same streamer;
            # ``create.preserve`` presumably keeps it usable for the next
            # chunk once the tail has been collected (TODO confirm).
            tail_stream = select.take(create.preserve(streamer), n - 1)
            tail = await aggregate.list(tail_stream)
            yield [head] + tail
|