blob: c9c0df88a309f2e604a46ccd6bd690a14f0c41fd (
about) (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
"""Time-specific operators."""
from __future__ import annotations
import asyncio
from ..aiter_utils import anext
from ..core import streamcontext, pipable_operator
from typing import TypeVar, AsyncIterable, AsyncIterator
__all__ = ["spaceout", "delay", "timeout"]
T = TypeVar("T")
@pipable_operator
async def spaceout(source: AsyncIterable[T], interval: float) -> AsyncIterator[T]:
"""Make sure the elements of an asynchronous sequence are separated
in time by the given interval.
"""
timeout = 0.0
loop = asyncio.get_event_loop()
async with streamcontext(source) as streamer:
async for item in streamer:
delta = timeout - loop.time()
delay = delta if delta > 0 else 0.0
await asyncio.sleep(delay)
yield item
timeout = loop.time() + interval
@pipable_operator
async def timeout(source: AsyncIterable[T], timeout: float) -> AsyncIterator[T]:
"""Raise a time-out if an element of the asynchronous sequence
takes too long to arrive.
Note: the timeout is not global but specific to each step of
the iteration.
"""
async with streamcontext(source) as streamer:
while True:
try:
item = await asyncio.wait_for(anext(streamer), timeout)
except StopAsyncIteration:
break
else:
yield item
@pipable_operator
async def delay(source: AsyncIterable[T], delay: float) -> AsyncIterator[T]:
"""Delay the iteration of an asynchronous sequence."""
await asyncio.sleep(delay)
async with streamcontext(source) as streamer:
async for item in streamer:
yield item
|