aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/openai/_legacy_response.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/openai/_legacy_response.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/openai/_legacy_response.py')
-rw-r--r--.venv/lib/python3.12/site-packages/openai/_legacy_response.py488
1 files changed, 488 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/openai/_legacy_response.py b/.venv/lib/python3.12/site-packages/openai/_legacy_response.py
new file mode 100644
index 00000000..8880e5f1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/openai/_legacy_response.py
@@ -0,0 +1,488 @@
+from __future__ import annotations
+
+import os
+import inspect
+import logging
+import datetime
+import functools
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Union,
+ Generic,
+ TypeVar,
+ Callable,
+ Iterator,
+ AsyncIterator,
+ cast,
+ overload,
+)
+from typing_extensions import Awaitable, ParamSpec, override, deprecated, get_origin
+
+import anyio
+import httpx
+import pydantic
+
+from ._types import NoneType
+from ._utils import is_given, extract_type_arg, is_annotated_type, is_type_alias_type
+from ._models import BaseModel, is_basemodel, add_request_id
+from ._constants import RAW_RESPONSE_HEADER
+from ._streaming import Stream, AsyncStream, is_stream_class_type, extract_stream_chunk_type
+from ._exceptions import APIResponseValidationError
+
+if TYPE_CHECKING:
+ from ._models import FinalRequestOptions
+ from ._base_client import BaseClient
+
+
+P = ParamSpec("P")
+R = TypeVar("R")
+_T = TypeVar("_T")
+
+log: logging.Logger = logging.getLogger(__name__)
+
+
+class LegacyAPIResponse(Generic[R]):
+ """This is a legacy class as it will be replaced by `APIResponse`
+ and `AsyncAPIResponse` in the `_response.py` file in the next major
+ release.
+
+ For the sync client this will mostly be the same with the exception
+ of `content` & `text` will be methods instead of properties. In the
+ async client, all methods will be async.
+
+ A migration script will be provided & the migration in general should
+ be smooth.
+ """
+
+ _cast_to: type[R]
+ _client: BaseClient[Any, Any]
+ _parsed_by_type: dict[type[Any], Any]
+ _stream: bool
+ _stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None
+ _options: FinalRequestOptions
+
+ http_response: httpx.Response
+
+ retries_taken: int
+ """The number of retries made. If no retries happened this will be `0`"""
+
+ def __init__(
+ self,
+ *,
+ raw: httpx.Response,
+ cast_to: type[R],
+ client: BaseClient[Any, Any],
+ stream: bool,
+ stream_cls: type[Stream[Any]] | type[AsyncStream[Any]] | None,
+ options: FinalRequestOptions,
+ retries_taken: int = 0,
+ ) -> None:
+ self._cast_to = cast_to
+ self._client = client
+ self._parsed_by_type = {}
+ self._stream = stream
+ self._stream_cls = stream_cls
+ self._options = options
+ self.http_response = raw
+ self.retries_taken = retries_taken
+
+ @property
+ def request_id(self) -> str | None:
+ return self.http_response.headers.get("x-request-id") # type: ignore[no-any-return]
+
+ @overload
+ def parse(self, *, to: type[_T]) -> _T: ...
+
+ @overload
+ def parse(self) -> R: ...
+
+ def parse(self, *, to: type[_T] | None = None) -> R | _T:
+ """Returns the rich python representation of this response's data.
+
+ NOTE: For the async client: this will become a coroutine in the next major version.
+
+ For lower-level control, see `.read()`, `.json()`, `.iter_bytes()`.
+
+ You can customise the type that the response is parsed into through
+ the `to` argument, e.g.
+
+ ```py
+ from openai import BaseModel
+
+
+ class MyModel(BaseModel):
+ foo: str
+
+
+ obj = response.parse(to=MyModel)
+ print(obj.foo)
+ ```
+
+ We support parsing:
+ - `BaseModel`
+ - `dict`
+ - `list`
+ - `Union`
+ - `str`
+ - `int`
+ - `float`
+ - `httpx.Response`
+ """
+ cache_key = to if to is not None else self._cast_to
+ cached = self._parsed_by_type.get(cache_key)
+ if cached is not None:
+ return cached # type: ignore[no-any-return]
+
+ parsed = self._parse(to=to)
+ if is_given(self._options.post_parser):
+ parsed = self._options.post_parser(parsed)
+
+ if isinstance(parsed, BaseModel):
+ add_request_id(parsed, self.request_id)
+
+ self._parsed_by_type[cache_key] = parsed
+ return cast(R, parsed)
+
+ @property
+ def headers(self) -> httpx.Headers:
+ return self.http_response.headers
+
+ @property
+ def http_request(self) -> httpx.Request:
+ return self.http_response.request
+
+ @property
+ def status_code(self) -> int:
+ return self.http_response.status_code
+
+ @property
+ def url(self) -> httpx.URL:
+ return self.http_response.url
+
+ @property
+ def method(self) -> str:
+ return self.http_request.method
+
+ @property
+ def content(self) -> bytes:
+ """Return the binary response content.
+
+ NOTE: this will be removed in favour of `.read()` in the
+ next major version.
+ """
+ return self.http_response.content
+
+ @property
+ def text(self) -> str:
+ """Return the decoded response content.
+
+ NOTE: this will be turned into a method in the next major version.
+ """
+ return self.http_response.text
+
+ @property
+ def http_version(self) -> str:
+ return self.http_response.http_version
+
+ @property
+ def is_closed(self) -> bool:
+ return self.http_response.is_closed
+
+ @property
+ def elapsed(self) -> datetime.timedelta:
+ """The time taken for the complete request/response cycle to complete."""
+ return self.http_response.elapsed
+
+ def _parse(self, *, to: type[_T] | None = None) -> R | _T:
+ cast_to = to if to is not None else self._cast_to
+
+ # unwrap `TypeAlias('Name', T)` -> `T`
+ if is_type_alias_type(cast_to):
+ cast_to = cast_to.__value__ # type: ignore[unreachable]
+
+ # unwrap `Annotated[T, ...]` -> `T`
+ if cast_to and is_annotated_type(cast_to):
+ cast_to = extract_type_arg(cast_to, 0)
+
+ origin = get_origin(cast_to) or cast_to
+
+ if self._stream:
+ if to:
+ if not is_stream_class_type(to):
+ raise TypeError(f"Expected custom parse type to be a subclass of {Stream} or {AsyncStream}")
+
+ return cast(
+ _T,
+ to(
+ cast_to=extract_stream_chunk_type(
+ to,
+ failure_message="Expected custom stream type to be passed with a type argument, e.g. Stream[ChunkType]",
+ ),
+ response=self.http_response,
+ client=cast(Any, self._client),
+ ),
+ )
+
+ if self._stream_cls:
+ return cast(
+ R,
+ self._stream_cls(
+ cast_to=extract_stream_chunk_type(self._stream_cls),
+ response=self.http_response,
+ client=cast(Any, self._client),
+ ),
+ )
+
+ stream_cls = cast("type[Stream[Any]] | type[AsyncStream[Any]] | None", self._client._default_stream_cls)
+ if stream_cls is None:
+ raise MissingStreamClassError()
+
+ return cast(
+ R,
+ stream_cls(
+ cast_to=cast_to,
+ response=self.http_response,
+ client=cast(Any, self._client),
+ ),
+ )
+
+ if cast_to is NoneType:
+ return cast(R, None)
+
+ response = self.http_response
+ if cast_to == str:
+ return cast(R, response.text)
+
+ if cast_to == int:
+ return cast(R, int(response.text))
+
+ if cast_to == float:
+ return cast(R, float(response.text))
+
+ if cast_to == bool:
+ return cast(R, response.text.lower() == "true")
+
+ if inspect.isclass(origin) and issubclass(origin, HttpxBinaryResponseContent):
+ return cast(R, cast_to(response)) # type: ignore
+
+ if origin == LegacyAPIResponse:
+ raise RuntimeError("Unexpected state - cast_to is `APIResponse`")
+
+ if inspect.isclass(
+ origin # pyright: ignore[reportUnknownArgumentType]
+ ) and issubclass(origin, httpx.Response):
+ # Because of the invariance of our ResponseT TypeVar, users can subclass httpx.Response
+ # and pass that class to our request functions. We cannot change the variance to be either
+ # covariant or contravariant as that makes our usage of ResponseT illegal. We could construct
+ # the response class ourselves but that is something that should be supported directly in httpx
+ # as it would be easy to incorrectly construct the Response object due to the multitude of arguments.
+ if cast_to != httpx.Response:
+ raise ValueError(f"Subclasses of httpx.Response cannot be passed to `cast_to`")
+ return cast(R, response)
+
+ if (
+ inspect.isclass(
+ origin # pyright: ignore[reportUnknownArgumentType]
+ )
+ and not issubclass(origin, BaseModel)
+ and issubclass(origin, pydantic.BaseModel)
+ ):
+ raise TypeError("Pydantic models must subclass our base model type, e.g. `from openai import BaseModel`")
+
+ if (
+ cast_to is not object
+ and not origin is list
+ and not origin is dict
+ and not origin is Union
+ and not issubclass(origin, BaseModel)
+ ):
+ raise RuntimeError(
+ f"Unsupported type, expected {cast_to} to be a subclass of {BaseModel}, {dict}, {list}, {Union}, {NoneType}, {str} or {httpx.Response}."
+ )
+
+ # split is required to handle cases where additional information is included
+ # in the response, e.g. application/json; charset=utf-8
+ content_type, *_ = response.headers.get("content-type", "*").split(";")
+ if content_type != "application/json":
+ if is_basemodel(cast_to):
+ try:
+ data = response.json()
+ except Exception as exc:
+ log.debug("Could not read JSON from response data due to %s - %s", type(exc), exc)
+ else:
+ return self._client._process_response_data(
+ data=data,
+ cast_to=cast_to, # type: ignore
+ response=response,
+ )
+
+ if self._client._strict_response_validation:
+ raise APIResponseValidationError(
+ response=response,
+ message=f"Expected Content-Type response header to be `application/json` but received `{content_type}` instead.",
+ body=response.text,
+ )
+
+ # If the API responds with content that isn't JSON then we just return
+ # the (decoded) text without performing any parsing so that you can still
+ # handle the response however you need to.
+ return response.text # type: ignore
+
+ data = response.json()
+
+ return self._client._process_response_data(
+ data=data,
+ cast_to=cast_to, # type: ignore
+ response=response,
+ )
+
+ @override
+ def __repr__(self) -> str:
+ return f"<APIResponse [{self.status_code} {self.http_response.reason_phrase}] type={self._cast_to}>"
+
+
+class MissingStreamClassError(TypeError):
+ def __init__(self) -> None:
+ super().__init__(
+ "The `stream` argument was set to `True` but the `stream_cls` argument was not given. See `openai._streaming` for reference",
+ )
+
+
+def to_raw_response_wrapper(func: Callable[P, R]) -> Callable[P, LegacyAPIResponse[R]]:
+ """Higher order function that takes one of our bound API methods and wraps it
+ to support returning the raw `APIResponse` object directly.
+ """
+
+ @functools.wraps(func)
+ def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]:
+ extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
+ extra_headers[RAW_RESPONSE_HEADER] = "true"
+
+ kwargs["extra_headers"] = extra_headers
+
+ return cast(LegacyAPIResponse[R], func(*args, **kwargs))
+
+ return wrapped
+
+
+def async_to_raw_response_wrapper(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[LegacyAPIResponse[R]]]:
+ """Higher order function that takes one of our bound API methods and wraps it
+ to support returning the raw `APIResponse` object directly.
+ """
+
+ @functools.wraps(func)
+ async def wrapped(*args: P.args, **kwargs: P.kwargs) -> LegacyAPIResponse[R]:
+ extra_headers: dict[str, str] = {**(cast(Any, kwargs.get("extra_headers")) or {})}
+ extra_headers[RAW_RESPONSE_HEADER] = "true"
+
+ kwargs["extra_headers"] = extra_headers
+
+ return cast(LegacyAPIResponse[R], await func(*args, **kwargs))
+
+ return wrapped
+
+
+class HttpxBinaryResponseContent:
+ response: httpx.Response
+
+ def __init__(self, response: httpx.Response) -> None:
+ self.response = response
+
+ @property
+ def content(self) -> bytes:
+ return self.response.content
+
+ @property
+ def text(self) -> str:
+ return self.response.text
+
+ @property
+ def encoding(self) -> str | None:
+ return self.response.encoding
+
+ @property
+ def charset_encoding(self) -> str | None:
+ return self.response.charset_encoding
+
+ def json(self, **kwargs: Any) -> Any:
+ return self.response.json(**kwargs)
+
+ def read(self) -> bytes:
+ return self.response.read()
+
+ def iter_bytes(self, chunk_size: int | None = None) -> Iterator[bytes]:
+ return self.response.iter_bytes(chunk_size)
+
+ def iter_text(self, chunk_size: int | None = None) -> Iterator[str]:
+ return self.response.iter_text(chunk_size)
+
+ def iter_lines(self) -> Iterator[str]:
+ return self.response.iter_lines()
+
+ def iter_raw(self, chunk_size: int | None = None) -> Iterator[bytes]:
+ return self.response.iter_raw(chunk_size)
+
+ def write_to_file(
+ self,
+ file: str | os.PathLike[str],
+ ) -> None:
+ """Write the output to the given file.
+
+ Accepts a filename or any path-like object, e.g. pathlib.Path
+
+ Note: if you want to stream the data to the file instead of writing
+ all at once then you should use `.with_streaming_response` when making
+ the API request, e.g. `client.with_streaming_response.foo().stream_to_file('my_filename.txt')`
+ """
+ with open(file, mode="wb") as f:
+ for data in self.response.iter_bytes():
+ f.write(data)
+
+ @deprecated(
+ "Due to a bug, this method doesn't actually stream the response content, `.with_streaming_response.method()` should be used instead"
+ )
+ def stream_to_file(
+ self,
+ file: str | os.PathLike[str],
+ *,
+ chunk_size: int | None = None,
+ ) -> None:
+ with open(file, mode="wb") as f:
+ for data in self.response.iter_bytes(chunk_size):
+ f.write(data)
+
+ def close(self) -> None:
+ return self.response.close()
+
+ async def aread(self) -> bytes:
+ return await self.response.aread()
+
+ async def aiter_bytes(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
+ return self.response.aiter_bytes(chunk_size)
+
+ async def aiter_text(self, chunk_size: int | None = None) -> AsyncIterator[str]:
+ return self.response.aiter_text(chunk_size)
+
+ async def aiter_lines(self) -> AsyncIterator[str]:
+ return self.response.aiter_lines()
+
+ async def aiter_raw(self, chunk_size: int | None = None) -> AsyncIterator[bytes]:
+ return self.response.aiter_raw(chunk_size)
+
+ @deprecated(
+ "Due to a bug, this method doesn't actually stream the response content, `.with_streaming_response.method()` should be used instead"
+ )
+ async def astream_to_file(
+ self,
+ file: str | os.PathLike[str],
+ *,
+ chunk_size: int | None = None,
+ ) -> None:
+ path = anyio.Path(file)
+ async with await path.open(mode="wb") as f:
+ async for data in self.response.aiter_bytes(chunk_size):
+ await f.write(data)
+
+ async def aclose(self) -> None:
+ return await self.response.aclose()