1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
from __future__ import annotations
import json
from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator
import httpx
from .._models import construct_type_unchecked
_T = TypeVar("_T")
class JSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an iterator over a byte-iterator that parses each JSON Line
into a given type.
"""
http_response: httpx.Response
"""The HTTP response this decoder was constructed from"""
def __init__(
self,
*,
raw_iterator: Iterator[bytes],
line_type: type[_T],
http_response: httpx.Response,
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()
def close(self) -> None:
"""Close the response body stream.
This is called automatically if you consume the entire stream.
"""
self.http_response.close()
def __decode__(self) -> Iterator[_T]:
buf = b""
for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""
# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
def __next__(self) -> _T:
return self._iterator.__next__()
def __iter__(self) -> Iterator[_T]:
for item in self._iterator:
yield item
class AsyncJSONLDecoder(Generic[_T]):
"""A decoder for [JSON Lines](https://jsonlines.org) format.
This class provides an async iterator over a byte-iterator that parses each JSON Line
into a given type.
"""
http_response: httpx.Response
def __init__(
self,
*,
raw_iterator: AsyncIterator[bytes],
line_type: type[_T],
http_response: httpx.Response,
) -> None:
super().__init__()
self.http_response = http_response
self._raw_iterator = raw_iterator
self._line_type = line_type
self._iterator = self.__decode__()
async def close(self) -> None:
"""Close the response body stream.
This is called automatically if you consume the entire stream.
"""
await self.http_response.aclose()
async def __decode__(self) -> AsyncIterator[_T]:
buf = b""
async for chunk in self._raw_iterator:
for line in chunk.splitlines(keepends=True):
buf += line
if buf.endswith((b"\r", b"\n", b"\r\n")):
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
buf = b""
# flush
if buf:
yield construct_type_unchecked(
value=json.loads(buf),
type_=self._line_type,
)
async def __anext__(self) -> _T:
return await self._iterator.__anext__()
async def __aiter__(self) -> AsyncIterator[_T]:
async for item in self._iterator:
yield item
|