from __future__ import annotations
from json import JSONDecodeError
from typing import Any, Generic, Optional, TypeVar, Union
from httpx import Headers, QueryParams
from pydantic import ValidationError
from ..base_request_builder import (
APIResponse,
BaseFilterRequestBuilder,
BaseRPCRequestBuilder,
BaseSelectRequestBuilder,
CountMethod,
SingleAPIResponse,
pre_delete,
pre_insert,
pre_select,
pre_update,
pre_upsert,
)
from ..exceptions import APIError, generate_default_error_message
from ..types import ReturnMethod
from ..utils import AsyncClient, get_origin_and_cast
_ReturnT = TypeVar("_ReturnT")
class AsyncQueryRequestBuilder(Generic[_ReturnT]):
def __init__(
self,
session: AsyncClient,
path: str,
http_method: str,
headers: Headers,
params: QueryParams,
json: dict,
) -> None:
self.session = session
self.path = path
self.http_method = http_method
self.headers = headers
self.params = params
self.json = None if http_method in {"GET", "HEAD"} else json
async def execute(self) -> APIResponse[_ReturnT]:
"""Execute the query.
.. tip::
This is the last method called, after the query is built.
Returns:
:class:`APIResponse`
Raises:
:class:`APIError` If the API raised an error.
"""
r = await self.session.request(
self.http_method,
self.path,
json=self.json,
params=self.params,
headers=self.headers,
)
try:
if r.is_success:
if self.http_method != "HEAD":
body = r.text
if self.headers.get("Accept") == "text/csv":
return body
if self.headers.get(
"Accept"
) and "application/vnd.pgrst.plan" in self.headers.get("Accept"):
if "+json" not in self.headers.get("Accept"):
return body
return APIResponse[_ReturnT].from_http_request_response(r)
else:
raise APIError(r.json())
except ValidationError as e:
raise APIError(r.json()) from e
except JSONDecodeError:
raise APIError(generate_default_error_message(r))
class AsyncSingleRequestBuilder(Generic[_ReturnT]):
def __init__(
self,
session: AsyncClient,
path: str,
http_method: str,
headers: Headers,
params: QueryParams,
json: dict,
) -> None:
self.session = session
self.path = path
self.http_method = http_method
self.headers = headers
self.params = params
self.json = json
async def execute(self) -> SingleAPIResponse[_ReturnT]:
"""Execute the query.
.. tip::
This is the last method called, after the query is built.
Returns:
:class:`SingleAPIResponse`
Raises:
:class:`APIError` If the API raised an error.
"""
r = await self.session.request(
self.http_method,
self.path,
json=self.json,
params=self.params,
headers=self.headers,
)
try:
if (
200 <= r.status_code <= 299
): # Response.ok from JS (https://developer.mozilla.org/en-US/docs/Web/API/Response/ok)
return SingleAPIResponse[_ReturnT].from_http_request_response(r)
else:
raise APIError(r.json())
except ValidationError as e:
raise APIError(r.json()) from e
except JSONDecodeError:
raise APIError(generate_default_error_message(r))
class AsyncMaybeSingleRequestBuilder(AsyncSingleRequestBuilder[_ReturnT]):
async def execute(self) -> Optional[SingleAPIResponse[_ReturnT]]:
r = None
try:
r = await AsyncSingleRequestBuilder[_ReturnT].execute(self)
except APIError as e:
if e.details and "The result contains 0 rows" in e.details:
return None
if not r:
raise APIError(
{
"message": "Missing response",
"code": "204",
"hint": "Please check traceback of the code",
"details": "Postgrest couldn't retrieve response, please check traceback of the code. Please create an issue in `supabase-community/postgrest-py` if needed.",
}
)
return r
# ignoring type checking as a workaround for https://github.com/python/mypy/issues/9319
class AsyncFilterRequestBuilder(BaseFilterRequestBuilder[_ReturnT], AsyncQueryRequestBuilder[_ReturnT]): # type: ignore
def __init__(
self,
session: AsyncClient,
path: str,
http_method: str,
headers: Headers,
params: QueryParams,
json: dict,
) -> None:
get_origin_and_cast(BaseFilterRequestBuilder[_ReturnT]).__init__(
self, session, headers, params
)
get_origin_and_cast(AsyncQueryRequestBuilder[_ReturnT]).__init__(
self, session, path, http_method, headers, params, json
)
# this exists for type-safety. see https://gist.github.com/anand2312/93d3abf401335fd3310d9e30112303bf
class AsyncRPCFilterRequestBuilder(
BaseRPCRequestBuilder[_ReturnT], AsyncSingleRequestBuilder[_ReturnT]
):
def __init__(
self,
session: AsyncClient,
path: str,
http_method: str,
headers: Headers,
params: QueryParams,
json: dict,
) -> None:
get_origin_and_cast(BaseFilterRequestBuilder[_ReturnT]).__init__(
self, session, headers, params
)
get_origin_and_cast(AsyncSingleRequestBuilder[_ReturnT]).__init__(
self, session, path, http_method, headers, params, json
)
# ignoring type checking as a workaround for https://github.com/python/mypy/issues/9319
class AsyncSelectRequestBuilder(BaseSelectRequestBuilder[_ReturnT], AsyncQueryRequestBuilder[_ReturnT]): # type: ignore
def __init__(
self,
session: AsyncClient,
path: str,
http_method: str,
headers: Headers,
params: QueryParams,
json: dict,
) -> None:
get_origin_and_cast(BaseSelectRequestBuilder[_ReturnT]).__init__(
self, session, headers, params
)
get_origin_and_cast(AsyncQueryRequestBuilder[_ReturnT]).__init__(
self, session, path, http_method, headers, params, json
)
def single(self) -> AsyncSingleRequestBuilder[_ReturnT]:
"""Specify that the query will only return a single row in response.
.. caution::
The API will raise an error if the query returned more than one row.
"""
self.headers["Accept"] = "application/vnd.pgrst.object+json"
return AsyncSingleRequestBuilder[_ReturnT](
headers=self.headers,
http_method=self.http_method,
json=self.json,
params=self.params,
path=self.path,
session=self.session, # type: ignore
)
def maybe_single(self) -> AsyncMaybeSingleRequestBuilder[_ReturnT]:
"""Retrieves at most one row from the result. Result must be at most one row (e.g. using `eq` on a UNIQUE column), otherwise this will result in an error."""
self.headers["Accept"] = "application/vnd.pgrst.object+json"
return AsyncMaybeSingleRequestBuilder[_ReturnT](
headers=self.headers,
http_method=self.http_method,
json=self.json,
params=self.params,
path=self.path,
session=self.session, # type: ignore
)
def text_search(
self, column: str, query: str, options: dict[str, Any] = {}
) -> AsyncFilterRequestBuilder[_ReturnT]:
type_ = options.get("type")
type_part = ""
if type_ == "plain":
type_part = "pl"
elif type_ == "phrase":
type_part = "ph"
elif type_ == "web_search":
type_part = "w"
config_part = f"({options.get('config')})" if options.get("config") else ""
self.params = self.params.add(column, f"{type_part}fts{config_part}.{query}")
return AsyncQueryRequestBuilder[_ReturnT](
headers=self.headers,
http_method=self.http_method,
json=self.json,
params=self.params,
path=self.path,
session=self.session, # type: ignore
)
def csv(self) -> AsyncSingleRequestBuilder[str]:
"""Specify that the query must retrieve data as a single CSV string."""
self.headers["Accept"] = "text/csv"
return AsyncSingleRequestBuilder[str](
session=self.session, # type: ignore
path=self.path,
http_method=self.http_method,
headers=self.headers,
params=self.params,
json=self.json,
)
class AsyncRequestBuilder(Generic[_ReturnT]):
def __init__(self, session: AsyncClient, path: str) -> None:
self.session = session
self.path = path
def select(
self,
*columns: str,
count: Optional[CountMethod] = None,
head: Optional[bool] = None,
) -> AsyncSelectRequestBuilder[_ReturnT]:
"""Run a SELECT query.
Args:
*columns: The names of the columns to fetch.
count: The method to use to get the count of rows returned.
Returns:
:class:`AsyncSelectRequestBuilder`
"""
method, params, headers, json = pre_select(*columns, count=count, head=head)
return AsyncSelectRequestBuilder[_ReturnT](
self.session, self.path, method, headers, params, json
)
def insert(
self,
json: Union[dict, list],
*,
count: Optional[CountMethod] = None,
returning: ReturnMethod = ReturnMethod.representation,
upsert: bool = False,
default_to_null: bool = True,
) -> AsyncQueryRequestBuilder[_ReturnT]:
"""Run an INSERT query.
Args:
json: The row to be inserted.
count: The method to use to get the count of rows returned.
returning: Either 'minimal' or 'representation'
upsert: Whether the query should be an upsert.
default_to_null: Make missing fields default to `null`.
Otherwise, use the default value for the column.
Only applies for bulk inserts.
Returns:
:class:`AsyncQueryRequestBuilder`
"""
method, params, headers, json = pre_insert(
json,
count=count,
returning=returning,
upsert=upsert,
default_to_null=default_to_null,
)
return AsyncQueryRequestBuilder[_ReturnT](
self.session, self.path, method, headers, params, json
)
def upsert(
self,
json: Union[dict, list],
*,
count: Optional[CountMethod] = None,
returning: ReturnMethod = ReturnMethod.representation,
ignore_duplicates: bool = False,
on_conflict: str = "",
default_to_null: bool = True,
) -> AsyncQueryRequestBuilder[_ReturnT]:
"""Run an upsert (INSERT ... ON CONFLICT DO UPDATE) query.
Args:
json: The row to be inserted.
count: The method to use to get the count of rows returned.
returning: Either 'minimal' or 'representation'
ignore_duplicates: Whether duplicate rows should be ignored.
on_conflict: Specified columns to be made to work with UNIQUE constraint.
default_to_null: Make missing fields default to `null`. Otherwise, use the
default value for the column. This only applies when inserting new rows,
not when merging with existing rows under `ignoreDuplicates: false`.
This also only applies when doing bulk upserts.
Returns:
:class:`AsyncQueryRequestBuilder`
"""
method, params, headers, json = pre_upsert(
json,
count=count,
returning=returning,
ignore_duplicates=ignore_duplicates,
on_conflict=on_conflict,
default_to_null=default_to_null,
)
return AsyncQueryRequestBuilder[_ReturnT](
self.session, self.path, method, headers, params, json
)
def update(
self,
json: dict,
*,
count: Optional[CountMethod] = None,
returning: ReturnMethod = ReturnMethod.representation,
) -> AsyncFilterRequestBuilder[_ReturnT]:
"""Run an UPDATE query.
Args:
json: The updated fields.
count: The method to use to get the count of rows returned.
returning: Either 'minimal' or 'representation'
Returns:
:class:`AsyncFilterRequestBuilder`
"""
method, params, headers, json = pre_update(
json,
count=count,
returning=returning,
)
return AsyncFilterRequestBuilder[_ReturnT](
self.session, self.path, method, headers, params, json
)
def delete(
self,
*,
count: Optional[CountMethod] = None,
returning: ReturnMethod = ReturnMethod.representation,
) -> AsyncFilterRequestBuilder[_ReturnT]:
"""Run a DELETE query.
Args:
count: The method to use to get the count of rows returned.
returning: Either 'minimal' or 'representation'
Returns:
:class:`AsyncFilterRequestBuilder`
"""
method, params, headers, json = pre_delete(
count=count,
returning=returning,
)
return AsyncFilterRequestBuilder[_ReturnT](
self.session, self.path, method, headers, params, json
)