about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py783
1 files changed, 783 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py
new file mode 100644
index 00000000..0554048c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/postgresql/psycopg.py
@@ -0,0 +1,783 @@
+# dialects/postgresql/psycopg.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+r"""
+.. dialect:: postgresql+psycopg
+    :name: psycopg (a.k.a. psycopg 3)
+    :dbapi: psycopg
+    :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
+    :url: https://pypi.org/project/psycopg/
+
+``psycopg`` is the package and module name for version 3 of the ``psycopg``
+database driver, formerly known as ``psycopg2``.  This driver is different
+enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
+via a totally separate dialect; support for ``psycopg2`` is expected to remain
+for as long as that package continues to function for modern Python versions,
+and also remains the default dialect for the ``postgresql://`` dialect
+series.
+
+The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
+implementation under the same dialect name. The proper version is
+selected depending on how the engine is created:
+
+* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
+  automatically select the sync version, e.g.::
+
+    from sqlalchemy import create_engine
+
+    sync_engine = create_engine(
+        "postgresql+psycopg://scott:tiger@localhost/test"
+    )
+
+* calling :func:`_asyncio.create_async_engine` with
+  ``postgresql+psycopg://...`` will automatically select the async version,
+  e.g.::
+
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    asyncio_engine = create_async_engine(
+        "postgresql+psycopg://scott:tiger@localhost/test"
+    )
+
+The asyncio version of the dialect may also be specified explicitly using the
+``psycopg_async`` suffix, as::
+
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    asyncio_engine = create_async_engine(
+        "postgresql+psycopg_async://scott:tiger@localhost/test"
+    )
+
+.. seealso::
+
+    :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
+    dialect shares most of its behavior with the ``psycopg2`` dialect.
+    Further documentation is available there.
+
+Using a different Cursor class
+------------------------------
+
+One of the differences between ``psycopg`` and the older ``psycopg2``
+is how bound parameters are handled: ``psycopg2`` would bind them
+client side, while ``psycopg`` by default will bind them server side.
+
+It's possible to configure ``psycopg`` to do client side binding by
+specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
+the engine::
+
+    from psycopg import ClientCursor
+
+    client_side_engine = create_engine(
+        "postgresql+psycopg://...",
+        connect_args={"cursor_factory": ClientCursor},
+    )
+
+Similarly when using an async engine the ``AsyncClientCursor`` can be
+specified::
+
+    from psycopg import AsyncClientCursor
+
+    client_side_engine = create_async_engine(
+        "postgresql+psycopg://...",
+        connect_args={"cursor_factory": AsyncClientCursor},
+    )
+
+.. seealso::
+
+    `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
+
+"""  # noqa
+from __future__ import annotations
+
+from collections import deque
+import logging
+import re
+from typing import cast
+from typing import TYPE_CHECKING
+
+from . import ranges
+from ._psycopg_common import _PGDialect_common_psycopg
+from ._psycopg_common import _PGExecutionContext_common_psycopg
+from .base import INTERVAL
+from .base import PGCompiler
+from .base import PGIdentifierPreparer
+from .base import REGCONFIG
+from .json import JSON
+from .json import JSONB
+from .json import JSONPathType
+from .types import CITEXT
+from ... import pool
+from ... import util
+from ...engine import AdaptedConnection
+from ...sql import sqltypes
+from ...util.concurrency import await_fallback
+from ...util.concurrency import await_only
+
+if TYPE_CHECKING:
+    from typing import Iterable
+
+    from psycopg import AsyncConnection
+
+logger = logging.getLogger("sqlalchemy.dialects.postgresql")
+
+
+class _PGString(sqltypes.String):
+    render_bind_cast = True
+
+
+class _PGREGCONFIG(REGCONFIG):
+    render_bind_cast = True
+
+
+class _PGJSON(JSON):
+    def bind_processor(self, dialect):
+        return self._make_bind_processor(None, dialect._psycopg_Json)
+
+    def result_processor(self, dialect, coltype):
+        return None
+
+
+class _PGJSONB(JSONB):
+    def bind_processor(self, dialect):
+        return self._make_bind_processor(None, dialect._psycopg_Jsonb)
+
+    def result_processor(self, dialect, coltype):
+        return None
+
+
+class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
+    __visit_name__ = "json_int_index"
+
+    render_bind_cast = True
+
+
+class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
+    __visit_name__ = "json_str_index"
+
+    render_bind_cast = True
+
+
+class _PGJSONPathType(JSONPathType):
+    pass
+
+
+class _PGInterval(INTERVAL):
+    render_bind_cast = True
+
+
+class _PGTimeStamp(sqltypes.DateTime):
+    render_bind_cast = True
+
+
+class _PGDate(sqltypes.Date):
+    render_bind_cast = True
+
+
+class _PGTime(sqltypes.Time):
+    render_bind_cast = True
+
+
+class _PGInteger(sqltypes.Integer):
+    render_bind_cast = True
+
+
+class _PGSmallInteger(sqltypes.SmallInteger):
+    render_bind_cast = True
+
+
+class _PGNullType(sqltypes.NullType):
+    render_bind_cast = True
+
+
+class _PGBigInteger(sqltypes.BigInteger):
+    render_bind_cast = True
+
+
+class _PGBoolean(sqltypes.Boolean):
+    render_bind_cast = True
+
+
+class _PsycopgRange(ranges.AbstractSingleRangeImpl):
+    def bind_processor(self, dialect):
+        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
+
+        def to_range(value):
+            if isinstance(value, ranges.Range):
+                value = psycopg_Range(
+                    value.lower, value.upper, value.bounds, value.empty
+                )
+            return value
+
+        return to_range
+
+    def result_processor(self, dialect, coltype):
+        def to_range(value):
+            if value is not None:
+                value = ranges.Range(
+                    value._lower,
+                    value._upper,
+                    bounds=value._bounds if value._bounds else "[)",
+                    empty=not value._bounds,
+                )
+            return value
+
+        return to_range
+
+
+class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
+    def bind_processor(self, dialect):
+        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
+        psycopg_Multirange = cast(
+            PGDialect_psycopg, dialect
+        )._psycopg_Multirange
+
+        NoneType = type(None)
+
+        def to_range(value):
+            if isinstance(value, (str, NoneType, psycopg_Multirange)):
+                return value
+
+            return psycopg_Multirange(
+                [
+                    psycopg_Range(
+                        element.lower,
+                        element.upper,
+                        element.bounds,
+                        element.empty,
+                    )
+                    for element in cast("Iterable[ranges.Range]", value)
+                ]
+            )
+
+        return to_range
+
+    def result_processor(self, dialect, coltype):
+        def to_range(value):
+            if value is None:
+                return None
+            else:
+                return ranges.MultiRange(
+                    ranges.Range(
+                        elem._lower,
+                        elem._upper,
+                        bounds=elem._bounds if elem._bounds else "[)",
+                        empty=not elem._bounds,
+                    )
+                    for elem in value
+                )
+
+        return to_range
+
+
+class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
+    pass
+
+
+class PGCompiler_psycopg(PGCompiler):
+    pass
+
+
+class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
+    pass
+
+
+def _log_notices(diagnostic):
+    logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary)
+
+
+class PGDialect_psycopg(_PGDialect_common_psycopg):
+    driver = "psycopg"
+
+    supports_statement_cache = True
+    supports_server_side_cursors = True
+    default_paramstyle = "pyformat"
+    supports_sane_multi_rowcount = True
+
+    execution_ctx_cls = PGExecutionContext_psycopg
+    statement_compiler = PGCompiler_psycopg
+    preparer = PGIdentifierPreparer_psycopg
+    psycopg_version = (0, 0)
+
+    _has_native_hstore = True
+    _psycopg_adapters_map = None
+
+    colspecs = util.update_copy(
+        _PGDialect_common_psycopg.colspecs,
+        {
+            sqltypes.String: _PGString,
+            REGCONFIG: _PGREGCONFIG,
+            JSON: _PGJSON,
+            CITEXT: CITEXT,
+            sqltypes.JSON: _PGJSON,
+            JSONB: _PGJSONB,
+            sqltypes.JSON.JSONPathType: _PGJSONPathType,
+            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
+            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
+            sqltypes.Interval: _PGInterval,
+            INTERVAL: _PGInterval,
+            sqltypes.Date: _PGDate,
+            sqltypes.DateTime: _PGTimeStamp,
+            sqltypes.Time: _PGTime,
+            sqltypes.Integer: _PGInteger,
+            sqltypes.SmallInteger: _PGSmallInteger,
+            sqltypes.BigInteger: _PGBigInteger,
+            ranges.AbstractSingleRange: _PsycopgRange,
+            ranges.AbstractMultiRange: _PsycopgMultiRange,
+        },
+    )
+
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+
+        if self.dbapi:
+            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
+            if m:
+                self.psycopg_version = tuple(
+                    int(x) for x in m.group(1, 2, 3) if x is not None
+                )
+
+            if self.psycopg_version < (3, 0, 2):
+                raise ImportError(
+                    "psycopg version 3.0.2 or higher is required."
+                )
+
+            from psycopg.adapt import AdaptersMap
+
+            self._psycopg_adapters_map = adapters_map = AdaptersMap(
+                self.dbapi.adapters
+            )
+
+            if self._native_inet_types is False:
+                import psycopg.types.string
+
+                adapters_map.register_loader(
+                    "inet", psycopg.types.string.TextLoader
+                )
+                adapters_map.register_loader(
+                    "cidr", psycopg.types.string.TextLoader
+                )
+
+            if self._json_deserializer:
+                from psycopg.types.json import set_json_loads
+
+                set_json_loads(self._json_deserializer, adapters_map)
+
+            if self._json_serializer:
+                from psycopg.types.json import set_json_dumps
+
+                set_json_dumps(self._json_serializer, adapters_map)
+
+    def create_connect_args(self, url):
+        # see https://github.com/psycopg/psycopg/issues/83
+        cargs, cparams = super().create_connect_args(url)
+
+        if self._psycopg_adapters_map:
+            cparams["context"] = self._psycopg_adapters_map
+        if self.client_encoding is not None:
+            cparams["client_encoding"] = self.client_encoding
+        return cargs, cparams
+
+    def _type_info_fetch(self, connection, name):
+        from psycopg.types import TypeInfo
+
+        return TypeInfo.fetch(connection.connection.driver_connection, name)
+
+    def initialize(self, connection):
+        super().initialize(connection)
+
+        # PGDialect.initialize() checks server version for <= 8.2 and sets
+        # this flag to False if so
+        if not self.insert_returning:
+            self.insert_executemany_returning = False
+
+        # HSTORE can't be registered until we have a connection so that
+        # we can look up its OID, so we set up this adapter in
+        # initialize()
+        if self.use_native_hstore:
+            info = self._type_info_fetch(connection, "hstore")
+            self._has_native_hstore = info is not None
+            if self._has_native_hstore:
+                from psycopg.types.hstore import register_hstore
+
+                # register the adapter for connections made subsequent to
+                # this one
+                assert self._psycopg_adapters_map
+                register_hstore(info, self._psycopg_adapters_map)
+
+                # register the adapter for this connection
+                assert connection.connection
+                register_hstore(info, connection.connection.driver_connection)
+
+    @classmethod
+    def import_dbapi(cls):
+        import psycopg
+
+        return psycopg
+
+    @classmethod
+    def get_async_dialect_cls(cls, url):
+        return PGDialectAsync_psycopg
+
+    @util.memoized_property
+    def _isolation_lookup(self):
+        return {
+            "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
+            "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
+            "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
+            "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
+        }
+
+    @util.memoized_property
+    def _psycopg_Json(self):
+        from psycopg.types import json
+
+        return json.Json
+
+    @util.memoized_property
+    def _psycopg_Jsonb(self):
+        from psycopg.types import json
+
+        return json.Jsonb
+
+    @util.memoized_property
+    def _psycopg_TransactionStatus(self):
+        from psycopg.pq import TransactionStatus
+
+        return TransactionStatus
+
+    @util.memoized_property
+    def _psycopg_Range(self):
+        from psycopg.types.range import Range
+
+        return Range
+
+    @util.memoized_property
+    def _psycopg_Multirange(self):
+        from psycopg.types.multirange import Multirange
+
+        return Multirange
+
+    def _do_isolation_level(self, connection, autocommit, isolation_level):
+        connection.autocommit = autocommit
+        connection.isolation_level = isolation_level
+
+    def get_isolation_level(self, dbapi_connection):
+        status_before = dbapi_connection.info.transaction_status
+        value = super().get_isolation_level(dbapi_connection)
+
+        # don't rely on psycopg providing enum symbols, compare with
+        # eq/ne
+        if status_before == self._psycopg_TransactionStatus.IDLE:
+            dbapi_connection.rollback()
+        return value
+
+    def set_isolation_level(self, dbapi_connection, level):
+        if level == "AUTOCOMMIT":
+            self._do_isolation_level(
+                dbapi_connection, autocommit=True, isolation_level=None
+            )
+        else:
+            self._do_isolation_level(
+                dbapi_connection,
+                autocommit=False,
+                isolation_level=self._isolation_lookup[level],
+            )
+
+    def set_readonly(self, connection, value):
+        connection.read_only = value
+
+    def get_readonly(self, connection):
+        return connection.read_only
+
+    def on_connect(self):
+        def notices(conn):
+            conn.add_notice_handler(_log_notices)
+
+        fns = [notices]
+
+        if self.isolation_level is not None:
+
+            def on_connect(conn):
+                self.set_isolation_level(conn, self.isolation_level)
+
+            fns.append(on_connect)
+
+        # fns always has the notices function
+        def on_connect(conn):
+            for fn in fns:
+                fn(conn)
+
+        return on_connect
+
+    def is_disconnect(self, e, connection, cursor):
+        if isinstance(e, self.dbapi.Error) and connection is not None:
+            if connection.closed or connection.broken:
+                return True
+        return False
+
+    def _do_prepared_twophase(self, connection, command, recover=False):
+        dbapi_conn = connection.connection.dbapi_connection
+        if (
+            recover
+            # don't rely on psycopg providing enum symbols, compare with
+            # eq/ne
+            or dbapi_conn.info.transaction_status
+            != self._psycopg_TransactionStatus.IDLE
+        ):
+            dbapi_conn.rollback()
+        before_autocommit = dbapi_conn.autocommit
+        try:
+            if not before_autocommit:
+                self._do_autocommit(dbapi_conn, True)
+            dbapi_conn.execute(command)
+        finally:
+            if not before_autocommit:
+                self._do_autocommit(dbapi_conn, before_autocommit)
+
+    def do_rollback_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        if is_prepared:
+            self._do_prepared_twophase(
+                connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
+            )
+        else:
+            self.do_rollback(connection.connection)
+
+    def do_commit_twophase(
+        self, connection, xid, is_prepared=True, recover=False
+    ):
+        if is_prepared:
+            self._do_prepared_twophase(
+                connection, f"COMMIT PREPARED '{xid}'", recover=recover
+            )
+        else:
+            self.do_commit(connection.connection)
+
+    @util.memoized_property
+    def _dialect_specific_select_one(self):
+        return ";"
+
+
+class AsyncAdapt_psycopg_cursor:
+    __slots__ = ("_cursor", "await_", "_rows")
+
+    _psycopg_ExecStatus = None
+
+    def __init__(self, cursor, await_) -> None:
+        self._cursor = cursor
+        self.await_ = await_
+        self._rows = deque()
+
+    def __getattr__(self, name):
+        return getattr(self._cursor, name)
+
+    @property
+    def arraysize(self):
+        return self._cursor.arraysize
+
+    @arraysize.setter
+    def arraysize(self, value):
+        self._cursor.arraysize = value
+
+    def close(self):
+        self._rows.clear()
+        # Normal cursor just call _close() in a non-sync way.
+        self._cursor._close()
+
+    def execute(self, query, params=None, **kw):
+        result = self.await_(self._cursor.execute(query, params, **kw))
+        # sqlalchemy result is not async, so need to pull all rows here
+        res = self._cursor.pgresult
+
+        # don't rely on psycopg providing enum symbols, compare with
+        # eq/ne
+        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
+            rows = self.await_(self._cursor.fetchall())
+            self._rows = deque(rows)
+        return result
+
+    def executemany(self, query, params_seq):
+        return self.await_(self._cursor.executemany(query, params_seq))
+
+    def __iter__(self):
+        while self._rows:
+            yield self._rows.popleft()
+
+    def fetchone(self):
+        if self._rows:
+            return self._rows.popleft()
+        else:
+            return None
+
+    def fetchmany(self, size=None):
+        if size is None:
+            size = self._cursor.arraysize
+
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
+
+    def fetchall(self):
+        retval = list(self._rows)
+        self._rows.clear()
+        return retval
+
+
+class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
+    def execute(self, query, params=None, **kw):
+        self.await_(self._cursor.execute(query, params, **kw))
+        return self
+
+    def close(self):
+        self.await_(self._cursor.close())
+
+    def fetchone(self):
+        return self.await_(self._cursor.fetchone())
+
+    def fetchmany(self, size=0):
+        return self.await_(self._cursor.fetchmany(size))
+
+    def fetchall(self):
+        return self.await_(self._cursor.fetchall())
+
+    def __iter__(self):
+        iterator = self._cursor.__aiter__()
+        while True:
+            try:
+                yield self.await_(iterator.__anext__())
+            except StopAsyncIteration:
+                break
+
+
+class AsyncAdapt_psycopg_connection(AdaptedConnection):
+    _connection: AsyncConnection
+    __slots__ = ()
+    await_ = staticmethod(await_only)
+
+    def __init__(self, connection) -> None:
+        self._connection = connection
+
+    def __getattr__(self, name):
+        return getattr(self._connection, name)
+
+    def execute(self, query, params=None, **kw):
+        cursor = self.await_(self._connection.execute(query, params, **kw))
+        return AsyncAdapt_psycopg_cursor(cursor, self.await_)
+
+    def cursor(self, *args, **kw):
+        cursor = self._connection.cursor(*args, **kw)
+        if hasattr(cursor, "name"):
+            return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_)
+        else:
+            return AsyncAdapt_psycopg_cursor(cursor, self.await_)
+
+    def commit(self):
+        self.await_(self._connection.commit())
+
+    def rollback(self):
+        self.await_(self._connection.rollback())
+
+    def close(self):
+        self.await_(self._connection.close())
+
+    @property
+    def autocommit(self):
+        return self._connection.autocommit
+
+    @autocommit.setter
+    def autocommit(self, value):
+        self.set_autocommit(value)
+
+    def set_autocommit(self, value):
+        self.await_(self._connection.set_autocommit(value))
+
+    def set_isolation_level(self, value):
+        self.await_(self._connection.set_isolation_level(value))
+
+    def set_read_only(self, value):
+        self.await_(self._connection.set_read_only(value))
+
+    def set_deferrable(self, value):
+        self.await_(self._connection.set_deferrable(value))
+
+
+class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
+    __slots__ = ()
+    await_ = staticmethod(await_fallback)
+
+
+class PsycopgAdaptDBAPI:
+    def __init__(self, psycopg) -> None:
+        self.psycopg = psycopg
+
+        for k, v in self.psycopg.__dict__.items():
+            if k != "connect":
+                self.__dict__[k] = v
+
+    def connect(self, *arg, **kw):
+        async_fallback = kw.pop("async_fallback", False)
+        creator_fn = kw.pop(
+            "async_creator_fn", self.psycopg.AsyncConnection.connect
+        )
+        if util.asbool(async_fallback):
+            return AsyncAdaptFallback_psycopg_connection(
+                await_fallback(creator_fn(*arg, **kw))
+            )
+        else:
+            return AsyncAdapt_psycopg_connection(
+                await_only(creator_fn(*arg, **kw))
+            )
+
+
+class PGDialectAsync_psycopg(PGDialect_psycopg):
+    is_async = True
+    supports_statement_cache = True
+
+    @classmethod
+    def import_dbapi(cls):
+        import psycopg
+        from psycopg.pq import ExecStatus
+
+        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus
+
+        return PsycopgAdaptDBAPI(psycopg)
+
+    @classmethod
+    def get_pool_class(cls, url):
+        async_fallback = url.query.get("async_fallback", False)
+
+        if util.asbool(async_fallback):
+            return pool.FallbackAsyncAdaptedQueuePool
+        else:
+            return pool.AsyncAdaptedQueuePool
+
+    def _type_info_fetch(self, connection, name):
+        from psycopg.types import TypeInfo
+
+        adapted = connection.connection
+        return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name))
+
+    def _do_isolation_level(self, connection, autocommit, isolation_level):
+        connection.set_autocommit(autocommit)
+        connection.set_isolation_level(isolation_level)
+
+    def _do_autocommit(self, connection, value):
+        connection.set_autocommit(value)
+
+    def set_readonly(self, connection, value):
+        connection.set_read_only(value)
+
+    def set_deferrable(self, connection, value):
+        connection.set_deferrable(value)
+
+    def get_driver_connection(self, connection):
+        return connection._connection
+
+
+dialect = PGDialect_psycopg
+dialect_async = PGDialectAsync_psycopg