From 4a52a71956a8d46fcb7294ac71734504bb09bcc2 Mon Sep 17 00:00:00 2001 From: S. Solomon Darnell Date: Fri, 28 Mar 2025 21:52:21 -0500 Subject: two version of R2R are here --- .../site-packages/aiostream/stream/time.py | 56 ++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 .venv/lib/python3.12/site-packages/aiostream/stream/time.py (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/time.py') diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py new file mode 100644 index 00000000..c9c0df88 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py @@ -0,0 +1,56 @@ +"""Time-specific operators.""" +from __future__ import annotations +import asyncio + +from ..aiter_utils import anext +from ..core import streamcontext, pipable_operator + +from typing import TypeVar, AsyncIterable, AsyncIterator + +__all__ = ["spaceout", "delay", "timeout"] + + +T = TypeVar("T") + + +@pipable_operator +async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]: + """Make sure the elements of an asynchronous sequence are separated + in time by the given interval. + """ + timeout = 0.0 + loop = asyncio.get_event_loop() + async with streamcontext(source) as streamer: + async for item in streamer: + delta = timeout - loop.time() + delay = delta if delta > 0 else 0.0 + await asyncio.sleep(delay) + yield item + timeout = loop.time() + interval + + +@pipable_operator +async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]: + """Raise a time-out if an element of the asynchronous sequence + takes too long to arrive. + + Note: the timeout is not global but specific to each step of + the iteration. + """ + async with streamcontext(source) as streamer: + while True: + try: + item = await asyncio.wait_for(anext(streamer), timeout) + except StopAsyncIteration: + break + else: + yield item + + +@pipable_operator +async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]: + """Delay the iteration of an asynchronous sequence.""" + await asyncio.sleep(delay) + async with streamcontext(source) as streamer: + async for item in streamer: + yield item -- cgit v1.2.3