aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/misc.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/misc.py83
1 files changed, 83 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
new file mode 100644
index 00000000..1be834e6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/misc.py
@@ -0,0 +1,83 @@
+"""Extra operators."""
+from __future__ import annotations
+
+import asyncio
+import builtins
+
+from typing import TypeVar, Awaitable, Callable, AsyncIterable, AsyncIterator, Any
+
+from .combine import amap, smap
+from ..core import pipable_operator
+
+__all__ = ["action", "print"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+def action(
+ source: AsyncIterable[T],
+ func: Callable[[T], Awaitable[Any] | Any],
+ ordered: bool = True,
+ task_limit: int | None = None,
+) -> AsyncIterator[T]:
+ """Perform an action for each element of an asynchronous sequence
+ without modifying it.
+
+ The given function can be synchronous or asynchronous.
+
+ The results can either be returned in or out of order, depending on
+ the corresponding ``ordered`` argument. This argument is ignored if the
+ provided function is synchronous.
+
+ The coroutines run concurrently but their amount can be limited using
+ the ``task_limit`` argument. A value of ``1`` will cause the coroutines
+ to run sequentially. This argument is ignored if the provided function
+ is synchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+
+ async def ainnerfunc(arg: T, *_: object) -> T:
+ awaitable = func(arg)
+ assert isinstance(awaitable, Awaitable)
+ await awaitable
+ return arg
+
+ return amap.raw(source, ainnerfunc, ordered=ordered, task_limit=task_limit)
+
+ else:
+
+ def innerfunc(arg: T, *_: object) -> T:
+ func(arg)
+ return arg
+
+ return smap.raw(source, innerfunc)
+
+
+@pipable_operator
+def print(
+ source: AsyncIterable[T],
+ template: str = "{}",
+ sep: str = " ",
+ end: str = "\n",
+ file: Any | None = None,
+ flush: bool = False,
+) -> AsyncIterator[T]:
+ """Print each element of an asynchronous sequence without modifying it.
+
+ An optional template can be provided to be formatted with the elements.
+ All the keyword arguments are forwarded to the builtin function print.
+ """
+
+ def func(value: T) -> None:
+ string = template.format(value)
+ builtins.print(
+ string,
+ sep=sep,
+ end=end,
+ file=file,
+ flush=flush,
+ )
+
+ return action.raw(source, func)