aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
blob: 1be834e6e0c7c6857ad9404728a2b258c44b3b10 (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Extra operators."""
from __future__ import annotations

import asyncio
import builtins

from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any

from .combine import amap, smap
from ..core import pipable_operator

__all__ = ["action", "print"]


T = TypeVar("T")


@pipable_operator
def action(
    source: AsyncIterable[T],
    func: Callable[[T], Awaitable[Any] | Any],
    ordered: bool = True,
    task_limit: int | None = None,
) -> AsyncIterator[T]:
    """Perform an action for each element of an asynchronous sequence
    without modifying it.

    The given function can be synchronous or asynchronous.

    The results can either be returned in or out of order, depending on
    the corresponding ``ordered`` argument. This argument is ignored if the
    provided function is synchronous.

    The coroutines run concurrently but their amount can be limited using
    the ``task_limit`` argument. A value of ``1`` will cause the coroutines
    to run sequentially. This argument is ignored if the provided function
    is synchronous.
    """
    if asyncio.iscoroutinefunction(func):

        async def ainnerfunc(arg: T, *_: object) -> T:
            awaitable = func(arg)
            assert isinstance(awaitable, Awaitable)
            await awaitable
            return arg

        return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)

    else:

        def innerfunc(arg: T, *_: object) -> T:
            func(arg)
            return arg

        return smap.raw(source, innerfunc)


@pipable_operator
def print(
    source: AsyncIterable[T],
    template: str = "{}",
    sep: str = " ",
    end: str = "\n",
    file: Any | None = None,
    flush: bool = False,
) -> AsyncIterator[T]:
    """Print each element of an asynchronous sequence without modifying it.

    An optional template can be provided to be formatted with the elements.
    All the keyword arguments are forwarded to the builtin function print.
    """

    def func(value: T) -> None:
        string = template.format(value)
        builtins.print(
            string,
            sep=sep,
            end=end,
            file=file,
            flush=flush,
        )

    return action.raw(source, func)