aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream/stream/create.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/create.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/create.py191
1 files changed, 191 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/create.py b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
new file mode 100644
index 00000000..d7676b71
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/create.py
@@ -0,0 +1,191 @@
+"""Non-pipable creation operators."""
+from __future__ import annotations
+
+import sys
+import asyncio
+import inspect
+import builtins
+import itertools
+
+from typing import (
+ AsyncIterable,
+ Awaitable,
+ Iterable,
+ Protocol,
+ TypeVar,
+ AsyncIterator,
+ cast,
+)
+from typing_extensions import ParamSpec
+
+from ..stream import time
+from ..core import operator, streamcontext
+
+__all__ = [
+ "iterate",
+ "preserve",
+ "just",
+ "call",
+ "throw",
+ "empty",
+ "never",
+ "repeat",
+ "range",
+ "count",
+]
+
+T = TypeVar("T")
+P = ParamSpec("P")
+
+# Hack for python 3.8 compatibility
+if sys.version_info < (3, 9):
+ P = TypeVar("P")
+
+# Convert regular iterables
+
+
+@operator
+async def from_iterable(it: Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a regular iterable."""
+ for item in it:
+ await asyncio.sleep(0)
+ yield item
+
+
+@operator
+def from_async_iterable(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable.
+
+ Note: the corresponding iterator will be explicitely closed
+ when leaving the context manager."""
+ return streamcontext(ait)
+
+
+@operator
+def iterate(it: AsyncIterable[T] | Iterable[T]) -> AsyncIterator[T]:
+ """Generate values from a sychronous or asynchronous iterable."""
+ if isinstance(it, AsyncIterable):
+ return from_async_iterable.raw(it)
+ if isinstance(it, Iterable):
+ return from_iterable.raw(it)
+ raise TypeError(f"{type(it).__name__!r} object is not (async) iterable")
+
+
+@operator
+async def preserve(ait: AsyncIterable[T]) -> AsyncIterator[T]:
+ """Generate values from an asynchronous iterable without
+ explicitly closing the corresponding iterator."""
+ async for item in ait:
+ yield item
+
+
+# Simple operators
+
+
+@operator
+async def just(value: T) -> AsyncIterator[T]:
+ """Await if possible, and generate a single value."""
+ if inspect.isawaitable(value):
+ yield await value
+ else:
+ yield value
+
+
+Y = TypeVar("Y", covariant=True)
+
+
+class SyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Y:
+ ...
+
+
+class AsyncCallable(Protocol[P, Y]):
+ def __call__(self, *args: P.args, **kwargs: P.kwargs) -> Awaitable[Y]:
+ ...
+
+
+@operator
+async def call(
+ func: SyncCallable[P, T] | AsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
+) -> AsyncIterator[T]:
+ """Call the given function and generate a single value.
+
+ Await if the provided function is asynchronous.
+ """
+ if asyncio.iscoroutinefunction(func):
+ async_func = cast("AsyncCallable[P, T]", func)
+ yield await async_func(*args, **kwargs)
+ else:
+ sync_func = cast("SyncCallable[P, T]", func)
+ yield sync_func(*args, **kwargs)
+
+
+@operator
+async def throw(exc: Exception) -> AsyncIterator[None]:
+ """Throw an exception without generating any value."""
+ if False:
+ yield
+ raise exc
+
+
+@operator
+async def empty() -> AsyncIterator[None]:
+ """Terminate without generating any value."""
+ if False:
+ yield
+
+
+@operator
+async def never() -> AsyncIterator[None]:
+ """Hang forever without generating any value."""
+ if False:
+ yield
+ future: asyncio.Future[None] = asyncio.Future()
+ try:
+ await future
+ finally:
+ future.cancel()
+
+
+@operator
+def repeat(
+ value: T, times: int | None = None, *, interval: float = 0.0
+) -> AsyncIterator[T]:
+ """Generate the same value a given number of times.
+
+ If ``times`` is ``None``, the value is repeated indefinitely.
+ An optional interval can be given to space the values out.
+ """
+ args = () if times is None else (times,)
+ it = itertools.repeat(value, *args)
+ agen = from_iterable.raw(it)
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+# Counting operators
+
+
+@operator
+def range(*args: int, interval: float = 0.0) -> AsyncIterator[int]:
+ """Generate a given range of numbers.
+
+ It supports the same arguments as the builtin function.
+ An optional interval can be given to space the values out.
+ """
+ agen = from_iterable.raw(builtins.range(*args))
+ return time.spaceout.raw(agen, interval) if interval else agen
+
+
+@operator
+def count(
+ start: int = 0, step: int = 1, *, interval: float = 0.0
+) -> AsyncIterator[int]:
+ """Generate consecutive numbers indefinitely.
+
+ Optional starting point and increment can be defined,
+ respectively defaulting to ``0`` and ``1``.
+
+ An optional interval can be given to space the values out.
+ """
+ agen = from_iterable.raw(itertools.count(start, step))
+ return time.spaceout.raw(agen, interval) if interval else agen