aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/aiostream/stream/time.py
blob: c9c0df88a309f2e604a46ccd6bd690a14f0c41fd (about) (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
"""Time-specific operators."""
from __future__ import annotations
import asyncio

from ..aiter_utils import anext
from ..core import streamcontext, pipable_operator

from typing import TypeVar, AsyncIterable, AsyncIterator

# Names exported as this module's public API.
__all__ = ["spaceout", "delay", "timeout"]


# Item type of a stream; each operator below yields the same type it receives.
T = TypeVar("T")


@pipable_operator
async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
    """Make sure the elements of an asynchronous sequence are separated
    in time by the given interval.

    Before each item is yielded, the operator sleeps for whatever part of
    ``interval`` (in seconds, measured on the event loop clock) has not yet
    elapsed since the consumer resumed the generator after the previous item.
    The deadline starts at ``0.0``, so the first item is normally not held
    back.

    Args:
        source: The asynchronous sequence to forward.
        interval: Minimum number of seconds between two consecutive items.

    Yields:
        The items of ``source``, unchanged and in the same order.
    """
    deadline = 0.0
    # This body only executes while being iterated, i.e. from inside a running
    # loop, so ask for that loop explicitly rather than going through the
    # policy-dependent ``get_event_loop()``.
    loop = asyncio.get_running_loop()
    async with streamcontext(source) as streamer:
        async for item in streamer:
            # Remaining wait, clamped so an already-passed deadline means
            # "no delay"; the sleep still happens, with a duration of 0.0.
            remaining = max(0.0, deadline - loop.time())
            await asyncio.sleep(remaining)
            yield item
            # Execution resumes here when the consumer asks for the next
            # item; the next deadline is counted from that moment.
            deadline = loop.time() + interval

@pipable_operator
async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
    """Forward the elements of an asynchronous sequence, allowing at most
    ``timeout`` seconds for each one to arrive.

    Note: the limit applies to every iteration step separately, not to
    the iteration as a whole. When a step exceeds it, the error raised by
    ``asyncio.wait_for`` propagates to the consumer.
    """
    async with streamcontext(source) as stream:
        while True:
            try:
                # Each fetch is awaited under its own fresh time limit.
                element = await asyncio.wait_for(anext(stream), timeout)
            except StopAsyncIteration:
                # Source exhausted: finish and leave the stream context.
                return
            # Yield outside the try so errors thrown in by the consumer are
            # never mistaken for the end of the source.
            yield element

@pipable_operator
async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
    """Wait ``delay`` seconds, then relay every element of an
    asynchronous sequence.

    The pause happens once, before the source is entered; the elements
    themselves are passed through unchanged.
    """
    # One-off pause, taken before the stream context is opened.
    await asyncio.sleep(delay)
    async with streamcontext(source) as stream:
        async for element in stream:
            yield element