about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiostream/stream/time.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiostream/stream/time.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiostream/stream/time.py b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
new file mode 100644
index 00000000..c9c0df88
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
@@ -0,0 +1,56 @@
+"""Time-specific operators."""
+from __future__ import annotations
+import asyncio
+
+from ..aiter_utils import anext
+from ..core import streamcontext, pipable_operator
+
+from typing import TypeVar, AsyncIterable, AsyncIterator
+
+__all__ = ["spaceout", "delay", "timeout"]
+
+
+T = TypeVar("T")
+
+
+@pipable_operator
+async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
+    """Make sure the elements of an asynchronous sequence are separated
+    in time by the given interval.
+    """
+    timeout = 0.0
+    loop = asyncio.get_event_loop()
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            delta = timeout - loop.time()
+            delay = delta if delta > 0 else 0.0
+            await asyncio.sleep(delay)
+            yield item
+            timeout = loop.time() + interval
+
+
+@pipable_operator
+async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
+    """Raise a time-out if an element of the asynchronous sequence
+    takes too long to arrive.
+
+    Note: the timeout is not global but specific to each step of
+    the iteration.
+    """
+    async with streamcontext(source) as streamer:
+        while True:
+            try:
+                item = await asyncio.wait_for(anext(streamer), timeout)
+            except StopAsyncIteration:
+                break
+            else:
+                yield item
+
+
+@pipable_operator
+async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
+    """Delay the iteration of an asynchronous sequence."""
+    await asyncio.sleep(delay)
+    async with streamcontext(source) as streamer:
+        async for item in streamer:
+            yield item