aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream/stream/time.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/time.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/time.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+ """Make sure the elements of an asynchronous sequence are separated
+ in time by the given interval.
+ """
+ timeout = 0.0
+ loop = asyncio.get_event_loop()
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ delta = timeout - loop.time()
+ delay = delta if delta > 0 else 0.0
+ await asyncio.sleep(delay)
+ yield item
+ timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+ """Raise a time-out if an element of the asynchronous sequence
+ takes too long to arrive.
+
+ Note: the timeout is not global but specific to each step of
+ the iteration.
+ """
+ async with streamcontext(source) as streamer:
+ while True:
+ try:
+ item = await asyncio.wait_for(anext(streamer), timeout)
+ except StopAsyncIteration:
+ break
+ else:
+ yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+ """Delay the iteration of an asynchronous sequence."""
+ await asyncio.sleep(delay)
+ async with streamcontext(source) as streamer:
+ async for item in streamer:
+ yield item