diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiostream/stream/time.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/time.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/aiostream/stream/time.py | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py new file mode 100644 index 00000000..c9c0df88 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py @@ -0,0 +1,56 @@ +"""Time-specific operators.""" +from __future__ import annotations +import asyncio + +from ..aiter_utils import anext +from ..core import streamcontext, pipable_operator + +from typing import TypeVar, AsyncIterable, AsyncIterator + +__all__ = ["spaceout", "delay", "timeout"] + + +T = TypeVar("T") + + +@pipable_operator +async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]: + """Make sure the elements of an asynchronous sequence are separated + in time by the given interval. + """ + timeout = 0.0 + loop = asyncio.get_event_loop() + async with streamcontext(source) as streamer: + async for item in streamer: + delta = timeout - loop.time() + delay = delta if delta > 0 else 0.0 + await asyncio.sleep(delay) + yield item + timeout = loop.time() + interval + + +@pipable_operator +async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]: + """Raise a time-out if an element of the asynchronous sequence + takes too long to arrive. + + Note: the timeout is not global but specific to each step of + the iteration. + """ + async with streamcontext(source) as streamer: + while True: + try: + item = await asyncio.wait_for(anext(streamer), timeout) + except StopAsyncIteration: + break + else: + yield item + + +@pipable_operator +async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]: + """Delay the iteration of an asynchronous sequence.""" + await asyncio.sleep(delay) + async with streamcontext(source) as streamer: + async for item in streamer: + yield item |