diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/anthropic/_decoders')
-rw-r--r-- | .venv/lib/python3.12/site-packages/anthropic/_decoders/jsonl.py | 123 |
1 files changed, 123 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/anthropic/_decoders/jsonl.py b/.venv/lib/python3.12/site-packages/anthropic/_decoders/jsonl.py new file mode 100644 index 00000000..ac5ac74f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/anthropic/_decoders/jsonl.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import json +from typing_extensions import Generic, TypeVar, Iterator, AsyncIterator + +import httpx + +from .._models import construct_type_unchecked + +_T = TypeVar("_T") + + +class JSONLDecoder(Generic[_T]): + """A decoder for [JSON Lines](https://jsonlines.org) format. + + This class provides an iterator over a byte-iterator that parses each JSON Line + into a given type. + """ + + http_response: httpx.Response + """The HTTP response this decoder was constructed from""" + + def __init__( + self, + *, + raw_iterator: Iterator[bytes], + line_type: type[_T], + http_response: httpx.Response, + ) -> None: + super().__init__() + self.http_response = http_response + self._raw_iterator = raw_iterator + self._line_type = line_type + self._iterator = self.__decode__() + + def close(self) -> None: + """Close the response body stream. + + This is called automatically if you consume the entire stream. + """ + self.http_response.close() + + def __decode__(self) -> Iterator[_T]: + buf = b"" + for chunk in self._raw_iterator: + for line in chunk.splitlines(keepends=True): + buf += line + if buf.endswith((b"\r", b"\n", b"\r\n")): + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + buf = b"" + + # flush + if buf: + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + + def __next__(self) -> _T: + return self._iterator.__next__() + + def __iter__(self) -> Iterator[_T]: + for item in self._iterator: + yield item + + +class AsyncJSONLDecoder(Generic[_T]): + """A decoder for [JSON Lines](https://jsonlines.org) format. + + This class provides an async iterator over a byte-iterator that parses each JSON Line + into a given type. + """ + + http_response: httpx.Response + + def __init__( + self, + *, + raw_iterator: AsyncIterator[bytes], + line_type: type[_T], + http_response: httpx.Response, + ) -> None: + super().__init__() + self.http_response = http_response + self._raw_iterator = raw_iterator + self._line_type = line_type + self._iterator = self.__decode__() + + async def close(self) -> None: + """Close the response body stream. + + This is called automatically if you consume the entire stream. + """ + await self.http_response.aclose() + + async def __decode__(self) -> AsyncIterator[_T]: + buf = b"" + async for chunk in self._raw_iterator: + for line in chunk.splitlines(keepends=True): + buf += line + if buf.endswith((b"\r", b"\n", b"\r\n")): + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + buf = b"" + + # flush + if buf: + yield construct_type_unchecked( + value=json.loads(buf), + type_=self._line_type, + ) + + async def __anext__(self) -> _T: + return await self._iterator.__anext__() + + async def __aiter__(self) -> AsyncIterator[_T]: + async for item in self._iterator: + yield item |