author    S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (HEAD, master)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed
parent    cc961e04ba734dd72309fb548a2f97d67d578813

    two versions of R2R are here

Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py'):
 .venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py | 2367 +
 1 file changed, 2367 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py b/.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py
new file mode 100644
index 00000000..dd4250ff
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/engine/default.py
@@ -0,0 +1,2367 @@
+# engine/default.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Default implementations of per-dialect sqlalchemy.engine classes.
+
+These are semi-private implementation classes which are only of importance
+to database dialect authors; dialects will usually use the classes here
+as the base class for their own corresponding classes.
+
+"""
+
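+# A concrete dialect typically subclasses DefaultDialect and overrides a
+# handful of its class-level capability flags, e.g. (illustrative sketch,
+# not part of this module):
+#
+#     class MyDialect(DefaultDialect):
+#         name = "mydb"
+#         driver = "mydriver"
+#         supports_statement_cache = True
+#         supports_native_boolean = True
+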
+from __future__ import annotations
+
+import functools
+import operator
+import random
+import re
+from time import perf_counter
+import typing
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import MutableSequence
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+import weakref
+
+from . import characteristics
+from . import cursor as _cursor
+from . import interfaces
+from .base import Connection
+from .interfaces import CacheStats
+from .interfaces import DBAPICursor
+from .interfaces import Dialect
+from .interfaces import ExecuteStyle
+from .interfaces import ExecutionContext
+from .reflection import ObjectKind
+from .reflection import ObjectScope
+from .. import event
+from .. import exc
+from .. import pool
+from .. import util
+from ..sql import compiler
+from ..sql import dml
+from ..sql import expression
+from ..sql import type_api
+from ..sql import util as sql_util
+from ..sql._typing import is_tuple_type
+from ..sql.base import _NoArg
+from ..sql.compiler import DDLCompiler
+from ..sql.compiler import InsertmanyvaluesSentinelOpts
+from ..sql.compiler import SQLCompiler
+from ..sql.elements import quoted_name
+from ..util.typing import Final
+from ..util.typing import Literal
+
+if typing.TYPE_CHECKING:
+    from types import ModuleType
+
+    from .base import Engine
+    from .cursor import ResultFetchStrategy
+    from .interfaces import _CoreMultiExecuteParams
+    from .interfaces import _CoreSingleExecuteParams
+    from .interfaces import _DBAPICursorDescription
+    from .interfaces import _DBAPIMultiExecuteParams
+    from .interfaces import _ExecuteOptions
+    from .interfaces import _MutableCoreSingleExecuteParams
+    from .interfaces import _ParamStyle
+    from .interfaces import DBAPIConnection
+    from .interfaces import IsolationLevel
+    from .row import Row
+    from .url import URL
+    from ..event import _ListenerFnType
+    from ..pool import Pool
+    from ..pool import PoolProxiedConnection
+    from ..sql import Executable
+    from ..sql.compiler import Compiled
+    from ..sql.compiler import Linting
+    from ..sql.compiler import ResultColumnsEntry
+    from ..sql.dml import DMLState
+    from ..sql.dml import UpdateBase
+    from ..sql.elements import BindParameter
+    from ..sql.schema import Column
+    from ..sql.type_api import _BindProcessorType
+    from ..sql.type_api import _ResultProcessorType
+    from ..sql.type_api import TypeEngine
+
+# When we're handed literal SQL, use this to check that it's a SELECT
+# statement before allowing a server side cursor
+SERVER_SIDE_CURSOR_RE = re.compile(r"\s*SELECT", re.I | re.UNICODE)
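+# e.g. SERVER_SIDE_CURSOR_RE.match("  select * from t") matches, while
+# "UPDATE t SET x=1" does not; only SELECT statements given as plain
+# strings are considered eligible for server side cursors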
+
+
+(
+    CACHE_HIT,
+    CACHE_MISS,
+    CACHING_DISABLED,
+    NO_CACHE_KEY,
+    NO_DIALECT_SUPPORT,
+) = list(CacheStats)
+
+
+class DefaultDialect(Dialect):
+    """Default implementation of Dialect"""
+
+    statement_compiler = compiler.SQLCompiler
+    ddl_compiler = compiler.DDLCompiler
+    type_compiler_cls = compiler.GenericTypeCompiler
+
+    preparer = compiler.IdentifierPreparer
+    supports_alter = True
+    supports_comments = False
+    supports_constraint_comments = False
+    inline_comments = False
+    supports_statement_cache = True
+
+    div_is_floordiv = True
+
+    bind_typing = interfaces.BindTyping.NONE
+
+    include_set_input_sizes: Optional[Set[Any]] = None
+    exclude_set_input_sizes: Optional[Set[Any]] = None
+
+    # the first value we'd get for an autoincrement column.
+    default_sequence_base = 1
+
+    # most DBAPIs are happy with this for execute();
+    # cx_oracle is not.
+    execute_sequence_format = tuple
+
+    supports_schemas = True
+    supports_views = True
+    supports_sequences = False
+    sequences_optional = False
+    preexecute_autoincrement_sequences = False
+    supports_identity_columns = False
+    postfetch_lastrowid = True
+    favor_returning_over_lastrowid = False
+    insert_null_pk_still_autoincrements = False
+    update_returning = False
+    delete_returning = False
+    update_returning_multifrom = False
+    delete_returning_multifrom = False
+    insert_returning = False
+
+    cte_follows_insert = False
+
+    supports_native_enum = False
+    supports_native_boolean = False
+    supports_native_uuid = False
+    returns_native_bytes = False
+
+    non_native_boolean_check_constraint = True
+
+    supports_simple_order_by_label = True
+
+    tuple_in_values = False
+
+    connection_characteristics = util.immutabledict(
+        {
+            "isolation_level": characteristics.IsolationLevelCharacteristic(),
+            "logging_token": characteristics.LoggingTokenCharacteristic(),
+        }
+    )
+
+    engine_config_types: Mapping[str, Any] = util.immutabledict(
+        {
+            "pool_timeout": util.asint,
+            "echo": util.bool_or_str("debug"),
+            "echo_pool": util.bool_or_str("debug"),
+            "pool_recycle": util.asint,
+            "pool_size": util.asint,
+            "max_overflow": util.asint,
+            "future": util.asbool,
+        }
+    )
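+    # e.g. engine_from_config() uses this mapping to coerce string config
+    # values such as {"sqlalchemy.pool_size": "10"} to their proper types
+    # before calling create_engine()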
+
+    # whether the NUMERIC type returns decimal.Decimal
+    # (*not* the FLOAT type, however)
+    supports_native_decimal = False
+
+    name = "default"
+
+    # length at which to truncate
+    # any identifier.
+    max_identifier_length = 9999
+    _user_defined_max_identifier_length: Optional[int] = None
+
+    isolation_level: Optional[str] = None
+
+    # sub-categories of max_identifier_length.
+    # currently these accommodate MySQL, which allows alias names
+    # of 255 but DDL names only of 64.
+    max_index_name_length: Optional[int] = None
+    max_constraint_name_length: Optional[int] = None
+
+    supports_sane_rowcount = True
+    supports_sane_multi_rowcount = True
+    colspecs: MutableMapping[Type[TypeEngine[Any]], Type[TypeEngine[Any]]] = {}
+    default_paramstyle = "named"
+
+    supports_default_values = False
+    """dialect supports INSERT... DEFAULT VALUES syntax"""
+
+    supports_default_metavalue = False
+    """dialect supports INSERT... VALUES (DEFAULT) syntax"""
+
+    default_metavalue_token = "DEFAULT"
+    """for INSERT... VALUES (DEFAULT) syntax, the token to put in the
+    parenthesis."""
+
+    # not sure if this is a real thing but the compiler will deliver it
+    # if this is the only flag enabled.
+    supports_empty_insert = True
+    """dialect supports INSERT () VALUES ()"""
+
+    supports_multivalues_insert = False
+
+    use_insertmanyvalues: bool = False
+
+    use_insertmanyvalues_wo_returning: bool = False
+
+    insertmanyvalues_implicit_sentinel: InsertmanyvaluesSentinelOpts = (
+        InsertmanyvaluesSentinelOpts.NOT_SUPPORTED
+    )
+
+    insertmanyvalues_page_size: int = 1000
+    insertmanyvalues_max_parameters = 32700
+
+    supports_is_distinct_from = True
+
+    supports_server_side_cursors = False
+
+    server_side_cursors = False
+
+    # extra record-level locking features (#4860)
+    supports_for_update_of = False
+
+    server_version_info = None
+
+    default_schema_name: Optional[str] = None
+
+    # indicates symbol names are
+    # UPPERCASED if they are case insensitive
+    # within the database.
+    # if this is True, the methods normalize_name()
+    # and denormalize_name() must be provided.
+    requires_name_normalize = False
+
+    is_async = False
+
+    has_terminate = False
+
+    # TODO: this is not to be part of 2.0.  implement rudimentary binary
+    # literals for SQLite, PostgreSQL, MySQL only within
+    # _Binary.literal_processor
+    _legacy_binary_type_literal_encoding = "utf-8"
+
+    @util.deprecated_params(
+        empty_in_strategy=(
+            "1.4",
+            "The :paramref:`_sa.create_engine.empty_in_strategy` keyword is "
+            "deprecated, and no longer has any effect.  All IN expressions "
+            "are now rendered using "
+            'the "expanding parameter" strategy which renders a set of bound'
+            'expressions, or an "empty set" SELECT, at statement execution'
+            "time.",
+        ),
+        server_side_cursors=(
+            "1.4",
+            "The :paramref:`_sa.create_engine.server_side_cursors` parameter "
+            "is deprecated and will be removed in a future release.  Please "
+            "use the "
+            ":paramref:`_engine.Connection.execution_options.stream_results` "
+            "parameter.",
+        ),
+    )
+    def __init__(
+        self,
+        paramstyle: Optional[_ParamStyle] = None,
+        isolation_level: Optional[IsolationLevel] = None,
+        dbapi: Optional[ModuleType] = None,
+        implicit_returning: Literal[True] = True,
+        supports_native_boolean: Optional[bool] = None,
+        max_identifier_length: Optional[int] = None,
+        label_length: Optional[int] = None,
+        insertmanyvalues_page_size: Union[_NoArg, int] = _NoArg.NO_ARG,
+        use_insertmanyvalues: Optional[bool] = None,
+        # util.deprecated_params decorator cannot render the
+        # Linting.NO_LINTING constant
+        compiler_linting: Linting = int(compiler.NO_LINTING),  # type: ignore
+        server_side_cursors: bool = False,
+        **kwargs: Any,
+    ):
+        if server_side_cursors:
+            if not self.supports_server_side_cursors:
+                raise exc.ArgumentError(
+                    "Dialect %s does not support server side cursors" % self
+                )
+            else:
+                self.server_side_cursors = True
+
+        if getattr(self, "use_setinputsizes", False):
+            util.warn_deprecated(
+                "The dialect-level use_setinputsizes attribute is "
+                "deprecated.  Please use "
+                "bind_typing = BindTyping.SETINPUTSIZES",
+                "2.0",
+            )
+            self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
+
+        self.positional = False
+        self._ischema = None
+
+        self.dbapi = dbapi
+
+        if paramstyle is not None:
+            self.paramstyle = paramstyle
+        elif self.dbapi is not None:
+            self.paramstyle = self.dbapi.paramstyle
+        else:
+            self.paramstyle = self.default_paramstyle
+        self.positional = self.paramstyle in (
+            "qmark",
+            "format",
+            "numeric",
+            "numeric_dollar",
+        )
+        self.identifier_preparer = self.preparer(self)
+        self._on_connect_isolation_level = isolation_level
+
+        legacy_tt_callable = getattr(self, "type_compiler", None)
+        if legacy_tt_callable is not None:
+            tt_callable = cast(
+                Type[compiler.GenericTypeCompiler],
+                self.type_compiler,
+            )
+        else:
+            tt_callable = self.type_compiler_cls
+
+        self.type_compiler_instance = self.type_compiler = tt_callable(self)
+
+        if supports_native_boolean is not None:
+            self.supports_native_boolean = supports_native_boolean
+
+        self._user_defined_max_identifier_length = max_identifier_length
+        if self._user_defined_max_identifier_length:
+            self.max_identifier_length = (
+                self._user_defined_max_identifier_length
+            )
+        self.label_length = label_length
+        self.compiler_linting = compiler_linting
+
+        if use_insertmanyvalues is not None:
+            self.use_insertmanyvalues = use_insertmanyvalues
+
+        if insertmanyvalues_page_size is not _NoArg.NO_ARG:
+            self.insertmanyvalues_page_size = insertmanyvalues_page_size
+
+    @property
+    @util.deprecated(
+        "2.0",
+        "full_returning is deprecated, please use insert_returning, "
+        "update_returning, delete_returning",
+    )
+    def full_returning(self):
+        return (
+            self.insert_returning
+            and self.update_returning
+            and self.delete_returning
+        )
+
+    @util.memoized_property
+    def insert_executemany_returning(self):
+        """Default implementation for insert_executemany_returning, if not
+        otherwise overridden by the specific dialect.
+
+        The default dialect determines that "insert_executemany_returning"
+        is available if the dialect in use has opted into using the
+        "use_insertmanyvalues" feature. If it hasn't opted into that, then
+        this attribute is False, unless the dialect in question overrides
+        this and provides some other implementation (such as the Oracle
+        Database dialects).
+
+        """
+        return self.insert_returning and self.use_insertmanyvalues
+
+    @util.memoized_property
+    def insert_executemany_returning_sort_by_parameter_order(self):
+        """Default implementation for
+        insert_executemany_returning_deterministic_order, if not otherwise
+        overridden by the specific dialect.
+
+        The default dialect determines "insert_executemany_returning" can have
+        deterministic order only if the dialect in use has opted into using the
+        "use_insertmanyvalues" feature, which implements deterministic ordering
+        using client side sentinel columns only by default.  The
+        "insertmanyvalues" feature also features alternate forms that can
+        use server-generated PK values as "sentinels", but those are only
+        used if the :attr:`.Dialect.insertmanyvalues_implicit_sentinel`
+        bitflag enables those alternate SQL forms, which are disabled
+        by default.
+
+        If the dialect in use hasn't opted into that, then this attribute is
+        False, unless the dialect in question overrides this and provides some
+        other implementation (such as the Oracle Database dialects).
+
+        """
+        return self.insert_returning and self.use_insertmanyvalues
+
+    update_executemany_returning = False
+    delete_executemany_returning = False
+
+    @util.memoized_property
+    def loaded_dbapi(self) -> ModuleType:
+        if self.dbapi is None:
+            raise exc.InvalidRequestError(
+                f"Dialect {self} does not have a Python DBAPI established "
+                "and cannot be used for actual database interaction"
+            )
+        return self.dbapi
+
+    @util.memoized_property
+    def _bind_typing_render_casts(self):
+        return self.bind_typing is interfaces.BindTyping.RENDER_CASTS
+
+    def _ensure_has_table_connection(self, arg):
+        if not isinstance(arg, Connection):
+            raise exc.ArgumentError(
+                "The argument passed to Dialect.has_table() should be a "
+                "%s, got %s. "
+                "Additionally, the Dialect.has_table() method is for "
+                "internal dialect "
+                "use only; please use "
+                "``inspect(some_engine).has_table(<tablename>>)`` "
+                "for public API use." % (Connection, type(arg))
+            )
+
+    @util.memoized_property
+    def _supports_statement_cache(self):
+        ssc = self.__class__.__dict__.get("supports_statement_cache", None)
+        if ssc is None:
+            util.warn(
+                "Dialect %s:%s will not make use of SQL compilation caching "
+                "as it does not set the 'supports_statement_cache' attribute "
+                "to ``True``.  This can have "
+                "significant performance implications including some "
+                "performance degradations in comparison to prior SQLAlchemy "
+                "versions.  Dialect maintainers should seek to set this "
+                "attribute to True after appropriate development and testing "
+                "for SQLAlchemy 1.4 caching support.   Alternatively, this "
+                "attribute may be set to False which will disable this "
+                "warning." % (self.name, self.driver),
+                code="cprf",
+            )
+
+        return bool(ssc)
+
+    @util.memoized_property
+    def _type_memos(self):
+        return weakref.WeakKeyDictionary()
+
+    @property
+    def dialect_description(self):
+        return self.name + "+" + self.driver
+
+    @property
+    def supports_sane_rowcount_returning(self):
+        """True if this dialect supports sane rowcount even if RETURNING is
+        in use.
+
+        For dialects that don't support RETURNING, this is synonymous with
+        ``supports_sane_rowcount``.
+
+        """
+        return self.supports_sane_rowcount
+
+    @classmethod
+    def get_pool_class(cls, url: URL) -> Type[Pool]:
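+        # e.g. a dialect that wants a non-default pool can simply set a
+        # ``poolclass`` class attribute (sketch):
+        #
+        #     class MyAsyncDialect(DefaultDialect):
+        #         poolclass = pool.AsyncAdaptedQueuePool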
+        return getattr(cls, "poolclass", pool.QueuePool)
+
+    def get_dialect_pool_class(self, url: URL) -> Type[Pool]:
+        return self.get_pool_class(url)
+
+    @classmethod
+    def load_provisioning(cls):
+        package = ".".join(cls.__module__.split(".")[0:-1])
+        try:
+            __import__(package + ".provision")
+        except ImportError:
+            pass
+
+    def _builtin_onconnect(self) -> Optional[_ListenerFnType]:
+        if self._on_connect_isolation_level is not None:
+
+            def builtin_connect(dbapi_conn, conn_rec):
+                self._assert_and_set_isolation_level(
+                    dbapi_conn, self._on_connect_isolation_level
+                )
+
+            return builtin_connect
+        else:
+            return None
+
+    def initialize(self, connection):
+        try:
+            self.server_version_info = self._get_server_version_info(
+                connection
+            )
+        except NotImplementedError:
+            self.server_version_info = None
+        try:
+            self.default_schema_name = self._get_default_schema_name(
+                connection
+            )
+        except NotImplementedError:
+            self.default_schema_name = None
+
+        try:
+            self.default_isolation_level = self.get_default_isolation_level(
+                connection.connection.dbapi_connection
+            )
+        except NotImplementedError:
+            self.default_isolation_level = None
+
+        if not self._user_defined_max_identifier_length:
+            max_ident_length = self._check_max_identifier_length(connection)
+            if max_ident_length:
+                self.max_identifier_length = max_ident_length
+
+        if (
+            self.label_length
+            and self.label_length > self.max_identifier_length
+        ):
+            raise exc.ArgumentError(
+                "Label length of %d is greater than this dialect's"
+                " maximum identifier length of %d"
+                % (self.label_length, self.max_identifier_length)
+            )
+
+    def on_connect(self):
+        # inherits the docstring from interfaces.Dialect.on_connect
+        return None
+
+    def _check_max_identifier_length(self, connection):
+        """Perform a connection / server version specific check to determine
+        the max_identifier_length.
+
+        If the dialect's class level max_identifier_length should be used,
+        can return None.
+
+        .. versionadded:: 1.3.9
+
+        """
+        return None
+
+    def get_default_isolation_level(self, dbapi_conn):
+        """Given a DBAPI connection, return its isolation level, or
+        a default isolation level if one cannot be retrieved.
+
+        May be overridden by subclasses in order to provide a
+        "fallback" isolation level for databases that cannot reliably
+        retrieve the actual isolation level.
+
+        By default, calls the :meth:`_engine.Dialect.get_isolation_level`
+        method, propagating any exceptions raised.
+
+        .. versionadded:: 1.3.22
+
+        """
+        return self.get_isolation_level(dbapi_conn)
+
+    def type_descriptor(self, typeobj):
+        """Provide a database-specific :class:`.TypeEngine` object, given
+        the generic object which comes from the types module.
+
+        This method looks for a dictionary called
+        ``colspecs`` as a class or instance-level variable,
+        and passes on to :func:`_types.adapt_type`.
+
+        """
+        return type_api.adapt_type(typeobj, self.colspecs)
+
+    def has_index(self, connection, table_name, index_name, schema=None, **kw):
+        if not self.has_table(connection, table_name, schema=schema, **kw):
+            return False
+        for idx in self.get_indexes(
+            connection, table_name, schema=schema, **kw
+        ):
+            if idx["name"] == index_name:
+                return True
+        else:
+            return False
+
+    def has_schema(
+        self, connection: Connection, schema_name: str, **kw: Any
+    ) -> bool:
+        return schema_name in self.get_schema_names(connection, **kw)
+
+    def validate_identifier(self, ident):
+        if len(ident) > self.max_identifier_length:
+            raise exc.IdentifierError(
+                "Identifier '%s' exceeds maximum length of %d characters"
+                % (ident, self.max_identifier_length)
+            )
+
+    def connect(self, *cargs, **cparams):
+        # inherits the docstring from interfaces.Dialect.connect
+        return self.loaded_dbapi.connect(*cargs, **cparams)
+
+    def create_connect_args(self, url):
+        # inherits the docstring from interfaces.Dialect.create_connect_args
+        opts = url.translate_connect_args()
+        opts.update(url.query)
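+        # e.g. for make_url("foo://scott:tiger@localhost/test?timeout=10"),
+        # opts here is {"host": "localhost", "database": "test",
+        # "username": "scott", "password": "tiger", "timeout": "10"}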
+        return ([], opts)
+
+    def set_engine_execution_options(
+        self, engine: Engine, opts: Mapping[str, Any]
+    ) -> None:
+        supported_names = set(self.connection_characteristics).intersection(
+            opts
+        )
+        if supported_names:
+            characteristics: Mapping[str, Any] = util.immutabledict(
+                (name, opts[name]) for name in supported_names
+            )
+
+            @event.listens_for(engine, "engine_connect")
+            def set_connection_characteristics(connection):
+                self._set_connection_characteristics(
+                    connection, characteristics
+                )
+
+    def set_connection_execution_options(
+        self, connection: Connection, opts: Mapping[str, Any]
+    ) -> None:
+        supported_names = set(self.connection_characteristics).intersection(
+            opts
+        )
+        if supported_names:
+            characteristics: Mapping[str, Any] = util.immutabledict(
+                (name, opts[name]) for name in supported_names
+            )
+            self._set_connection_characteristics(connection, characteristics)
+
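+    # e.g. connection.execution_options(isolation_level="SERIALIZABLE")
+    # arrives here; "isolation_level" matches a key in
+    # connection_characteristics above and is applied to the underlying
+    # DBAPI connection.
+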
+    def _set_connection_characteristics(self, connection, characteristics):
+        characteristic_values = [
+            (name, self.connection_characteristics[name], value)
+            for name, value in characteristics.items()
+        ]
+
+        if connection.in_transaction():
+            trans_objs = [
+                (name, obj)
+                for name, obj, _ in characteristic_values
+                if obj.transactional
+            ]
+            if trans_objs:
+                raise exc.InvalidRequestError(
+                    "This connection has already initialized a SQLAlchemy "
+                    "Transaction() object via begin() or autobegin; "
+                    "%s may not be altered unless rollback() or commit() "
+                    "is called first."
+                    % (", ".join(name for name, obj in trans_objs))
+                )
+
+        dbapi_connection = connection.connection.dbapi_connection
+        for _, characteristic, value in characteristic_values:
+            characteristic.set_connection_characteristic(
+                self, connection, dbapi_connection, value
+            )
+        connection.connection._connection_record.finalize_callback.append(
+            functools.partial(self._reset_characteristics, characteristics)
+        )
+
+    def _reset_characteristics(self, characteristics, dbapi_connection):
+        for characteristic_name in characteristics:
+            characteristic = self.connection_characteristics[
+                characteristic_name
+            ]
+            characteristic.reset_characteristic(self, dbapi_connection)
+
+    def do_begin(self, dbapi_connection):
+        pass
+
+    def do_rollback(self, dbapi_connection):
+        dbapi_connection.rollback()
+
+    def do_commit(self, dbapi_connection):
+        dbapi_connection.commit()
+
+    def do_terminate(self, dbapi_connection):
+        self.do_close(dbapi_connection)
+
+    def do_close(self, dbapi_connection):
+        dbapi_connection.close()
+
+    @util.memoized_property
+    def _dialect_specific_select_one(self):
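+        # on most dialects this renders simply as "SELECT 1"; used by
+        # do_ping() below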
+        return str(expression.select(1).compile(dialect=self))
+
+    def _do_ping_w_event(self, dbapi_connection: DBAPIConnection) -> bool:
+        try:
+            return self.do_ping(dbapi_connection)
+        except self.loaded_dbapi.Error as err:
+            is_disconnect = self.is_disconnect(err, dbapi_connection, None)
+
+            if self._has_events:
+                try:
+                    Connection._handle_dbapi_exception_noconnection(
+                        err,
+                        self,
+                        is_disconnect=is_disconnect,
+                        invalidate_pool_on_disconnect=False,
+                        is_pre_ping=True,
+                    )
+                except exc.StatementError as new_err:
+                    is_disconnect = new_err.connection_invalidated
+
+            if is_disconnect:
+                return False
+            else:
+                raise
+
+    def do_ping(self, dbapi_connection: DBAPIConnection) -> bool:
+        cursor = dbapi_connection.cursor()
+        try:
+            cursor.execute(self._dialect_specific_select_one)
+        finally:
+            cursor.close()
+        return True
+
+    def create_xid(self):
+        """Create a random two-phase transaction ID.
+
+        This id will be passed to do_begin_twophase(), do_rollback_twophase(),
+        do_commit_twophase().  Its format is unspecified.
+        """
+
+        return "_sa_%032x" % random.randint(0, 2**128)
+
+    def do_savepoint(self, connection, name):
+        connection.execute(expression.SavepointClause(name))
+
+    def do_rollback_to_savepoint(self, connection, name):
+        connection.execute(expression.RollbackToSavepointClause(name))
+
+    def do_release_savepoint(self, connection, name):
+        connection.execute(expression.ReleaseSavepointClause(name))
+
+    def _deliver_insertmanyvalues_batches(
+        self,
+        connection,
+        cursor,
+        statement,
+        parameters,
+        generic_setinputsizes,
+        context,
+    ):
+        context = cast(DefaultExecutionContext, context)
+        compiled = cast(SQLCompiler, context.compiled)
+
+        _composite_sentinel_proc: Sequence[
+            Optional[_ResultProcessorType[Any]]
+        ] = ()
+        _scalar_sentinel_proc: Optional[_ResultProcessorType[Any]] = None
+        _sentinel_proc_initialized: bool = False
+
+        compiled_parameters = context.compiled_parameters
+
+        imv = compiled._insertmanyvalues
+        assert imv is not None
+
+        is_returning: Final[bool] = bool(compiled.effective_returning)
+        batch_size = context.execution_options.get(
+            "insertmanyvalues_page_size", self.insertmanyvalues_page_size
+        )
+
+        if compiled.schema_translate_map:
+            schema_translate_map = context.execution_options.get(
+                "schema_translate_map", {}
+            )
+        else:
+            schema_translate_map = None
+
+        if is_returning:
+            result: Optional[List[Any]] = []
+            context._insertmanyvalues_rows = result
+
+            sort_by_parameter_order = imv.sort_by_parameter_order
+
+        else:
+            sort_by_parameter_order = False
+            result = None
+
+        for imv_batch in compiled._deliver_insertmanyvalues_batches(
+            statement,
+            parameters,
+            compiled_parameters,
+            generic_setinputsizes,
+            batch_size,
+            sort_by_parameter_order,
+            schema_translate_map,
+        ):
+            yield imv_batch
+
+            if is_returning:
+
+                try:
+                    rows = context.fetchall_for_returning(cursor)
+                except BaseException as be:
+                    connection._handle_dbapi_exception(
+                        be,
+                        sql_util._long_statement(imv_batch.replaced_statement),
+                        imv_batch.replaced_parameters,
+                        None,
+                        context,
+                        is_sub_exec=True,
+                    )
+
+                # I would have thought "is_returning: Final[bool]"
+                # would have assured this but pylance thinks not
+                assert result is not None
+
+                if imv.num_sentinel_columns and not imv_batch.is_downgraded:
+                    composite_sentinel = imv.num_sentinel_columns > 1
+                    if imv.implicit_sentinel:
+                        # for implicit sentinel, which is currently single-col
+                        # integer autoincrement, do a simple sort.
+                        assert not composite_sentinel
+                        result.extend(
+                            sorted(rows, key=operator.itemgetter(-1))
+                        )
+                        continue
+
+                    # otherwise, create dictionaries to match up batches
+                    # with parameters
+                    assert imv.sentinel_param_keys
+                    assert imv.sentinel_columns
+
+                    _nsc = imv.num_sentinel_columns
+
+                    if not _sentinel_proc_initialized:
+                        if composite_sentinel:
+                            _composite_sentinel_proc = [
+                                col.type._cached_result_processor(
+                                    self, cursor_desc[1]
+                                )
+                                for col, cursor_desc in zip(
+                                    imv.sentinel_columns,
+                                    cursor.description[-_nsc:],
+                                )
+                            ]
+                        else:
+                            _scalar_sentinel_proc = (
+                                imv.sentinel_columns[0]
+                            ).type._cached_result_processor(
+                                self, cursor.description[-1][1]
+                            )
+                        _sentinel_proc_initialized = True
+
+                    rows_by_sentinel: Union[
+                        Dict[Tuple[Any, ...], Any],
+                        Dict[Any, Any],
+                    ]
+                    if composite_sentinel:
+                        rows_by_sentinel = {
+                            tuple(
+                                (proc(val) if proc else val)
+                                for val, proc in zip(
+                                    row[-_nsc:], _composite_sentinel_proc
+                                )
+                            ): row
+                            for row in rows
+                        }
+                    elif _scalar_sentinel_proc:
+                        rows_by_sentinel = {
+                            _scalar_sentinel_proc(row[-1]): row for row in rows
+                        }
+                    else:
+                        rows_by_sentinel = {row[-1]: row for row in rows}
+
+                    if len(rows_by_sentinel) != len(imv_batch.batch):
+                        # see test_insert_exec.py::
+                        # IMVSentinelTest::test_sentinel_incorrect_rowcount
+                        # for coverage / demonstration
+                        raise exc.InvalidRequestError(
+                            f"Sentinel-keyed result set did not produce "
+                            f"correct number of rows {len(imv_batch.batch)}; "
+                            "produced "
+                            f"{len(rows_by_sentinel)}.  Please ensure the "
+                            "sentinel column is fully unique and populated in "
+                            "all cases."
+                        )
+
+                    try:
+                        ordered_rows = [
+                            rows_by_sentinel[sentinel_keys]
+                            for sentinel_keys in imv_batch.sentinel_values
+                        ]
+                    except KeyError as ke:
+                        # see test_insert_exec.py::
+                        # IMVSentinelTest::test_sentinel_cant_match_keys
+                        # for coverage / demonstration
+                        raise exc.InvalidRequestError(
+                            f"Can't match sentinel values in result set to "
+                            f"parameter sets; key {ke.args[0]!r} was not "
+                            "found. "
+                            "There may be a mismatch between the datatype "
+                            "passed to the DBAPI driver vs. that which it "
+                            "returns in a result row.  Ensure the given "
+                            "Python value matches the expected result type "
+                            "*exactly*, taking care to not rely upon implicit "
+                            "conversions which may occur such as when using "
+                            "strings in place of UUID or integer values, etc. "
+                        ) from ke
+
+                    result.extend(ordered_rows)
+
+                else:
+                    result.extend(rows)
+
+    def do_executemany(self, cursor, statement, parameters, context=None):
+        cursor.executemany(statement, parameters)
+
+    def do_execute(self, cursor, statement, parameters, context=None):
+        cursor.execute(statement, parameters)
+
+    def do_execute_no_params(self, cursor, statement, context=None):
+        cursor.execute(statement)
+
+    def is_disconnect(self, e, connection, cursor):
+        return False
+
+    @util.memoized_instancemethod
+    def _gen_allowed_isolation_levels(self, dbapi_conn):
+        try:
+            raw_levels = list(self.get_isolation_level_values(dbapi_conn))
+        except NotImplementedError:
+            return None
+        else:
+            normalized_levels = [
+                level.replace("_", " ").upper() for level in raw_levels
+            ]
+            if raw_levels != normalized_levels:
+                raise ValueError(
+                    f"Dialect {self.name!r} get_isolation_level_values() "
+                    f"method should return names as UPPERCASE using spaces, "
+                    f"not underscores; got "
+                    f"{sorted(set(raw_levels).difference(normalized_levels))}"
+                )
+            return tuple(normalized_levels)
+
+    def _assert_and_set_isolation_level(self, dbapi_conn, level):
+        level = level.replace("_", " ").upper()
+
+        _allowed_isolation_levels = self._gen_allowed_isolation_levels(
+            dbapi_conn
+        )
+        if (
+            _allowed_isolation_levels
+            and level not in _allowed_isolation_levels
+        ):
+            raise exc.ArgumentError(
+                f"Invalid value {level!r} for isolation_level. "
+                f"Valid isolation levels for {self.name!r} are "
+                f"{', '.join(_allowed_isolation_levels)}"
+            )
+
+        self.set_isolation_level(dbapi_conn, level)
+
+    def reset_isolation_level(self, dbapi_conn):
+        if self._on_connect_isolation_level is not None:
+            assert (
+                self._on_connect_isolation_level == "AUTOCOMMIT"
+                or self._on_connect_isolation_level
+                == self.default_isolation_level
+            )
+            self._assert_and_set_isolation_level(
+                dbapi_conn, self._on_connect_isolation_level
+            )
+        else:
+            assert self.default_isolation_level is not None
+            self._assert_and_set_isolation_level(
+                dbapi_conn,
+                self.default_isolation_level,
+            )
+
+    def normalize_name(self, name):
+        if name is None:
+            return None
+
+        name_lower = name.lower()
+        name_upper = name.upper()
+
+        if name_upper == name_lower:
+            # name has no upper/lower conversion, e.g. non-European characters.
+            # return unchanged
+            return name
+        elif name_upper == name and not (
+            self.identifier_preparer._requires_quotes
+        )(name_lower):
+            # name is all uppercase and doesn't require quoting; normalize
+            # to all lower case
+            return name_lower
+        elif name_lower == name:
+            # name is all lower case, which if denormalized means we need to
+            # force quoting on it
+            return quoted_name(name, quote=True)
+        else:
+            # name is mixed case, meaning it will be quoted in SQL when
+            # used later; no normalization is performed
+            return name
+
+    def denormalize_name(self, name):
+        if name is None:
+            return None
+
+        name_lower = name.lower()
+        name_upper = name.upper()
+
+        if name_upper == name_lower:
+            # name has no upper/lower conversion, e.g. non-European characters.
+            # return unchanged
+            return name
+        elif name_lower == name and not (
+            self.identifier_preparer._requires_quotes
+        )(name_lower):
+            name = name_upper
+        return name
+
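+    # e.g. on a backend that stores case-insensitive names as UPPERCASE
+    # (requires_name_normalize = True):
+    #
+    #     normalize_name("MY_TABLE")   -> "my_table"
+    #     normalize_name("my_table")   -> quoted_name("my_table", quote=True)
+    #     normalize_name("MyTable")    -> "MyTable"  (quoted when rendered)
+    #     denormalize_name("my_table") -> "MY_TABLE"
+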
+    def get_driver_connection(self, connection):
+        return connection
+
+    def _overrides_default(self, method):
+        return (
+            getattr(type(self), method).__code__
+            is not getattr(DefaultDialect, method).__code__
+        )
+
+    def _default_multi_reflect(
+        self,
+        single_tbl_method,
+        connection,
+        kind,
+        schema,
+        filter_names,
+        scope,
+        **kw,
+    ):
+        names_fns = []
+        temp_names_fns = []
+        if ObjectKind.TABLE in kind:
+            names_fns.append(self.get_table_names)
+            temp_names_fns.append(self.get_temp_table_names)
+        if ObjectKind.VIEW in kind:
+            names_fns.append(self.get_view_names)
+            temp_names_fns.append(self.get_temp_view_names)
+        if ObjectKind.MATERIALIZED_VIEW in kind:
+            names_fns.append(self.get_materialized_view_names)
+            # no temp materialized view at the moment
+            # temp_names_fns.append(self.get_temp_materialized_view_names)
+
+        unreflectable = kw.pop("unreflectable", {})
+
+        if (
+            filter_names
+            and scope is ObjectScope.ANY
+            and kind is ObjectKind.ANY
+        ):
+            # if names are given and no qualification on type of table
+            # (i.e. the Table(..., autoload) case), take the names as given,
+            # don't run names queries. If a table does not exist,
+            # NoSuchTableError is raised and it's skipped
+
+            # this also suits the case for mssql where we can reflect
+            # individual temp tables but there's no temp_names_fn
+            names = filter_names
+        else:
+            names = []
+            name_kw = {"schema": schema, **kw}
+            fns = []
+            if ObjectScope.DEFAULT in scope:
+                fns.extend(names_fns)
+            if ObjectScope.TEMPORARY in scope:
+                fns.extend(temp_names_fns)
+
+            for fn in fns:
+                try:
+                    names.extend(fn(connection, **name_kw))
+                except NotImplementedError:
+                    pass
+
+        if filter_names:
+            filter_names = set(filter_names)
+
+        # iterate over all the tables/views and call the single table method
+        for table in names:
+            if not filter_names or table in filter_names:
+                key = (schema, table)
+                try:
+                    yield (
+                        key,
+                        single_tbl_method(
+                            connection, table, schema=schema, **kw
+                        ),
+                    )
+                except exc.UnreflectableTableError as err:
+                    if key not in unreflectable:
+                        unreflectable[key] = err
+                except exc.NoSuchTableError:
+                    pass
+
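+    # each get_multi_* method below yields ((schema, name), data) pairs;
+    # e.g. get_multi_columns() yields one entry per table whose value is
+    # the same list of column dicts get_columns() returns for that table.
+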
+    def get_multi_table_options(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_table_options, connection, **kw
+        )
+
+    def get_multi_columns(self, connection, **kw):
+        return self._default_multi_reflect(self.get_columns, connection, **kw)
+
+    def get_multi_pk_constraint(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_pk_constraint, connection, **kw
+        )
+
+    def get_multi_foreign_keys(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_foreign_keys, connection, **kw
+        )
+
+    def get_multi_indexes(self, connection, **kw):
+        return self._default_multi_reflect(self.get_indexes, connection, **kw)
+
+    def get_multi_unique_constraints(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_unique_constraints, connection, **kw
+        )
+
+    def get_multi_check_constraints(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_check_constraints, connection, **kw
+        )
+
+    def get_multi_table_comment(self, connection, **kw):
+        return self._default_multi_reflect(
+            self.get_table_comment, connection, **kw
+        )
+
+
+class StrCompileDialect(DefaultDialect):
+    statement_compiler = compiler.StrSQLCompiler
+    ddl_compiler = compiler.DDLCompiler
+    type_compiler_cls = compiler.StrSQLTypeCompiler
+    preparer = compiler.IdentifierPreparer
+
+    insert_returning = True
+    update_returning = True
+    delete_returning = True
+
+    supports_statement_cache = True
+
+    supports_identity_columns = True
+
+    supports_sequences = True
+    sequences_optional = True
+    preexecute_autoincrement_sequences = False
+
+    supports_native_boolean = True
+
+    supports_multivalues_insert = True
+    supports_simple_order_by_label = True
+
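+# Quick sketch of StrCompileDialect in use:
+#
+#     from sqlalchemy import column, insert, table
+#     t = table("t", column("x"))
+#     print(insert(t).compile(dialect=StrCompileDialect()))
+#     # INSERT INTO t (x) VALUES (:x)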
+
+class DefaultExecutionContext(ExecutionContext):
+    isinsert = False
+    isupdate = False
+    isdelete = False
+    is_crud = False
+    is_text = False
+    isddl = False
+
+    execute_style: ExecuteStyle = ExecuteStyle.EXECUTE
+
+    compiled: Optional[Compiled] = None
+    result_column_struct: Optional[
+        Tuple[List[ResultColumnsEntry], bool, bool, bool, bool]
+    ] = None
+    returned_default_rows: Optional[Sequence[Row[Any]]] = None
+
+    execution_options: _ExecuteOptions = util.EMPTY_DICT
+
+    cursor_fetch_strategy = _cursor._DEFAULT_FETCH
+
+    invoked_statement: Optional[Executable] = None
+
+    _is_implicit_returning = False
+    _is_explicit_returning = False
+    _is_supplemental_returning = False
+    _is_server_side = False
+
+    _soft_closed = False
+
+    _rowcount: Optional[int] = None
+
+    # a hook for SQLite's translation of
+    # result column names
+    # NOTE: pyhive is using this hook, can't remove it :(
+    _translate_colname: Optional[Callable[[str], str]] = None
+
+    _expanded_parameters: Mapping[str, List[str]] = util.immutabledict()
+    """used by set_input_sizes().
+
+    This collection comes from ``ExpandedState.parameter_expansion``.
+
+    """
+
+    cache_hit = NO_CACHE_KEY
+
+    root_connection: Connection
+    _dbapi_connection: PoolProxiedConnection
+    dialect: Dialect
+    unicode_statement: str
+    cursor: DBAPICursor
+    compiled_parameters: List[_MutableCoreSingleExecuteParams]
+    parameters: _DBAPIMultiExecuteParams
+    extracted_parameters: Optional[Sequence[BindParameter[Any]]]
+
+    _empty_dict_params = cast("Mapping[str, Any]", util.EMPTY_DICT)
+
+    _insertmanyvalues_rows: Optional[List[Tuple[Any, ...]]] = None
+    _num_sentinel_cols: int = 0
+
+    @classmethod
+    def _init_ddl(
+        cls,
+        dialect: Dialect,
+        connection: Connection,
+        dbapi_connection: PoolProxiedConnection,
+        execution_options: _ExecuteOptions,
+        compiled_ddl: DDLCompiler,
+    ) -> ExecutionContext:
+        """Initialize execution context for an ExecutableDDLElement
+        construct."""
+
+        self = cls.__new__(cls)
+        self.root_connection = connection
+        self._dbapi_connection = dbapi_connection
+        self.dialect = connection.dialect
+
+        self.compiled = compiled = compiled_ddl
+        self.isddl = True
+
+        self.execution_options = execution_options
+
+        self.unicode_statement = str(compiled)
+        if compiled.schema_translate_map:
+            schema_translate_map = self.execution_options.get(
+                "schema_translate_map", {}
+            )
+
+            rst = compiled.preparer._render_schema_translates
+            self.unicode_statement = rst(
+                self.unicode_statement, schema_translate_map
+            )
+
+        self.statement = self.unicode_statement
+
+        self.cursor = self.create_cursor()
+        self.compiled_parameters = []
+
+        if dialect.positional:
+            self.parameters = [dialect.execute_sequence_format()]
+        else:
+            self.parameters = [self._empty_dict_params]
+
+        return self
+
+    @classmethod
+    def _init_compiled(
+        cls,
+        dialect: Dialect,
+        connection: Connection,
+        dbapi_connection: PoolProxiedConnection,
+        execution_options: _ExecuteOptions,
+        compiled: SQLCompiler,
+        parameters: _CoreMultiExecuteParams,
+        invoked_statement: Executable,
+        extracted_parameters: Optional[Sequence[BindParameter[Any]]],
+        cache_hit: CacheStats = CacheStats.CACHING_DISABLED,
+    ) -> ExecutionContext:
+        """Initialize execution context for a Compiled construct."""
+
+        self = cls.__new__(cls)
+        self.root_connection = connection
+        self._dbapi_connection = dbapi_connection
+        self.dialect = connection.dialect
+        self.extracted_parameters = extracted_parameters
+        self.invoked_statement = invoked_statement
+        self.compiled = compiled
+        self.cache_hit = cache_hit
+
+        self.execution_options = execution_options
+
+        self.result_column_struct = (
+            compiled._result_columns,
+            compiled._ordered_columns,
+            compiled._textual_ordered_columns,
+            compiled._ad_hoc_textual,
+            compiled._loose_column_name_matching,
+        )
+
+        self.isinsert = ii = compiled.isinsert
+        self.isupdate = iu = compiled.isupdate
+        self.isdelete = id_ = compiled.isdelete
+        self.is_text = compiled.isplaintext
+
+        if ii or iu or id_:
+            dml_statement = compiled.compile_state.statement  # type: ignore
+            if TYPE_CHECKING:
+                assert isinstance(dml_statement, UpdateBase)
+            self.is_crud = True
+            self._is_explicit_returning = ier = bool(dml_statement._returning)
+            self._is_implicit_returning = iir = bool(
+                compiled.implicit_returning
+            )
+            if iir and dml_statement._supplemental_returning:
+                self._is_supplemental_returning = True
+
+            # don't mix implicit and explicit returning
+            assert not (iir and ier)
+
+            if (ier or iir) and compiled.for_executemany:
+                if ii and not self.dialect.insert_executemany_returning:
+                    raise exc.InvalidRequestError(
+                        f"Dialect {self.dialect.dialect_description} with "
+                        f"current server capabilities does not support "
+                        "INSERT..RETURNING when executemany is used"
+                    )
+                elif (
+                    ii
+                    and dml_statement._sort_by_parameter_order
+                    and not self.dialect.insert_executemany_returning_sort_by_parameter_order  # noqa: E501
+                ):
+                    raise exc.InvalidRequestError(
+                        f"Dialect {self.dialect.dialect_description} with "
+                        f"current server capabilities does not support "
+                        "INSERT..RETURNING with deterministic row ordering "
+                        "when executemany is used"
+                    )
+                elif (
+                    ii
+                    and self.dialect.use_insertmanyvalues
+                    and not compiled._insertmanyvalues
+                ):
+                    raise exc.InvalidRequestError(
+                        'Statement does not have "insertmanyvalues" '
+                        "enabled, can't use INSERT..RETURNING with "
+                        "executemany in this case."
+                    )
+                elif iu and not self.dialect.update_executemany_returning:
+                    raise exc.InvalidRequestError(
+                        f"Dialect {self.dialect.dialect_description} with "
+                        f"current server capabilities does not support "
+                        "UPDATE..RETURNING when executemany is used"
+                    )
+                elif id_ and not self.dialect.delete_executemany_returning:
+                    raise exc.InvalidRequestError(
+                        f"Dialect {self.dialect.dialect_description} with "
+                        f"current server capabilities does not support "
+                        "DELETE..RETURNING when executemany is used"
+                    )
+
+        if not parameters:
+            self.compiled_parameters = [
+                compiled.construct_params(
+                    extracted_parameters=extracted_parameters,
+                    escape_names=False,
+                )
+            ]
+        else:
+            self.compiled_parameters = [
+                compiled.construct_params(
+                    m,
+                    escape_names=False,
+                    _group_number=grp,
+                    extracted_parameters=extracted_parameters,
+                )
+                for grp, m in enumerate(parameters)
+            ]
+
+            if len(parameters) > 1:
+                if self.isinsert and compiled._insertmanyvalues:
+                    self.execute_style = ExecuteStyle.INSERTMANYVALUES
+
+                    imv = compiled._insertmanyvalues
+                    if imv.sentinel_columns is not None:
+                        self._num_sentinel_cols = imv.num_sentinel_columns
+                else:
+                    self.execute_style = ExecuteStyle.EXECUTEMANY
+
+        self.unicode_statement = compiled.string
+
+        self.cursor = self.create_cursor()
+
+        if self.compiled.insert_prefetch or self.compiled.update_prefetch:
+            self._process_execute_defaults()
+
+        processors = compiled._bind_processors
+
+        flattened_processors: Mapping[
+            str, _BindProcessorType[Any]
+        ] = processors  # type: ignore[assignment]
+
+        if compiled.literal_execute_params or compiled.post_compile_params:
+            if self.executemany:
+                raise exc.InvalidRequestError(
+                    "'literal_execute' or 'expanding' parameters can't be "
+                    "used with executemany()"
+                )
+
+            expanded_state = compiled._process_parameters_for_postcompile(
+                self.compiled_parameters[0]
+            )
+
+            # re-assign self.unicode_statement
+            self.unicode_statement = expanded_state.statement
+
+            self._expanded_parameters = expanded_state.parameter_expansion
+
+            flattened_processors = dict(processors)  # type: ignore
+            flattened_processors.update(expanded_state.processors)
+            positiontup = expanded_state.positiontup
+        elif compiled.positional:
+            positiontup = self.compiled.positiontup
+        else:
+            positiontup = None
+
+        if compiled.schema_translate_map:
+            schema_translate_map = self.execution_options.get(
+                "schema_translate_map", {}
+            )
+            rst = compiled.preparer._render_schema_translates
+            self.unicode_statement = rst(
+                self.unicode_statement, schema_translate_map
+            )
+
+        # final self.unicode_statement is now assigned, encode if needed
+        # by dialect
+        self.statement = self.unicode_statement
+
+        # Convert the dictionary of bind parameter values
+        # into a dict or list to be sent to the DBAPI's
+        # execute() or executemany() method.
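+        #
+        # e.g. with a positional paramstyle such as "qmark" and positiontup
+        # ("x", "y"), {"x": 5, "y": "a"} becomes the tuple (5, "a"); with a
+        # named paramstyle it remains a dict keyed by (escaped) param name.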
+
+        if compiled.positional:
+            core_positional_parameters: MutableSequence[Sequence[Any]] = []
+            assert positiontup is not None
+            for compiled_params in self.compiled_parameters:
+                l_param: List[Any] = [
+                    (
+                        flattened_processors[key](compiled_params[key])
+                        if key in flattened_processors
+                        else compiled_params[key]
+                    )
+                    for key in positiontup
+                ]
+                core_positional_parameters.append(
+                    dialect.execute_sequence_format(l_param)
+                )
+
+            self.parameters = core_positional_parameters
+        else:
+            core_dict_parameters: MutableSequence[Dict[str, Any]] = []
+            escaped_names = compiled.escaped_bind_names
+
+            # note that currently, "expanded" parameters will be present
+            # in self.compiled_parameters in their quoted form.   This is
+            # slightly inconsistent with the approach taken as of
+            # #8056 where self.compiled_parameters is meant to contain unquoted
+            # param names.
+            d_param: Dict[str, Any]
+            for compiled_params in self.compiled_parameters:
+                if escaped_names:
+                    d_param = {
+                        escaped_names.get(key, key): (
+                            flattened_processors[key](compiled_params[key])
+                            if key in flattened_processors
+                            else compiled_params[key]
+                        )
+                        for key in compiled_params
+                    }
+                else:
+                    d_param = {
+                        key: (
+                            flattened_processors[key](compiled_params[key])
+                            if key in flattened_processors
+                            else compiled_params[key]
+                        )
+                        for key in compiled_params
+                    }
+
+                core_dict_parameters.append(d_param)
+
+            self.parameters = core_dict_parameters
+
+        return self
+
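+    # A sketch of what the conversion above produces (values illustrative):
+    # for a positional paramstyle such as "qmark", a compiled statement like
+    #
+    #     INSERT INTO t (x, y) VALUES (?, ?)
+    #
+    # ends up with self.parameters == [(1, 2)] -- one sequence per compiled
+    # parameter set, ordered by positiontup -- while a named paramstyle such
+    # as "pyformat" produces self.parameters == [{"x": 1, "y": 2}], with any
+    # bind processors already applied to the values.
+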
+    @classmethod
+    def _init_statement(
+        cls,
+        dialect: Dialect,
+        connection: Connection,
+        dbapi_connection: PoolProxiedConnection,
+        execution_options: _ExecuteOptions,
+        statement: str,
+        parameters: _DBAPIMultiExecuteParams,
+    ) -> ExecutionContext:
+        """Initialize execution context for a string SQL statement."""
+
+        self = cls.__new__(cls)
+        self.root_connection = connection
+        self._dbapi_connection = dbapi_connection
+        self.dialect = connection.dialect
+        self.is_text = True
+
+        self.execution_options = execution_options
+
+        if not parameters:
+            if self.dialect.positional:
+                self.parameters = [dialect.execute_sequence_format()]
+            else:
+                self.parameters = [self._empty_dict_params]
+        elif isinstance(parameters[0], dialect.execute_sequence_format):
+            self.parameters = parameters
+        elif isinstance(parameters[0], dict):
+            self.parameters = parameters
+        else:
+            self.parameters = [
+                dialect.execute_sequence_format(p) for p in parameters
+            ]
+
+        if len(parameters) > 1:
+            self.execute_style = ExecuteStyle.EXECUTEMANY
+
+        self.statement = self.unicode_statement = statement
+
+        self.cursor = self.create_cursor()
+        return self
+
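+    # a minimal sketch of user-level code that reaches _init_statement();
+    # driver-level SQL bypasses compilation and uses the DBAPI's own
+    # paramstyle (table and values here are illustrative):
+    #
+    #     with engine.connect() as conn:
+    #         conn.exec_driver_sql("INSERT INTO t (x) VALUES (?)", (5,))
+    #
+    #         # a list of parameter sets selects executemany(), per the
+    #         # len(parameters) > 1 check above
+    #         conn.exec_driver_sql(
+    #             "INSERT INTO t (x) VALUES (?)", [(1,), (2,)]
+    #         )
+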
+    @classmethod
+    def _init_default(
+        cls,
+        dialect: Dialect,
+        connection: Connection,
+        dbapi_connection: PoolProxiedConnection,
+        execution_options: _ExecuteOptions,
+    ) -> ExecutionContext:
+        """Initialize execution context for a ColumnDefault construct."""
+
+        self = cls.__new__(cls)
+        self.root_connection = connection
+        self._dbapi_connection = dbapi_connection
+        self.dialect = connection.dialect
+
+        self.execution_options = execution_options
+
+        self.cursor = self.create_cursor()
+        return self
+
+    def _get_cache_stats(self) -> str:
+        if self.compiled is None:
+            return "raw sql"
+
+        now = perf_counter()
+
+        ch = self.cache_hit
+
+        gen_time = self.compiled._gen_time
+        assert gen_time is not None
+
+        if ch is NO_CACHE_KEY:
+            return "no key %.5fs" % (now - gen_time,)
+        elif ch is CACHE_HIT:
+            return "cached since %.4gs ago" % (now - gen_time,)
+        elif ch is CACHE_MISS:
+            return "generated in %.5fs" % (now - gen_time,)
+        elif ch is CACHING_DISABLED:
+            if "_cache_disable_reason" in self.execution_options:
+                return "caching disabled (%s) %.5fs " % (
+                    self.execution_options["_cache_disable_reason"],
+                    now - gen_time,
+                )
+            else:
+                return "caching disabled %.5fs" % (now - gen_time,)
+        elif ch is NO_DIALECT_SUPPORT:
+            return "dialect %s+%s does not support caching %.5fs" % (
+                self.dialect.name,
+                self.dialect.driver,
+                now - gen_time,
+            )
+        else:
+            return "unknown"
+
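+    # the strings above appear bracketed in SQL echo output; a hedged
+    # illustration (timings made up, table name illustrative):
+    #
+    #     engine = create_engine("sqlite://", echo=True)
+    #     with engine.connect() as conn:
+    #         conn.execute(select(t))  # INFO ... [generated in 0.00021s]
+    #         conn.execute(select(t))  # INFO ... [cached since 0.01s ago]
+    #         conn.exec_driver_sql("SELECT 1")  # INFO ... [raw sql]
+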
+    @property
+    def executemany(self):
+        return self.execute_style in (
+            ExecuteStyle.EXECUTEMANY,
+            ExecuteStyle.INSERTMANYVALUES,
+        )
+
+    @util.memoized_property
+    def identifier_preparer(self):
+        if self.compiled:
+            return self.compiled.preparer
+        elif "schema_translate_map" in self.execution_options:
+            return self.dialect.identifier_preparer._with_schema_translate(
+                self.execution_options["schema_translate_map"]
+            )
+        else:
+            return self.dialect.identifier_preparer
+
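+    # a minimal sketch of the execution option consulted above (schema
+    # names illustrative):
+    #
+    #     with engine.connect() as conn:
+    #         conn = conn.execution_options(
+    #             schema_translate_map={None: "tenant_1"}
+    #         )
+    #         # Table objects with schema=None now render as
+    #         # "tenant_1.tablename" when executed on this connection
+    #         conn.execute(my_table.select())
+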
+    @util.memoized_property
+    def engine(self):
+        return self.root_connection.engine
+
+    @util.memoized_property
+    def postfetch_cols(self) -> Optional[Sequence[Column[Any]]]:
+        if TYPE_CHECKING:
+            assert isinstance(self.compiled, SQLCompiler)
+        return self.compiled.postfetch
+
+    @util.memoized_property
+    def prefetch_cols(self) -> Optional[Sequence[Column[Any]]]:
+        if TYPE_CHECKING:
+            assert isinstance(self.compiled, SQLCompiler)
+        if self.isinsert:
+            return self.compiled.insert_prefetch
+        elif self.isupdate:
+            return self.compiled.update_prefetch
+        else:
+            return ()
+
+    @util.memoized_property
+    def no_parameters(self):
+        return self.execution_options.get("no_parameters", False)
+
+    def _execute_scalar(self, stmt, type_, parameters=None):
+        """Execute a string statement on the current cursor, returning a
+        scalar result.
+
+        Used to fire off sequences, default phrases, and "select lastrowid"
+        types of statements individually or in the context of a parent INSERT
+        or UPDATE statement.
+
+        """
+
+        conn = self.root_connection
+
+        if "schema_translate_map" in self.execution_options:
+            schema_translate_map = self.execution_options.get(
+                "schema_translate_map", {}
+            )
+
+            rst = self.identifier_preparer._render_schema_translates
+            stmt = rst(stmt, schema_translate_map)
+
+        if not parameters:
+            if self.dialect.positional:
+                parameters = self.dialect.execute_sequence_format()
+            else:
+                parameters = {}
+
+        conn._cursor_execute(self.cursor, stmt, parameters, context=self)
+        row = self.cursor.fetchone()
+        if row is not None:
+            r = row[0]
+        else:
+            r = None
+        if type_ is not None:
+            # apply type post processors to the result
+            proc = type_._cached_result_processor(
+                self.dialect, self.cursor.description[0][1]
+            )
+            if proc:
+                return proc(r)
+        return r
+
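+    # a hedged sketch of how a dialect uses _execute_scalar(); e.g. the
+    # PostgreSQL dialect fires a Sequence for a primary key roughly as:
+    #
+    #     self._execute_scalar(
+    #         "select nextval('user_id_seq')", Integer()
+    #     )
+    #
+    # (the sequence name is illustrative; the real dialect renders it via
+    # the identifier preparer)
+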
+    @util.memoized_property
+    def connection(self):
+        return self.root_connection
+
+    def _use_server_side_cursor(self):
+        if not self.dialect.supports_server_side_cursors:
+            return False
+
+        if self.dialect.server_side_cursors:
+            # this is deprecated
+            use_server_side = self.execution_options.get(
+                "stream_results", True
+            ) and (
+                self.compiled
+                and isinstance(self.compiled.statement, expression.Selectable)
+                or (
+                    (
+                        not self.compiled
+                        or isinstance(
+                            self.compiled.statement, expression.TextClause
+                        )
+                    )
+                    and self.unicode_statement
+                    and SERVER_SIDE_CURSOR_RE.match(self.unicode_statement)
+                )
+            )
+        else:
+            use_server_side = self.execution_options.get(
+                "stream_results", False
+            )
+
+        return use_server_side
+
+    def create_cursor(self):
+        if (
+            # inlining initial preference checks for SS cursors
+            self.dialect.supports_server_side_cursors
+            and (
+                self.execution_options.get("stream_results", False)
+                or (
+                    self.dialect.server_side_cursors
+                    and self._use_server_side_cursor()
+                )
+            )
+        ):
+            self._is_server_side = True
+            return self.create_server_side_cursor()
+        else:
+            self._is_server_side = False
+            return self.create_default_cursor()
+
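+    # a minimal sketch of the option that selects a server-side cursor in
+    # create_cursor() above, on dialects that support them:
+    #
+    #     with engine.connect() as conn:
+    #         result = conn.execution_options(stream_results=True).execute(
+    #             text("SELECT * FROM big_table")
+    #         )
+    #         for row in result:
+    #             ...  # rows are fetched in buffered batches
+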
+    def fetchall_for_returning(self, cursor):
+        return cursor.fetchall()
+
+    def create_default_cursor(self):
+        return self._dbapi_connection.cursor()
+
+    def create_server_side_cursor(self):
+        raise NotImplementedError()
+
+    def pre_exec(self):
+        pass
+
+    def get_out_parameter_values(self, names):
+        raise NotImplementedError(
+            "This dialect does not support OUT parameters"
+        )
+
+    def post_exec(self):
+        pass
+
+    def get_result_processor(self, type_, colname, coltype):
+        """Return a 'result processor' for a given type as present in
+        cursor.description.
+
+        This has a default implementation that dialects can override
+        for context-sensitive result type handling.
+
+        """
+        return type_._cached_result_processor(self.dialect, coltype)
+
+    def get_lastrowid(self):
+        """return self.cursor.lastrowid, or equivalent, after an INSERT.
+
+        This may involve calling special cursor functions, issuing a new
+        SELECT on the cursor (or on a new one), or returning a stored value
+        that was calculated within post_exec().
+
+        This function will only be called for dialects which support "implicit"
+        primary key generation, keep preexecute_autoincrement_sequences set to
+        False, and when no explicit id value was bound to the statement.
+
+        The function is called once for an INSERT statement that would need to
+        return the last inserted primary key for those dialects that make use
+        of the lastrowid concept.  In these cases, it is called directly after
+        :meth:`.ExecutionContext.post_exec`.
+
+        """
+        return self.cursor.lastrowid
+
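+    # the DBAPI-level behavior wrapped by get_lastrowid(), shown with
+    # sqlite3 (a minimal, self-contained sketch):
+    #
+    #     import sqlite3
+    #
+    #     db = sqlite3.connect(":memory:")
+    #     cur = db.cursor()
+    #     cur.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, x INTEGER)")
+    #     cur.execute("INSERT INTO t (x) VALUES (?)", (1,))
+    #     assert cur.lastrowid == 1
+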
+    def handle_dbapi_exception(self, e):
+        pass
+
+    @util.non_memoized_property
+    def rowcount(self) -> int:
+        if self._rowcount is not None:
+            return self._rowcount
+        else:
+            return self.cursor.rowcount
+
+    @property
+    def _has_rowcount(self):
+        return self._rowcount is not None
+
+    def supports_sane_rowcount(self):
+        return self.dialect.supports_sane_rowcount
+
+    def supports_sane_multi_rowcount(self):
+        return self.dialect.supports_sane_multi_rowcount
+
+    def _setup_result_proxy(self):
+        exec_opt = self.execution_options
+
+        if self._rowcount is None and exec_opt.get("preserve_rowcount", False):
+            self._rowcount = self.cursor.rowcount
+
+        if self.is_crud or self.is_text:
+            result = self._setup_dml_or_text_result()
+            yp = sr = False
+        else:
+            yp = exec_opt.get("yield_per", None)
+            sr = self._is_server_side or exec_opt.get("stream_results", False)
+            strategy = self.cursor_fetch_strategy
+            if sr and strategy is _cursor._DEFAULT_FETCH:
+                strategy = _cursor.BufferedRowCursorFetchStrategy(
+                    self.cursor, self.execution_options
+                )
+            cursor_description: _DBAPICursorDescription = (
+                strategy.alternate_cursor_description
+                or self.cursor.description
+            )
+            if cursor_description is None:
+                strategy = _cursor._NO_CURSOR_DQL
+
+            result = _cursor.CursorResult(self, strategy, cursor_description)
+
+        compiled = self.compiled
+
+        if (
+            compiled
+            and not self.isddl
+            and cast(SQLCompiler, compiled).has_out_parameters
+        ):
+            self._setup_out_parameters(result)
+
+        self._soft_closed = result._soft_closed
+
+        if yp:
+            result = result.yield_per(yp)
+
+        return result
+
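+    # a hedged sketch of the yield_per / stream_results handling above
+    # (table name illustrative):
+    #
+    #     with engine.connect() as conn:
+    #         result = conn.execution_options(yield_per=100).execute(
+    #             text("SELECT * FROM big_table")
+    #         )
+    #         for partition in result.partitions():  # batches of 100 rows
+    #             for row in partition:
+    #                 ...
+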
+    def _setup_out_parameters(self, result):
+        compiled = cast(SQLCompiler, self.compiled)
+
+        out_bindparams = [
+            (param, name)
+            for param, name in compiled.bind_names.items()
+            if param.isoutparam
+        ]
+        out_parameters = {}
+
+        for bindparam, raw_value in zip(
+            [param for param, name in out_bindparams],
+            self.get_out_parameter_values(
+                [name for param, name in out_bindparams]
+            ),
+        ):
+            type_ = bindparam.type
+            impl_type = type_.dialect_impl(self.dialect)
+            dbapi_type = impl_type.get_dbapi_type(self.dialect.loaded_dbapi)
+            result_processor = impl_type.result_processor(
+                self.dialect, dbapi_type
+            )
+            if result_processor is not None:
+                raw_value = result_processor(raw_value)
+            out_parameters[bindparam.key] = raw_value
+
+        result.out_parameters = out_parameters
+
+    def _setup_dml_or_text_result(self):
+        compiled = cast(SQLCompiler, self.compiled)
+
+        strategy: ResultFetchStrategy = self.cursor_fetch_strategy
+
+        if self.isinsert:
+            if (
+                self.execute_style is ExecuteStyle.INSERTMANYVALUES
+                and compiled.effective_returning
+            ):
+                strategy = _cursor.FullyBufferedCursorFetchStrategy(
+                    self.cursor,
+                    initial_buffer=self._insertmanyvalues_rows,
+                    # maintain alt cursor description if set by the
+                    # dialect, e.g. mssql preserves it
+                    alternate_description=(
+                        strategy.alternate_cursor_description
+                    ),
+                )
+
+            if compiled.postfetch_lastrowid:
+                self.inserted_primary_key_rows = (
+                    self._setup_ins_pk_from_lastrowid()
+                )
+            # else if not self._is_implicit_returning,
+            # the default inserted_primary_key_rows accessor will
+            # return an "empty" primary key collection when accessed.
+
+        if self._is_server_side and strategy is _cursor._DEFAULT_FETCH:
+            strategy = _cursor.BufferedRowCursorFetchStrategy(
+                self.cursor, self.execution_options
+            )
+
+        if strategy is _cursor._NO_CURSOR_DML:
+            cursor_description = None
+        else:
+            cursor_description = (
+                strategy.alternate_cursor_description
+                or self.cursor.description
+            )
+
+        if cursor_description is None:
+            strategy = _cursor._NO_CURSOR_DML
+        elif self._num_sentinel_cols:
+            assert self.execute_style is ExecuteStyle.INSERTMANYVALUES
+            # strip out the sentinel columns from cursor description
+            # a similar logic is done to the rows only in CursorResult
+            cursor_description = cursor_description[
+                0 : -self._num_sentinel_cols
+            ]
+
+        result: _cursor.CursorResult[Any] = _cursor.CursorResult(
+            self, strategy, cursor_description
+        )
+
+        if self.isinsert:
+            if self._is_implicit_returning:
+                rows = result.all()
+
+                self.returned_default_rows = rows
+
+                self.inserted_primary_key_rows = (
+                    self._setup_ins_pk_from_implicit_returning(result, rows)
+                )
+
+                # test that it has a cursor metadata that is accurate. the
+                # first row will have been fetched and current assumptions
+                # are that the result has only one row, until executemany()
+                # support is added here.
+                assert result._metadata.returns_rows
+
+                # Insert statement has both return_defaults() and
+                # returning().  rewind the result on the list of rows
+                # we just used.
+                if self._is_supplemental_returning:
+                    result._rewind(rows)
+                else:
+                    result._soft_close()
+            elif not self._is_explicit_returning:
+                result._soft_close()
+
+                # we assume here the result does not return any rows.
+                # *usually*, this will be true.  However, some dialects
+                # such as that of MSSQL/pyodbc need to SELECT a post fetch
+                # function so this is not necessarily true.
+                # assert not result.returns_rows
+
+        elif self._is_implicit_returning:
+            rows = result.all()
+
+            if rows:
+                self.returned_default_rows = rows
+            self._rowcount = len(rows)
+
+            if self._is_supplemental_returning:
+                result._rewind(rows)
+            else:
+                result._soft_close()
+
+            # test that it has a cursor metadata that is accurate.
+            # the rows have all been fetched however.
+            assert result._metadata.returns_rows
+
+        elif not result._metadata.returns_rows:
+            # no results, get rowcount
+            # (which requires open cursor on some drivers)
+            if self._rowcount is None:
+                self._rowcount = self.cursor.rowcount
+            result._soft_close()
+        elif self.isupdate or self.isdelete:
+            if self._rowcount is None:
+                self._rowcount = self.cursor.rowcount
+        return result
+
+    @util.memoized_property
+    def inserted_primary_key_rows(self):
+        # if no specific "get primary key" strategy was set up
+        # during execution, return a "default" primary key based
+        # on what's in the compiled_parameters and nothing else.
+        return self._setup_ins_pk_from_empty()
+
+    def _setup_ins_pk_from_lastrowid(self):
+        getter = cast(
+            SQLCompiler, self.compiled
+        )._inserted_primary_key_from_lastrowid_getter
+        lastrowid = self.get_lastrowid()
+        return [getter(lastrowid, self.compiled_parameters[0])]
+
+    def _setup_ins_pk_from_empty(self):
+        getter = cast(
+            SQLCompiler, self.compiled
+        )._inserted_primary_key_from_lastrowid_getter
+        return [getter(None, param) for param in self.compiled_parameters]
+
+    def _setup_ins_pk_from_implicit_returning(self, result, rows):
+        if not rows:
+            return []
+
+        getter = cast(
+            SQLCompiler, self.compiled
+        )._inserted_primary_key_from_returning_getter
+        compiled_params = self.compiled_parameters
+
+        return [
+            getter(row, param) for row, param in zip(rows, compiled_params)
+        ]
+
+    def lastrow_has_defaults(self):
+        return (self.isinsert or self.isupdate) and bool(
+            cast(SQLCompiler, self.compiled).postfetch
+        )
+
+    def _prepare_set_input_sizes(
+        self,
+    ) -> Optional[List[Tuple[str, Any, TypeEngine[Any]]]]:
+        """Given a cursor and ClauseParameters, prepare arguments
+        in order to call the appropriate
+        style of ``setinputsizes()`` on the cursor, using DB-API types
+        from the bind parameter's ``TypeEngine`` objects.
+
+        This method is only called by those dialects which set the
+        :attr:`.Dialect.bind_typing` attribute to
+        :attr:`.BindTyping.SETINPUTSIZES`.  Python-oracledb and cx_Oracle are
+        the only DBAPIs that require setinputsizes(); pyodbc offers it as an
+        option.
+
+        Prior to SQLAlchemy 2.0, the setinputsizes() approach was also used
+        for pg8000 and asyncpg, which have since been changed to use inline
+        rendering of casts.
+
+        """
+        if self.isddl or self.is_text:
+            return None
+
+        compiled = cast(SQLCompiler, self.compiled)
+
+        inputsizes = compiled._get_set_input_sizes_lookup()
+
+        if inputsizes is None:
+            return None
+
+        dialect = self.dialect
+
+        # all of the rest of this... cython?
+
+        if dialect._has_events:
+            inputsizes = dict(inputsizes)
+            dialect.dispatch.do_setinputsizes(
+                inputsizes, self.cursor, self.statement, self.parameters, self
+            )
+
+        if compiled.escaped_bind_names:
+            escaped_bind_names = compiled.escaped_bind_names
+        else:
+            escaped_bind_names = None
+
+        if dialect.positional:
+            items = [
+                (key, compiled.binds[key])
+                for key in compiled.positiontup or ()
+            ]
+        else:
+            items = [
+                (key, bindparam)
+                for bindparam, key in compiled.bind_names.items()
+            ]
+
+        generic_inputsizes: List[Tuple[str, Any, TypeEngine[Any]]] = []
+        for key, bindparam in items:
+            if bindparam in compiled.literal_execute_params:
+                continue
+
+            if key in self._expanded_parameters:
+                if is_tuple_type(bindparam.type):
+                    num = len(bindparam.type.types)
+                    dbtypes = inputsizes[bindparam]
+                    generic_inputsizes.extend(
+                        (
+                            (
+                                escaped_bind_names.get(paramname, paramname)
+                                if escaped_bind_names is not None
+                                else paramname
+                            ),
+                            dbtypes[idx % num],
+                            bindparam.type.types[idx % num],
+                        )
+                        for idx, paramname in enumerate(
+                            self._expanded_parameters[key]
+                        )
+                    )
+                else:
+                    dbtype = inputsizes.get(bindparam, None)
+                    generic_inputsizes.extend(
+                        (
+                            (
+                                escaped_bind_names.get(paramname, paramname)
+                                if escaped_bind_names is not None
+                                else paramname
+                            ),
+                            dbtype,
+                            bindparam.type,
+                        )
+                        for paramname in self._expanded_parameters[key]
+                    )
+            else:
+                dbtype = inputsizes.get(bindparam, None)
+
+                escaped_name = (
+                    escaped_bind_names.get(key, key)
+                    if escaped_bind_names is not None
+                    else key
+                )
+
+                generic_inputsizes.append(
+                    (escaped_name, dbtype, bindparam.type)
+                )
+
+        return generic_inputsizes
+
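+    # the tuples returned above are ultimately applied by the dialect's
+    # do_set_input_sizes() hook; at the DBAPI level this corresponds
+    # roughly to (cx_Oracle shown, parameter names illustrative):
+    #
+    #     cursor.setinputsizes(p1=cx_Oracle.NUMBER, p2=cx_Oracle.STRING)
+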
+    def _exec_default(self, column, default, type_):
+        if default.is_sequence:
+            return self.fire_sequence(default, type_)
+        elif default.is_callable:
+            # this codepath is not normally used as it's inlined
+            # into _process_execute_defaults
+            self.current_column = column
+            return default.arg(self)
+        elif default.is_clause_element:
+            return self._exec_default_clause_element(column, default, type_)
+        else:
+            # this codepath is not normally used as it's inlined
+            # into _process_execute_defaults
+            return default.arg
+
+    def _exec_default_clause_element(self, column, default, type_):
+        # execute a default that's a complete clause element.  Here, we have
+        # to re-implement a miniature version of the compile->parameters->
+        # cursor.execute() sequence, since we don't want to modify the state
+        # of the connection / result in progress or create new
+        # connection / result objects etc.
+        # .. versionchanged:: 1.4
+
+        if not default._arg_is_typed:
+            default_arg = expression.type_coerce(default.arg, type_)
+        else:
+            default_arg = default.arg
+        compiled = expression.select(default_arg).compile(dialect=self.dialect)
+        compiled_params = compiled.construct_params()
+        processors = compiled._bind_processors
+        if compiled.positional:
+            parameters = self.dialect.execute_sequence_format(
+                [
+                    (
+                        processors[key](compiled_params[key])  # type: ignore
+                        if key in processors
+                        else compiled_params[key]
+                    )
+                    for key in compiled.positiontup or ()
+                ]
+            )
+        else:
+            parameters = {
+                key: (
+                    processors[key](compiled_params[key])  # type: ignore
+                    if key in processors
+                    else compiled_params[key]
+                )
+                for key in compiled_params
+            }
+        return self._execute_scalar(
+            str(compiled), type_, parameters=parameters
+        )
+
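+    # a minimal sketch of a clause-element default that can arrive here;
+    # when such a default must be pre-executed (e.g. a primary key default
+    # on a backend without RETURNING), the context runs roughly
+    # "SELECT now()" through _execute_scalar():
+    #
+    #     from sqlalchemy import Column, DateTime, func
+    #
+    #     Column("created_at", DateTime, default=func.now())
+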
+    current_parameters: Optional[_CoreSingleExecuteParams] = None
+    """A dictionary of parameters applied to the current row.
+
+    This attribute is only available in the context of a user-defined default
+    generation function, e.g. as described at :ref:`context_default_functions`.
+    It consists of a dictionary which includes entries for each column/value
+    pair that is to be part of the INSERT or UPDATE statement. The keys of the
+    dictionary will be the key value of each :class:`_schema.Column`,
+    which is usually
+    synonymous with the name.
+
+    Note that the :attr:`.DefaultExecutionContext.current_parameters` attribute
+    does not accommodate the "multi-values" feature of the
+    :meth:`_expression.Insert.values` method.  The
+    :meth:`.DefaultExecutionContext.get_current_parameters` method should be
+    preferred.
+
+    .. seealso::
+
+        :meth:`.DefaultExecutionContext.get_current_parameters`
+
+        :ref:`context_default_functions`
+
+    """
+
+    def get_current_parameters(self, isolate_multiinsert_groups=True):
+        """Return a dictionary of parameters applied to the current row.
+
+        This method can only be used in the context of a user-defined default
+        generation function, e.g. as described at
+        :ref:`context_default_functions`. When invoked, a dictionary is
+        returned which includes entries for each column/value pair that is part
+        of the INSERT or UPDATE statement. The keys of the dictionary will be
+        the key value of each :class:`_schema.Column`,
+        which is usually synonymous
+        with the name.
+
+        :param isolate_multiinsert_groups=True: indicates that multi-valued
+         INSERT constructs created using :meth:`_expression.Insert.values`
+         should be
+         handled by returning only the subset of parameters that are local
+         to the current column default invocation.   When ``False``, the
+         raw parameters of the statement are returned including the
+         naming convention used in the case of multi-valued INSERT.
+
+        .. versionadded:: 1.2  added
+           :meth:`.DefaultExecutionContext.get_current_parameters`
+           which provides more functionality than the existing
+           :attr:`.DefaultExecutionContext.current_parameters`
+           attribute.
+
+        .. seealso::
+
+            :attr:`.DefaultExecutionContext.current_parameters`
+
+            :ref:`context_default_functions`
+
+        """
+        try:
+            parameters = self.current_parameters
+            column = self.current_column
+        except AttributeError:
+            raise exc.InvalidRequestError(
+                "get_current_parameters() can only be invoked in the "
+                "context of a Python side column default function"
+            )
+        else:
+            assert column is not None
+            assert parameters is not None
+        compile_state = cast(
+            "DMLState", cast(SQLCompiler, self.compiled).compile_state
+        )
+        assert compile_state is not None
+        if (
+            isolate_multiinsert_groups
+            and dml.isinsert(compile_state)
+            and compile_state._has_multi_parameters
+        ):
+            if column._is_multiparam_column:
+                index = column.index + 1
+                d = {column.original.key: parameters[column.key]}
+            else:
+                d = {column.key: parameters[column.key]}
+                index = 0
+            assert compile_state._dict_parameters is not None
+            keys = compile_state._dict_parameters.keys()
+            d.update(
+                (key, parameters["%s_m%d" % (key, index)]) for key in keys
+            )
+            return d
+        else:
+            return parameters
+
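+    # a sketch of the usage pattern described at
+    # :ref:`context_default_functions`: a context-sensitive default
+    # function derives one column's value from another in the same row
+    # (table and column names illustrative):
+    #
+    #     def mydefault(context):
+    #         return context.get_current_parameters()["counter"] + 12
+    #
+    #     t = Table(
+    #         "mytable",
+    #         metadata,
+    #         Column("counter", Integer),
+    #         Column(
+    #             "counter_plus_twelve",
+    #             Integer,
+    #             default=mydefault,
+    #             onupdate=mydefault,
+    #         ),
+    #     )
+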
+    def get_insert_default(self, column):
+        if column.default is None:
+            return None
+        else:
+            return self._exec_default(column, column.default, column.type)
+
+    def get_update_default(self, column):
+        if column.onupdate is None:
+            return None
+        else:
+            return self._exec_default(column, column.onupdate, column.type)
+
+    def _process_execute_defaults(self):
+        compiled = cast(SQLCompiler, self.compiled)
+
+        key_getter = compiled._within_exec_param_key_getter
+
+        sentinel_counter = 0
+
+        if compiled.insert_prefetch:
+            prefetch_recs = [
+                (
+                    c,
+                    key_getter(c),
+                    c._default_description_tuple,
+                    self.get_insert_default,
+                )
+                for c in compiled.insert_prefetch
+            ]
+        elif compiled.update_prefetch:
+            prefetch_recs = [
+                (
+                    c,
+                    key_getter(c),
+                    c._onupdate_description_tuple,
+                    self.get_update_default,
+                )
+                for c in compiled.update_prefetch
+            ]
+        else:
+            prefetch_recs = []
+
+        for param in self.compiled_parameters:
+            self.current_parameters = param
+
+            for (
+                c,
+                param_key,
+                (arg, is_scalar, is_callable, is_sentinel),
+                fallback,
+            ) in prefetch_recs:
+                if is_sentinel:
+                    param[param_key] = sentinel_counter
+                    sentinel_counter += 1
+                elif is_scalar:
+                    param[param_key] = arg
+                elif is_callable:
+                    self.current_column = c
+                    param[param_key] = arg(self)
+                else:
+                    val = fallback(c)
+                    if val is not None:
+                        param[param_key] = val
+
+        del self.current_parameters
+
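+    # a hedged sketch of the Python-side defaults handled by the prefetch
+    # loop above (column names illustrative):
+    #
+    #     import uuid
+    #
+    #     t = Table(
+    #         "t",
+    #         metadata,
+    #         # is_callable: invoked once per parameter set during prefetch
+    #         Column("id", String, default=lambda: uuid.uuid4().hex),
+    #         # is_scalar: the constant is copied into each parameter set
+    #         Column("status", Integer, default=0),
+    #     )
+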
+
+DefaultDialect.execution_ctx_cls = DefaultExecutionContext