1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
|
"""Extra operators."""
from __future__ import annotations
import asyncio
import builtins
from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
from .combine import amap, smap
from ..core import pipable_operator
# Public names exported by this module: the two operators defined below.
# Note that ``print`` deliberately reuses the builtin's name; the builtin
# itself is reached through the ``builtins`` module inside this file.
__all__ = ["action", "print"]
# Element type of the source sequence. Both operators are annotated as
# yielding the same element type they receive.
T = TypeVar("T")
@pipable_operator
def action(
    source: AsyncIterable[T],
    func: Callable[[T], Awaitable[Any] | Any],
    ordered: bool = True,
    task_limit: int | None = None,
) -> AsyncIterator[T]:
    """Run a side-effect function on every element, passing elements through.

    Each element of ``source`` is handed to ``func`` and then emitted
    unchanged; whatever ``func`` returns is discarded.

    ``func`` may be a plain function or a coroutine function:

    - A plain function is simply called for each element. The ``ordered``
      and ``task_limit`` arguments are ignored in that case.
    - A coroutine function is called and awaited for each element.
      ``ordered`` selects whether the elements come out in or out of order,
      and ``task_limit`` caps how many coroutines run concurrently (a value
      of ``1`` makes them run sequentially).
    """
    if not asyncio.iscoroutinefunction(func):
        # Synchronous callback: call it, then forward the element untouched.
        # Any extra positional arguments handed to the wrapper are ignored.
        def call_then_forward(arg: T, *_: object) -> T:
            func(arg)
            return arg

        return smap.raw(source, call_then_forward)

    # Asynchronous callback: await it, then forward the element untouched.
    async def await_then_forward(arg: T, *_: object) -> T:
        pending = func(arg)
        # ``func`` is annotated as possibly returning a non-awaitable, so
        # check the returned value before awaiting it.
        assert isinstance(pending, Awaitable)
        await pending
        return arg

    return amap.raw(
        source,
        await_then_forward,
        ordered=ordered,
        task_limit=task_limit,
    )
@pipable_operator
def print(
    source: AsyncIterable[T],
    template: str = "{}",
    sep: str = " ",
    end: str = "\n",
    file: Any | None = None,
    flush: bool = False,
) -> AsyncIterator[T]:
    """Echo every element of an asynchronous sequence, passing it through.

    Each element is rendered with ``template.format(element)`` and the
    resulting string is written using the builtin ``print``. The ``sep``,
    ``end``, ``file`` and ``flush`` arguments are handed to the builtin
    unchanged.
    """

    def emit(element: T) -> None:
        # This operator shadows the builtin name, so the real ``print`` is
        # reached explicitly through the ``builtins`` module.
        builtins.print(
            template.format(element),
            sep=sep,
            end=end,
            file=file,
            flush=flush,
        )

    # Delegate the per-element iteration to ``action`` with ``emit`` as the
    # callback.
    return action.raw(source, emit)
|