about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py2185
1 files changed, 2185 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py b/.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py
new file mode 100644
index 00000000..7ccef84e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/sql/base.py
@@ -0,0 +1,2185 @@
+# sql/base.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Foundational utilities common to many sql modules.
+
+"""
+
+
+from __future__ import annotations
+
+import collections
+from enum import Enum
+import itertools
+from itertools import zip_longest
+import operator
+import re
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import FrozenSet
+from typing import Generic
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import NamedTuple
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import roles
+from . import visitors
+from .cache_key import HasCacheKey  # noqa
+from .cache_key import MemoizedHasCacheKey  # noqa
+from .traversals import HasCopyInternals  # noqa
+from .visitors import ClauseVisitor
+from .visitors import ExtendedInternalTraversal
+from .visitors import ExternallyTraversible
+from .visitors import InternalTraversal
+from .. import event
+from .. import exc
+from .. import util
+from ..util import HasMemoized as HasMemoized
+from ..util import hybridmethod
+from ..util import typing as compat_typing
+from ..util.typing import Protocol
+from ..util.typing import Self
+from ..util.typing import TypeGuard
+
+if TYPE_CHECKING:
+    from . import coercions
+    from . import elements
+    from . import type_api
+    from ._orm_types import DMLStrategyArgument
+    from ._orm_types import SynchronizeSessionArgument
+    from ._typing import _CLE
+    from .compiler import SQLCompiler
+    from .elements import BindParameter
+    from .elements import ClauseList
+    from .elements import ColumnClause  # noqa
+    from .elements import ColumnElement
+    from .elements import NamedColumn
+    from .elements import SQLCoreOperations
+    from .elements import TextClause
+    from .schema import Column
+    from .schema import DefaultGenerator
+    from .selectable import _JoinTargetElement
+    from .selectable import _SelectIterable
+    from .selectable import FromClause
+    from ..engine import Connection
+    from ..engine import CursorResult
+    from ..engine.interfaces import _CoreMultiExecuteParams
+    from ..engine.interfaces import _ExecuteOptions
+    from ..engine.interfaces import _ImmutableExecuteOptions
+    from ..engine.interfaces import CacheStats
+    from ..engine.interfaces import Compiled
+    from ..engine.interfaces import CompiledCacheType
+    from ..engine.interfaces import CoreExecuteOptionsParameter
+    from ..engine.interfaces import Dialect
+    from ..engine.interfaces import IsolationLevel
+    from ..engine.interfaces import SchemaTranslateMapType
+    from ..event import dispatcher
+
+if not TYPE_CHECKING:
+    coercions = None  # noqa
+    elements = None  # noqa
+    type_api = None  # noqa
+
+
+class _NoArg(Enum):
+    NO_ARG = 0
+
+    def __repr__(self):
+        return f"_NoArg.{self.name}"
+
+
+NO_ARG = _NoArg.NO_ARG
+
+
+class _NoneName(Enum):
+    NONE_NAME = 0
+    """indicate a 'deferred' name that was ultimately the value None."""
+
+
+_NONE_NAME = _NoneName.NONE_NAME
+
+_T = TypeVar("_T", bound=Any)
+
+_Fn = TypeVar("_Fn", bound=Callable[..., Any])
+
+_AmbiguousTableNameMap = MutableMapping[str, str]
+
+
+class _DefaultDescriptionTuple(NamedTuple):
+    arg: Any
+    is_scalar: Optional[bool]
+    is_callable: Optional[bool]
+    is_sentinel: Optional[bool]
+
+    @classmethod
+    def _from_column_default(
+        cls, default: Optional[DefaultGenerator]
+    ) -> _DefaultDescriptionTuple:
+        return (
+            _DefaultDescriptionTuple(
+                default.arg,  # type: ignore
+                default.is_scalar,
+                default.is_callable,
+                default.is_sentinel,
+            )
+            if default
+            and (
+                default.has_arg
+                or (not default.for_update and default.is_sentinel)
+            )
+            else _DefaultDescriptionTuple(None, None, None, None)
+        )
+
+
+_never_select_column = operator.attrgetter("_omit_from_statements")
+
+
+class _EntityNamespace(Protocol):
+    def __getattr__(self, key: str) -> SQLCoreOperations[Any]: ...
+
+
+class _HasEntityNamespace(Protocol):
+    @util.ro_non_memoized_property
+    def entity_namespace(self) -> _EntityNamespace: ...
+
+
+def _is_has_entity_namespace(element: Any) -> TypeGuard[_HasEntityNamespace]:
+    return hasattr(element, "entity_namespace")
+
+
+# Remove when https://github.com/python/mypy/issues/14640 will be fixed
+_Self = TypeVar("_Self", bound=Any)
+
+
+class Immutable:
+    """mark a ClauseElement as 'immutable' when expressions are cloned.
+
+    "immutable" objects refers to the "mutability" of an object in the
+    context of SQL DQL and DML generation.   Such as, in DQL, one can
+    compose a SELECT or subquery of varied forms, but one cannot modify
+    the structure of a specific table or column within DQL.
+    :class:`.Immutable` is mostly intended to follow this concept, and as
+    such the primary "immutable" objects are :class:`.ColumnClause`,
+    :class:`.Column`, :class:`.TableClause`, :class:`.Table`.
+
+    """
+
+    __slots__ = ()
+
+    _is_immutable = True
+
+    def unique_params(self, *optionaldict, **kwargs):
+        raise NotImplementedError("Immutable objects do not support copying")
+
+    def params(self, *optionaldict, **kwargs):
+        raise NotImplementedError("Immutable objects do not support copying")
+
+    def _clone(self: _Self, **kw: Any) -> _Self:
+        return self
+
+    def _copy_internals(
+        self, *, omit_attrs: Iterable[str] = (), **kw: Any
+    ) -> None:
+        pass
+
+
+class SingletonConstant(Immutable):
+    """Represent SQL constants like NULL, TRUE, FALSE"""
+
+    _is_singleton_constant = True
+
+    _singleton: SingletonConstant
+
+    def __new__(cls: _T, *arg: Any, **kw: Any) -> _T:
+        return cast(_T, cls._singleton)
+
+    @util.non_memoized_property
+    def proxy_set(self) -> FrozenSet[ColumnElement[Any]]:
+        raise NotImplementedError()
+
+    @classmethod
+    def _create_singleton(cls):
+        obj = object.__new__(cls)
+        obj.__init__()  # type: ignore
+
+        # for a long time this was an empty frozenset, meaning
+        # a SingletonConstant would never be a "corresponding column" in
+        # a statement.  This referred to #6259.  However, in #7154 we see
+        # that we do in fact need "correspondence" to work when matching cols
+        # in result sets, so the non-correspondence was moved to a more
+        # specific level when we are actually adapting expressions for SQL
+        # render only.
+        obj.proxy_set = frozenset([obj])
+        cls._singleton = obj
+
+
+def _from_objects(
+    *elements: Union[
+        ColumnElement[Any], FromClause, TextClause, _JoinTargetElement
+    ]
+) -> Iterator[FromClause]:
+    return itertools.chain.from_iterable(
+        [element._from_objects for element in elements]
+    )
+
+
+def _select_iterables(
+    elements: Iterable[roles.ColumnsClauseRole],
+) -> _SelectIterable:
+    """expand tables into individual columns in the
+    given list of column expressions.
+
+    """
+    return itertools.chain.from_iterable(
+        [c._select_iterable for c in elements]
+    )
+
+
+_SelfGenerativeType = TypeVar("_SelfGenerativeType", bound="_GenerativeType")
+
+
+class _GenerativeType(compat_typing.Protocol):
+    def _generate(self) -> Self: ...
+
+
+def _generative(fn: _Fn) -> _Fn:
+    """non-caching _generative() decorator.
+
+    This is basically the legacy decorator that copies the object and
+    runs a method on the new copy.
+
+    """
+
+    @util.decorator
+    def _generative(
+        fn: _Fn, self: _SelfGenerativeType, *args: Any, **kw: Any
+    ) -> _SelfGenerativeType:
+        """Mark a method as generative."""
+
+        self = self._generate()
+        x = fn(self, *args, **kw)
+        assert x is self, "generative methods must return self"
+        return self
+
+    decorated = _generative(fn)
+    decorated.non_generative = fn  # type: ignore
+    return decorated
+
+
+def _exclusive_against(*names: str, **kw: Any) -> Callable[[_Fn], _Fn]:
+    msgs = kw.pop("msgs", {})
+
+    defaults = kw.pop("defaults", {})
+
+    getters = [
+        (name, operator.attrgetter(name), defaults.get(name, None))
+        for name in names
+    ]
+
+    @util.decorator
+    def check(fn, *args, **kw):
+        # make pylance happy by not including "self" in the argument
+        # list
+        self = args[0]
+        args = args[1:]
+        for name, getter, default_ in getters:
+            if getter(self) is not default_:
+                msg = msgs.get(
+                    name,
+                    "Method %s() has already been invoked on this %s construct"
+                    % (fn.__name__, self.__class__),
+                )
+                raise exc.InvalidRequestError(msg)
+        return fn(self, *args, **kw)
+
+    return check
+
+
+def _clone(element, **kw):
+    return element._clone(**kw)
+
+
+def _expand_cloned(
+    elements: Iterable[_CLE],
+) -> Iterable[_CLE]:
+    """expand the given set of ClauseElements to be the set of all 'cloned'
+    predecessors.
+
+    """
+    # TODO: cython candidate
+    return itertools.chain(*[x._cloned_set for x in elements])
+
+
+def _de_clone(
+    elements: Iterable[_CLE],
+) -> Iterable[_CLE]:
+    for x in elements:
+        while x._is_clone_of is not None:
+            x = x._is_clone_of
+        yield x
+
+
+def _cloned_intersection(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
+    """return the intersection of sets a and b, counting
+    any overlap between 'cloned' predecessors.
+
+    The returned set is in terms of the entities present within 'a'.
+
+    """
+    all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
+    return {elem for elem in a if all_overlap.intersection(elem._cloned_set)}
+
+
+def _cloned_difference(a: Iterable[_CLE], b: Iterable[_CLE]) -> Set[_CLE]:
+    all_overlap = set(_expand_cloned(a)).intersection(_expand_cloned(b))
+    return {
+        elem for elem in a if not all_overlap.intersection(elem._cloned_set)
+    }
+
+
+class _DialectArgView(MutableMapping[str, Any]):
+    """A dictionary view of dialect-level arguments in the form
+    <dialectname>_<argument_name>.
+
+    """
+
+    def __init__(self, obj):
+        self.obj = obj
+
+    def _key(self, key):
+        try:
+            dialect, value_key = key.split("_", 1)
+        except ValueError as err:
+            raise KeyError(key) from err
+        else:
+            return dialect, value_key
+
+    def __getitem__(self, key):
+        dialect, value_key = self._key(key)
+
+        try:
+            opt = self.obj.dialect_options[dialect]
+        except exc.NoSuchModuleError as err:
+            raise KeyError(key) from err
+        else:
+            return opt[value_key]
+
+    def __setitem__(self, key, value):
+        try:
+            dialect, value_key = self._key(key)
+        except KeyError as err:
+            raise exc.ArgumentError(
+                "Keys must be of the form <dialectname>_<argname>"
+            ) from err
+        else:
+            self.obj.dialect_options[dialect][value_key] = value
+
+    def __delitem__(self, key):
+        dialect, value_key = self._key(key)
+        del self.obj.dialect_options[dialect][value_key]
+
+    def __len__(self):
+        return sum(
+            len(args._non_defaults)
+            for args in self.obj.dialect_options.values()
+        )
+
+    def __iter__(self):
+        return (
+            "%s_%s" % (dialect_name, value_name)
+            for dialect_name in self.obj.dialect_options
+            for value_name in self.obj.dialect_options[
+                dialect_name
+            ]._non_defaults
+        )
+
+
+class _DialectArgDict(MutableMapping[str, Any]):
+    """A dictionary view of dialect-level arguments for a specific
+    dialect.
+
+    Maintains a separate collection of user-specified arguments
+    and dialect-specified default arguments.
+
+    """
+
+    def __init__(self):
+        self._non_defaults = {}
+        self._defaults = {}
+
+    def __len__(self):
+        return len(set(self._non_defaults).union(self._defaults))
+
+    def __iter__(self):
+        return iter(set(self._non_defaults).union(self._defaults))
+
+    def __getitem__(self, key):
+        if key in self._non_defaults:
+            return self._non_defaults[key]
+        else:
+            return self._defaults[key]
+
+    def __setitem__(self, key, value):
+        self._non_defaults[key] = value
+
+    def __delitem__(self, key):
+        del self._non_defaults[key]
+
+
+@util.preload_module("sqlalchemy.dialects")
+def _kw_reg_for_dialect(dialect_name):
+    dialect_cls = util.preloaded.dialects.registry.load(dialect_name)
+    if dialect_cls.construct_arguments is None:
+        return None
+    return dict(dialect_cls.construct_arguments)
+
+
+class DialectKWArgs:
+    """Establish the ability for a class to have dialect-specific arguments
+    with defaults and constructor validation.
+
+    The :class:`.DialectKWArgs` interacts with the
+    :attr:`.DefaultDialect.construct_arguments` present on a dialect.
+
+    .. seealso::
+
+        :attr:`.DefaultDialect.construct_arguments`
+
+    """
+
+    __slots__ = ()
+
+    _dialect_kwargs_traverse_internals = [
+        ("dialect_options", InternalTraversal.dp_dialect_options)
+    ]
+
+    @classmethod
+    def argument_for(cls, dialect_name, argument_name, default):
+        """Add a new kind of dialect-specific keyword argument for this class.
+
+        E.g.::
+
+            Index.argument_for("mydialect", "length", None)
+
+            some_index = Index("a", "b", mydialect_length=5)
+
+        The :meth:`.DialectKWArgs.argument_for` method is a per-argument
+        way adding extra arguments to the
+        :attr:`.DefaultDialect.construct_arguments` dictionary. This
+        dictionary provides a list of argument names accepted by various
+        schema-level constructs on behalf of a dialect.
+
+        New dialects should typically specify this dictionary all at once as a
+        data member of the dialect class.  The use case for ad-hoc addition of
+        argument names is typically for end-user code that is also using
+        a custom compilation scheme which consumes the additional arguments.
+
+        :param dialect_name: name of a dialect.  The dialect must be
+         locatable, else a :class:`.NoSuchModuleError` is raised.   The
+         dialect must also include an existing
+         :attr:`.DefaultDialect.construct_arguments` collection, indicating
+         that it participates in the keyword-argument validation and default
+         system, else :class:`.ArgumentError` is raised.  If the dialect does
+         not include this collection, then any keyword argument can be
+         specified on behalf of this dialect already.  All dialects packaged
+         within SQLAlchemy include this collection, however for third party
+         dialects, support may vary.
+
+        :param argument_name: name of the parameter.
+
+        :param default: default value of the parameter.
+
+        """
+
+        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
+        if construct_arg_dictionary is None:
+            raise exc.ArgumentError(
+                "Dialect '%s' does have keyword-argument "
+                "validation and defaults enabled configured" % dialect_name
+            )
+        if cls not in construct_arg_dictionary:
+            construct_arg_dictionary[cls] = {}
+        construct_arg_dictionary[cls][argument_name] = default
+
+    @util.memoized_property
+    def dialect_kwargs(self):
+        """A collection of keyword arguments specified as dialect-specific
+        options to this construct.
+
+        The arguments are present here in their original ``<dialect>_<kwarg>``
+        format.  Only arguments that were actually passed are included;
+        unlike the :attr:`.DialectKWArgs.dialect_options` collection, which
+        contains all options known by this dialect including defaults.
+
+        The collection is also writable; keys are accepted of the
+        form ``<dialect>_<kwarg>`` where the value will be assembled
+        into the list of options.
+
+        .. seealso::
+
+            :attr:`.DialectKWArgs.dialect_options` - nested dictionary form
+
+        """
+        return _DialectArgView(self)
+
+    @property
+    def kwargs(self):
+        """A synonym for :attr:`.DialectKWArgs.dialect_kwargs`."""
+        return self.dialect_kwargs
+
+    _kw_registry = util.PopulateDict(_kw_reg_for_dialect)
+
+    def _kw_reg_for_dialect_cls(self, dialect_name):
+        construct_arg_dictionary = DialectKWArgs._kw_registry[dialect_name]
+        d = _DialectArgDict()
+
+        if construct_arg_dictionary is None:
+            d._defaults.update({"*": None})
+        else:
+            for cls in reversed(self.__class__.__mro__):
+                if cls in construct_arg_dictionary:
+                    d._defaults.update(construct_arg_dictionary[cls])
+        return d
+
+    @util.memoized_property
+    def dialect_options(self):
+        """A collection of keyword arguments specified as dialect-specific
+        options to this construct.
+
+        This is a two-level nested registry, keyed to ``<dialect_name>``
+        and ``<argument_name>``.  For example, the ``postgresql_where``
+        argument would be locatable as::
+
+            arg = my_object.dialect_options["postgresql"]["where"]
+
+        .. versionadded:: 0.9.2
+
+        .. seealso::
+
+            :attr:`.DialectKWArgs.dialect_kwargs` - flat dictionary form
+
+        """
+
+        return util.PopulateDict(
+            util.portable_instancemethod(self._kw_reg_for_dialect_cls)
+        )
+
+    def _validate_dialect_kwargs(self, kwargs: Dict[str, Any]) -> None:
+        # validate remaining kwargs that they all specify DB prefixes
+
+        if not kwargs:
+            return
+
+        for k in kwargs:
+            m = re.match("^(.+?)_(.+)$", k)
+            if not m:
+                raise TypeError(
+                    "Additional arguments should be "
+                    "named <dialectname>_<argument>, got '%s'" % k
+                )
+            dialect_name, arg_name = m.group(1, 2)
+
+            try:
+                construct_arg_dictionary = self.dialect_options[dialect_name]
+            except exc.NoSuchModuleError:
+                util.warn(
+                    "Can't validate argument %r; can't "
+                    "locate any SQLAlchemy dialect named %r"
+                    % (k, dialect_name)
+                )
+                self.dialect_options[dialect_name] = d = _DialectArgDict()
+                d._defaults.update({"*": None})
+                d._non_defaults[arg_name] = kwargs[k]
+            else:
+                if (
+                    "*" not in construct_arg_dictionary
+                    and arg_name not in construct_arg_dictionary
+                ):
+                    raise exc.ArgumentError(
+                        "Argument %r is not accepted by "
+                        "dialect %r on behalf of %r"
+                        % (k, dialect_name, self.__class__)
+                    )
+                else:
+                    construct_arg_dictionary[arg_name] = kwargs[k]
+
+
+class CompileState:
+    """Produces additional object state necessary for a statement to be
+    compiled.
+
+    the :class:`.CompileState` class is at the base of classes that assemble
+    state for a particular statement object that is then used by the
+    compiler.   This process is essentially an extension of the process that
+    the SQLCompiler.visit_XYZ() method takes, however there is an emphasis
+    on converting raw user intent into more organized structures rather than
+    producing string output.   The top-level :class:`.CompileState` for the
+    statement being executed is also accessible when the execution context
+    works with invoking the statement and collecting results.
+
+    The production of :class:`.CompileState` is specific to the compiler,  such
+    as within the :meth:`.SQLCompiler.visit_insert`,
+    :meth:`.SQLCompiler.visit_select` etc. methods.  These methods are also
+    responsible for associating the :class:`.CompileState` with the
+    :class:`.SQLCompiler` itself, if the statement is the "toplevel" statement,
+    i.e. the outermost SQL statement that's actually being executed.
+    There can be other :class:`.CompileState` objects that are not the
+    toplevel, such as when a SELECT subquery or CTE-nested
+    INSERT/UPDATE/DELETE is generated.
+
+    .. versionadded:: 1.4
+
+    """
+
+    __slots__ = ("statement", "_ambiguous_table_name_map")
+
+    plugins: Dict[Tuple[str, str], Type[CompileState]] = {}
+
+    _ambiguous_table_name_map: Optional[_AmbiguousTableNameMap]
+
+    @classmethod
+    def create_for_statement(
+        cls, statement: Executable, compiler: SQLCompiler, **kw: Any
+    ) -> CompileState:
+        # factory construction.
+
+        if statement._propagate_attrs:
+            plugin_name = statement._propagate_attrs.get(
+                "compile_state_plugin", "default"
+            )
+            klass = cls.plugins.get(
+                (plugin_name, statement._effective_plugin_target), None
+            )
+            if klass is None:
+                klass = cls.plugins[
+                    ("default", statement._effective_plugin_target)
+                ]
+
+        else:
+            klass = cls.plugins[
+                ("default", statement._effective_plugin_target)
+            ]
+
+        if klass is cls:
+            return cls(statement, compiler, **kw)
+        else:
+            return klass.create_for_statement(statement, compiler, **kw)
+
+    def __init__(self, statement, compiler, **kw):
+        self.statement = statement
+
+    @classmethod
+    def get_plugin_class(
+        cls, statement: Executable
+    ) -> Optional[Type[CompileState]]:
+        plugin_name = statement._propagate_attrs.get(
+            "compile_state_plugin", None
+        )
+
+        if plugin_name:
+            key = (plugin_name, statement._effective_plugin_target)
+            if key in cls.plugins:
+                return cls.plugins[key]
+
+        # there's no case where we call upon get_plugin_class() and want
+        # to get None back, there should always be a default.  return that
+        # if there was no plugin-specific class  (e.g. Insert with "orm"
+        # plugin)
+        try:
+            return cls.plugins[("default", statement._effective_plugin_target)]
+        except KeyError:
+            return None
+
+    @classmethod
+    def _get_plugin_class_for_plugin(
+        cls, statement: Executable, plugin_name: str
+    ) -> Optional[Type[CompileState]]:
+        try:
+            return cls.plugins[
+                (plugin_name, statement._effective_plugin_target)
+            ]
+        except KeyError:
+            return None
+
+    @classmethod
+    def plugin_for(
+        cls, plugin_name: str, visit_name: str
+    ) -> Callable[[_Fn], _Fn]:
+        def decorate(cls_to_decorate):
+            cls.plugins[(plugin_name, visit_name)] = cls_to_decorate
+            return cls_to_decorate
+
+        return decorate
+
+
+class Generative(HasMemoized):
+    """Provide a method-chaining pattern in conjunction with the
+    @_generative decorator."""
+
+    def _generate(self) -> Self:
+        skip = self._memoized_keys
+        cls = self.__class__
+        s = cls.__new__(cls)
+        if skip:
+            # ensure this iteration remains atomic
+            s.__dict__ = {
+                k: v for k, v in self.__dict__.copy().items() if k not in skip
+            }
+        else:
+            s.__dict__ = self.__dict__.copy()
+        return s
+
+
+class InPlaceGenerative(HasMemoized):
+    """Provide a method-chaining pattern in conjunction with the
+    @_generative decorator that mutates in place."""
+
+    __slots__ = ()
+
+    def _generate(self):
+        skip = self._memoized_keys
+        # note __dict__ needs to be in __slots__ if this is used
+        for k in skip:
+            self.__dict__.pop(k, None)
+        return self
+
+
+class HasCompileState(Generative):
+    """A class that has a :class:`.CompileState` associated with it."""
+
+    _compile_state_plugin: Optional[Type[CompileState]] = None
+
+    _attributes: util.immutabledict[str, Any] = util.EMPTY_DICT
+
+    _compile_state_factory = CompileState.create_for_statement
+
+
+class _MetaOptions(type):
+    """metaclass for the Options class.
+
+    This metaclass is actually necessary despite the availability of the
+    ``__init_subclass__()`` hook as this type also provides custom class-level
+    behavior for the ``__add__()`` method.
+
+    """
+
+    _cache_attrs: Tuple[str, ...]
+
+    def __add__(self, other):
+        o1 = self()
+
+        if set(other).difference(self._cache_attrs):
+            raise TypeError(
+                "dictionary contains attributes not covered by "
+                "Options class %s: %r"
+                % (self, set(other).difference(self._cache_attrs))
+            )
+
+        o1.__dict__.update(other)
+        return o1
+
+    if TYPE_CHECKING:
+
+        def __getattr__(self, key: str) -> Any: ...
+
+        def __setattr__(self, key: str, value: Any) -> None: ...
+
+        def __delattr__(self, key: str) -> None: ...
+
+
+class Options(metaclass=_MetaOptions):
+    """A cacheable option dictionary with defaults."""
+
+    __slots__ = ()
+
+    _cache_attrs: Tuple[str, ...]
+
+    def __init_subclass__(cls) -> None:
+        dict_ = cls.__dict__
+        cls._cache_attrs = tuple(
+            sorted(
+                d
+                for d in dict_
+                if not d.startswith("__")
+                and d not in ("_cache_key_traversal",)
+            )
+        )
+        super().__init_subclass__()
+
+    def __init__(self, **kw):
+        self.__dict__.update(kw)
+
+    def __add__(self, other):
+        o1 = self.__class__.__new__(self.__class__)
+        o1.__dict__.update(self.__dict__)
+
+        if set(other).difference(self._cache_attrs):
+            raise TypeError(
+                "dictionary contains attributes not covered by "
+                "Options class %s: %r"
+                % (self, set(other).difference(self._cache_attrs))
+            )
+
+        o1.__dict__.update(other)
+        return o1
+
+    def __eq__(self, other):
+        # TODO: very inefficient.  This is used only in test suites
+        # right now.
+        for a, b in zip_longest(self._cache_attrs, other._cache_attrs):
+            if getattr(self, a) != getattr(other, b):
+                return False
+        return True
+
+    def __repr__(self):
+        # TODO: fairly inefficient, used only in debugging right now.
+
+        return "%s(%s)" % (
+            self.__class__.__name__,
+            ", ".join(
+                "%s=%r" % (k, self.__dict__[k])
+                for k in self._cache_attrs
+                if k in self.__dict__
+            ),
+        )
+
+    @classmethod
+    def isinstance(cls, klass: Type[Any]) -> bool:
+        return issubclass(cls, klass)
+
+    @hybridmethod
+    def add_to_element(self, name, value):
+        return self + {name: getattr(self, name) + value}
+
+    @hybridmethod
+    def _state_dict_inst(self) -> Mapping[str, Any]:
+        return self.__dict__
+
+    _state_dict_const: util.immutabledict[str, Any] = util.EMPTY_DICT
+
+    @_state_dict_inst.classlevel
+    def _state_dict(cls) -> Mapping[str, Any]:
+        return cls._state_dict_const
+
+    @classmethod
+    def safe_merge(cls, other):
+        d = other._state_dict()
+
+        # only support a merge with another object of our class
+        # and which does not have attrs that we don't.   otherwise
+        # we risk having state that might not be part of our cache
+        # key strategy
+
+        if (
+            cls is not other.__class__
+            and other._cache_attrs
+            and set(other._cache_attrs).difference(cls._cache_attrs)
+        ):
+            raise TypeError(
+                "other element %r is not empty, is not of type %s, "
+                "and contains attributes not covered here %r"
+                % (
+                    other,
+                    cls,
+                    set(other._cache_attrs).difference(cls._cache_attrs),
+                )
+            )
+        return cls + d
+
+    @classmethod
+    def from_execution_options(
+        cls, key, attrs, exec_options, statement_exec_options
+    ):
+        """process Options argument in terms of execution options.
+
+
+        e.g.::
+
+            (
+                load_options,
+                execution_options,
+            ) = QueryContext.default_load_options.from_execution_options(
+                "_sa_orm_load_options",
+                {"populate_existing", "autoflush", "yield_per"},
+                execution_options,
+                statement._execution_options,
+            )
+
+        get back the Options and refresh "_sa_orm_load_options" in the
+        exec options dict w/ the Options as well
+
+        """
+
+        # common case is that no options we are looking for are
+        # in either dictionary, so cancel for that first
+        check_argnames = attrs.intersection(
+            set(exec_options).union(statement_exec_options)
+        )
+
+        existing_options = exec_options.get(key, cls)
+
+        if check_argnames:
+            result = {}
+            for argname in check_argnames:
+                local = "_" + argname
+                if argname in exec_options:
+                    result[local] = exec_options[argname]
+                elif argname in statement_exec_options:
+                    result[local] = statement_exec_options[argname]
+
+            new_options = existing_options + result
+            exec_options = util.immutabledict().merge_with(
+                exec_options, {key: new_options}
+            )
+            return new_options, exec_options
+
+        else:
+            return existing_options, exec_options
+
+    if TYPE_CHECKING:
+
+        def __getattr__(self, key: str) -> Any: ...
+
+        def __setattr__(self, key: str, value: Any) -> None: ...
+
+        def __delattr__(self, key: str) -> None: ...
+
+
+class CacheableOptions(Options, HasCacheKey):
+    __slots__ = ()
+
+    @hybridmethod
+    def _gen_cache_key_inst(self, anon_map, bindparams):
+        return HasCacheKey._gen_cache_key(self, anon_map, bindparams)
+
+    @_gen_cache_key_inst.classlevel
+    def _gen_cache_key(cls, anon_map, bindparams):
+        return (cls, ())
+
+    @hybridmethod
+    def _generate_cache_key(self):
+        return HasCacheKey._generate_cache_key_for_object(self)
+
+
+class ExecutableOption(HasCopyInternals):
+    __slots__ = ()
+
+    _annotations = util.EMPTY_DICT
+
+    __visit_name__ = "executable_option"
+
+    _is_has_cache_key = False
+
+    _is_core = True
+
+    def _clone(self, **kw):
+        """Create a shallow copy of this ExecutableOption."""
+        c = self.__class__.__new__(self.__class__)
+        c.__dict__ = dict(self.__dict__)  # type: ignore
+        return c
+
+
+class Executable(roles.StatementRole):
+    """Mark a :class:`_expression.ClauseElement` as supporting execution.
+
+    :class:`.Executable` is a superclass for all "statement" types
+    of objects, including :func:`select`, :func:`delete`, :func:`update`,
+    :func:`insert`, :func:`text`.
+
+    """
+
+    supports_execution: bool = True
+    _execution_options: _ImmutableExecuteOptions = util.EMPTY_DICT
+    _is_default_generator = False
+    _with_options: Tuple[ExecutableOption, ...] = ()
+    _with_context_options: Tuple[
+        Tuple[Callable[[CompileState], None], Any], ...
+    ] = ()
+    _compile_options: Optional[Union[Type[CacheableOptions], CacheableOptions]]
+
+    _executable_traverse_internals = [
+        ("_with_options", InternalTraversal.dp_executable_options),
+        (
+            "_with_context_options",
+            ExtendedInternalTraversal.dp_with_context_options,
+        ),
+        ("_propagate_attrs", ExtendedInternalTraversal.dp_propagate_attrs),
+    ]
+
+    is_select = False
+    is_from_statement = False
+    is_update = False
+    is_insert = False
+    is_text = False
+    is_delete = False
+    is_dml = False
+
+    if TYPE_CHECKING:
+        __visit_name__: str
+
+        def _compile_w_cache(
+            self,
+            dialect: Dialect,
+            *,
+            compiled_cache: Optional[CompiledCacheType],
+            column_keys: List[str],
+            for_executemany: bool = False,
+            schema_translate_map: Optional[SchemaTranslateMapType] = None,
+            **kw: Any,
+        ) -> Tuple[
+            Compiled, Optional[Sequence[BindParameter[Any]]], CacheStats
+        ]: ...
+
+        def _execute_on_connection(
+            self,
+            connection: Connection,
+            distilled_params: _CoreMultiExecuteParams,
+            execution_options: CoreExecuteOptionsParameter,
+        ) -> CursorResult[Any]: ...
+
+        def _execute_on_scalar(
+            self,
+            connection: Connection,
+            distilled_params: _CoreMultiExecuteParams,
+            execution_options: CoreExecuteOptionsParameter,
+        ) -> Any: ...
+
+    @util.ro_non_memoized_property
+    def _all_selected_columns(self):
+        raise NotImplementedError()
+
+    @property
+    def _effective_plugin_target(self) -> str:
+        return self.__visit_name__
+
+    @_generative
+    def options(self, *options: ExecutableOption) -> Self:
+        """Apply options to this statement.
+
+        In the general sense, options are any kind of Python object
+        that can be interpreted by the SQL compiler for the statement.
+        These options can be consumed by specific dialects or specific kinds
+        of compilers.
+
+        The most commonly known kind of option are the ORM level options
+        that apply "eager load" and other loading behaviors to an ORM
+        query.   However, options can theoretically be used for many other
+        purposes.
+
+        For background on specific kinds of options for specific kinds of
+        statements, refer to the documentation for those option objects.
+
+        .. versionchanged:: 1.4 - added :meth:`.Executable.options` to
+           Core statement objects towards the goal of allowing unified
+           Core / ORM querying capabilities.
+
+        .. seealso::
+
+            :ref:`loading_columns` - refers to options specific to the usage
+            of ORM queries
+
+            :ref:`relationship_loader_options` - refers to options specific
+            to the usage of ORM queries
+
+        """
+        self._with_options += tuple(
+            coercions.expect(roles.ExecutableOptionRole, opt)
+            for opt in options
+        )
+        return self
+
+    @_generative
+    def _set_compile_options(self, compile_options: CacheableOptions) -> Self:
+        """Assign the compile options to a new value.
+
+        :param compile_options: appropriate CacheableOptions structure
+
+        """
+
+        self._compile_options = compile_options
+        return self
+
+    @_generative
+    def _update_compile_options(self, options: CacheableOptions) -> Self:
+        """update the _compile_options with new keys."""
+
+        assert self._compile_options is not None
+        self._compile_options += options
+        return self
+
+    @_generative
+    def _add_context_option(
+        self,
+        callable_: Callable[[CompileState], None],
+        cache_args: Any,
+    ) -> Self:
+        """Add a context option to this statement.
+
+        These are callable functions that will
+        be given the CompileState object upon compilation.
+
+        A second argument cache_args is required, which will be combined with
+        the ``__code__`` identity of the function itself in order to produce a
+        cache key.
+
+        """
+        self._with_context_options += ((callable_, cache_args),)
+        return self
+
+    @overload
+    def execution_options(
+        self,
+        *,
+        compiled_cache: Optional[CompiledCacheType] = ...,
+        logging_token: str = ...,
+        isolation_level: IsolationLevel = ...,
+        no_parameters: bool = False,
+        stream_results: bool = False,
+        max_row_buffer: int = ...,
+        yield_per: int = ...,
+        insertmanyvalues_page_size: int = ...,
+        schema_translate_map: Optional[SchemaTranslateMapType] = ...,
+        populate_existing: bool = False,
+        autoflush: bool = False,
+        synchronize_session: SynchronizeSessionArgument = ...,
+        dml_strategy: DMLStrategyArgument = ...,
+        render_nulls: bool = ...,
+        is_delete_using: bool = ...,
+        is_update_from: bool = ...,
+        preserve_rowcount: bool = False,
+        **opt: Any,
+    ) -> Self: ...
+
+    @overload
+    def execution_options(self, **opt: Any) -> Self: ...
+
+    @_generative
+    def execution_options(self, **kw: Any) -> Self:
+        """Set non-SQL options for the statement which take effect during
+        execution.
+
+        Execution options can be set at many scopes, including per-statement,
+        per-connection, or per execution, using methods such as
+        :meth:`_engine.Connection.execution_options` and parameters which
+        accept a dictionary of options such as
+        :paramref:`_engine.Connection.execute.execution_options` and
+        :paramref:`_orm.Session.execute.execution_options`.
+
+        The primary characteristic of an execution option, as opposed to
+        other kinds of options such as ORM loader options, is that
+        **execution options never affect the compiled SQL of a query, only
+        things that affect how the SQL statement itself is invoked or how
+        results are fetched**.  That is, execution options are not part of
+        what's accommodated by SQL compilation nor are they considered part of
+        the cached state of a statement.
+
+        The :meth:`_sql.Executable.execution_options` method is
+        :term:`generative`, as
+        is the case for the method as applied to the :class:`_engine.Engine`
+        and :class:`_orm.Query` objects, which means when the method is called,
+        a copy of the object is returned, which applies the given parameters to
+        that new copy, but leaves the original unchanged::
+
+            statement = select(table.c.x, table.c.y)
+            new_statement = statement.execution_options(my_option=True)
+
+        An exception to this behavior is the :class:`_engine.Connection`
+        object, where the :meth:`_engine.Connection.execution_options` method
+        is explicitly **not** generative.
+
+        The kinds of options that may be passed to
+        :meth:`_sql.Executable.execution_options` and other related methods and
+        parameter dictionaries include parameters that are explicitly consumed
+        by SQLAlchemy Core or ORM, as well as arbitrary keyword arguments not
+        defined by SQLAlchemy, which means the methods and/or parameter
+        dictionaries may be used for user-defined parameters that interact with
+        custom code, which may access the parameters using methods such as
+        :meth:`_sql.Executable.get_execution_options` and
+        :meth:`_engine.Connection.get_execution_options`, or within selected
+        event hooks using a dedicated ``execution_options`` event parameter
+        such as
+        :paramref:`_events.ConnectionEvents.before_execute.execution_options`
+        or :attr:`_orm.ORMExecuteState.execution_options`, e.g.::
+
+             from sqlalchemy import event
+
+
+             @event.listens_for(some_engine, "before_execute")
+             def _process_opt(conn, statement, multiparams, params, execution_options):
+                 "run a SQL function before invoking a statement"
+
+                 if execution_options.get("do_special_thing", False):
+                     conn.exec_driver_sql("run_special_function()")
+
+        Within the scope of options that are explicitly recognized by
+        SQLAlchemy, most apply to specific classes of objects and not others.
+        The most common execution options include:
+
+        * :paramref:`_engine.Connection.execution_options.isolation_level` -
+          sets the isolation level for a connection or a class of connections
+          via an :class:`_engine.Engine`.  This option is accepted only
+          by :class:`_engine.Connection` or :class:`_engine.Engine`.
+
+        * :paramref:`_engine.Connection.execution_options.stream_results` -
+          indicates results should be fetched using a server side cursor;
+          this option is accepted by :class:`_engine.Connection`, by the
+          :paramref:`_engine.Connection.execute.execution_options` parameter
+          on :meth:`_engine.Connection.execute`, and additionally by
+          :meth:`_sql.Executable.execution_options` on a SQL statement object,
+          as well as by ORM constructs like :meth:`_orm.Session.execute`.
+
+        * :paramref:`_engine.Connection.execution_options.compiled_cache` -
+          indicates a dictionary that will serve as the
+          :ref:`SQL compilation cache <sql_caching>`
+          for a :class:`_engine.Connection` or :class:`_engine.Engine`, as
+          well as for ORM methods like :meth:`_orm.Session.execute`.
+          Can be passed as ``None`` to disable caching for statements.
+          This option is not accepted by
+          :meth:`_sql.Executable.execution_options` as it is inadvisable to
+          carry along a compilation cache within a statement object.
+
+        * :paramref:`_engine.Connection.execution_options.schema_translate_map`
+          - a mapping of schema names used by the
+          :ref:`Schema Translate Map <schema_translating>` feature, accepted
+          by :class:`_engine.Connection`, :class:`_engine.Engine`,
+          :class:`_sql.Executable`, as well as by ORM constructs
+          like :meth:`_orm.Session.execute`.
+
+        .. seealso::
+
+            :meth:`_engine.Connection.execution_options`
+
+            :paramref:`_engine.Connection.execute.execution_options`
+
+            :paramref:`_orm.Session.execute.execution_options`
+
+            :ref:`orm_queryguide_execution_options` - documentation on all
+            ORM-specific execution options
+
+        """  # noqa: E501
+        if "isolation_level" in kw:
+            raise exc.ArgumentError(
+                "'isolation_level' execution option may only be specified "
+                "on Connection.execution_options(), or "
+                "per-engine using the isolation_level "
+                "argument to create_engine()."
+            )
+        if "compiled_cache" in kw:
+            raise exc.ArgumentError(
+                "'compiled_cache' execution option may only be specified "
+                "on Connection.execution_options(), not per statement."
+            )
+        self._execution_options = self._execution_options.union(kw)
+        return self
+
+    def get_execution_options(self) -> _ExecuteOptions:
+        """Get the non-SQL options which will take effect during execution.
+
+        .. versionadded:: 1.3
+
+        .. seealso::
+
+            :meth:`.Executable.execution_options`
+        """
+        return self._execution_options
+
+
+class SchemaEventTarget(event.EventTarget):
+    """Base class for elements that are the targets of :class:`.DDLEvents`
+    events.
+
+    This includes :class:`.SchemaItem` as well as :class:`.SchemaType`.
+
+    """
+
+    dispatch: dispatcher[SchemaEventTarget]
+
+    def _set_parent(self, parent: SchemaEventTarget, **kw: Any) -> None:
+        """Associate with this SchemaEvent's parent object."""
+
+    def _set_parent_with_dispatch(
+        self, parent: SchemaEventTarget, **kw: Any
+    ) -> None:
+        self.dispatch.before_parent_attach(self, parent)
+        self._set_parent(parent, **kw)
+        self.dispatch.after_parent_attach(self, parent)
+
+
+class SchemaVisitor(ClauseVisitor):
+    """Define the visiting for ``SchemaItem`` objects."""
+
+    __traverse_options__ = {"schema_visitor": True}
+
+
+class _SentinelDefaultCharacterization(Enum):
+    NONE = "none"
+    UNKNOWN = "unknown"
+    CLIENTSIDE = "clientside"
+    SENTINEL_DEFAULT = "sentinel_default"
+    SERVERSIDE = "serverside"
+    IDENTITY = "identity"
+    SEQUENCE = "sequence"
+
+
+class _SentinelColumnCharacterization(NamedTuple):
+    columns: Optional[Sequence[Column[Any]]] = None
+    is_explicit: bool = False
+    is_autoinc: bool = False
+    default_characterization: _SentinelDefaultCharacterization = (
+        _SentinelDefaultCharacterization.NONE
+    )
+
+
+_COLKEY = TypeVar("_COLKEY", Union[None, str], str)
+
+_COL_co = TypeVar("_COL_co", bound="ColumnElement[Any]", covariant=True)
+_COL = TypeVar("_COL", bound="ColumnElement[Any]")
+
+
+class _ColumnMetrics(Generic[_COL_co]):
+    __slots__ = ("column",)
+
+    column: _COL_co
+
+    def __init__(
+        self, collection: ColumnCollection[Any, _COL_co], col: _COL_co
+    ):
+        self.column = col
+
+        # proxy_index being non-empty means it was initialized.
+        # so we need to update it
+        pi = collection._proxy_index
+        if pi:
+            for eps_col in col._expanded_proxy_set:
+                pi[eps_col].add(self)
+
+    def get_expanded_proxy_set(self):
+        return self.column._expanded_proxy_set
+
+    def dispose(self, collection):
+        pi = collection._proxy_index
+        if not pi:
+            return
+        for col in self.column._expanded_proxy_set:
+            colset = pi.get(col, None)
+            if colset:
+                colset.discard(self)
+            if colset is not None and not colset:
+                del pi[col]
+
+    def embedded(
+        self,
+        target_set: Union[
+            Set[ColumnElement[Any]], FrozenSet[ColumnElement[Any]]
+        ],
+    ) -> bool:
+        expanded_proxy_set = self.column._expanded_proxy_set
+        for t in target_set.difference(expanded_proxy_set):
+            if not expanded_proxy_set.intersection(_expand_cloned([t])):
+                return False
+        return True
+
+
+class ColumnCollection(Generic[_COLKEY, _COL_co]):
+    """Collection of :class:`_expression.ColumnElement` instances,
+    typically for
+    :class:`_sql.FromClause` objects.
+
+    The :class:`_sql.ColumnCollection` object is most commonly available
+    as the :attr:`_schema.Table.c` or :attr:`_schema.Table.columns` collection
+    on the :class:`_schema.Table` object, introduced at
+    :ref:`metadata_tables_and_columns`.
+
+    The :class:`_expression.ColumnCollection` has both mapping- and sequence-
+    like behaviors. A :class:`_expression.ColumnCollection` usually stores
+    :class:`_schema.Column` objects, which are then accessible both via mapping
+    style access as well as attribute access style.
+
+    To access :class:`_schema.Column` objects using ordinary attribute-style
+    access, specify the name like any other object attribute, such as below
+    a column named ``employee_name`` is accessed::
+
+        >>> employee_table.c.employee_name
+
+    To access columns that have names with special characters or spaces,
+    index-style access is used, such as below which illustrates a column named
+    ``employee ' payment`` is accessed::
+
+        >>> employee_table.c["employee ' payment"]
+
+    As the :class:`_sql.ColumnCollection` object provides a Python dictionary
+    interface, common dictionary method names like
+    :meth:`_sql.ColumnCollection.keys`, :meth:`_sql.ColumnCollection.values`,
+    and :meth:`_sql.ColumnCollection.items` are available, which means that
+    database columns that are keyed under these names also need to use indexed
+    access::
+
+        >>> employee_table.c["values"]
+
+
+    The name for which a :class:`_schema.Column` would be present is normally
+    that of the :paramref:`_schema.Column.key` parameter.  In some contexts,
+    such as a :class:`_sql.Select` object that uses a label style set
+    using the :meth:`_sql.Select.set_label_style` method, a column of a certain
+    key may instead be represented under a particular label name such
+    as ``tablename_columnname``::
+
+        >>> from sqlalchemy import select, column, table
+        >>> from sqlalchemy import LABEL_STYLE_TABLENAME_PLUS_COL
+        >>> t = table("t", column("c"))
+        >>> stmt = select(t).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL)
+        >>> subq = stmt.subquery()
+        >>> subq.c.t_c
+        <sqlalchemy.sql.elements.ColumnClause at 0x7f59dcf04fa0; t_c>
+
+    :class:`.ColumnCollection` also indexes the columns in order and allows
+    them to be accessible by their integer position::
+
+        >>> cc[0]
+        Column('x', Integer(), table=None)
+        >>> cc[1]
+        Column('y', Integer(), table=None)
+
+    .. versionadded:: 1.4 :class:`_expression.ColumnCollection`
+       allows integer-based
+       index access to the collection.
+
+    Iterating the collection yields the column expressions in order::
+
+        >>> list(cc)
+        [Column('x', Integer(), table=None),
+         Column('y', Integer(), table=None)]
+
+    The base :class:`_expression.ColumnCollection` object can store
+    duplicates, which can
+    mean either two columns with the same key, in which case the column
+    returned by key  access is **arbitrary**::
+
+        >>> x1, x2 = Column("x", Integer), Column("x", Integer)
+        >>> cc = ColumnCollection(columns=[(x1.name, x1), (x2.name, x2)])
+        >>> list(cc)
+        [Column('x', Integer(), table=None),
+         Column('x', Integer(), table=None)]
+        >>> cc["x"] is x1
+        False
+        >>> cc["x"] is x2
+        True
+
+    Or it can also mean the same column multiple times.   These cases are
+    supported as :class:`_expression.ColumnCollection`
+    is used to represent the columns in
+    a SELECT statement which may include duplicates.
+
+    A special subclass :class:`.DedupeColumnCollection` exists which instead
+    maintains SQLAlchemy's older behavior of not allowing duplicates; this
+    collection is used for schema level objects like :class:`_schema.Table`
+    and
+    :class:`.PrimaryKeyConstraint` where this deduping is helpful.  The
+    :class:`.DedupeColumnCollection` class also has additional mutation methods
+    as the schema constructs have more use cases that require removal and
+    replacement of columns.
+
+    .. versionchanged:: 1.4 :class:`_expression.ColumnCollection`
+       now stores duplicate
+       column keys as well as the same column in multiple positions.  The
+       :class:`.DedupeColumnCollection` class is added to maintain the
+       former behavior in those cases where deduplication as well as
+       additional replace/remove operations are needed.
+
+
+    """
+
+    __slots__ = "_collection", "_index", "_colset", "_proxy_index"
+
+    _collection: List[Tuple[_COLKEY, _COL_co, _ColumnMetrics[_COL_co]]]
+    _index: Dict[Union[None, str, int], Tuple[_COLKEY, _COL_co]]
+    _proxy_index: Dict[ColumnElement[Any], Set[_ColumnMetrics[_COL_co]]]
+    _colset: Set[_COL_co]
+
+    def __init__(
+        self, columns: Optional[Iterable[Tuple[_COLKEY, _COL_co]]] = None
+    ):
+        object.__setattr__(self, "_colset", set())
+        object.__setattr__(self, "_index", {})
+        object.__setattr__(
+            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
+        )
+        object.__setattr__(self, "_collection", [])
+        if columns:
+            self._initial_populate(columns)
+
+    @util.preload_module("sqlalchemy.sql.elements")
+    def __clause_element__(self) -> ClauseList:
+        elements = util.preloaded.sql_elements
+
+        return elements.ClauseList(
+            _literal_as_text_role=roles.ColumnsClauseRole,
+            group=False,
+            *self._all_columns,
+        )
+
+    def _initial_populate(
+        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
+    ) -> None:
+        self._populate_separate_keys(iter_)
+
+    @property
+    def _all_columns(self) -> List[_COL_co]:
+        return [col for (_, col, _) in self._collection]
+
+    def keys(self) -> List[_COLKEY]:
+        """Return a sequence of string key names for all columns in this
+        collection."""
+        return [k for (k, _, _) in self._collection]
+
+    def values(self) -> List[_COL_co]:
+        """Return a sequence of :class:`_sql.ColumnClause` or
+        :class:`_schema.Column` objects for all columns in this
+        collection."""
+        return [col for (_, col, _) in self._collection]
+
+    def items(self) -> List[Tuple[_COLKEY, _COL_co]]:
+        """Return a sequence of (key, column) tuples for all columns in this
+        collection each consisting of a string key name and a
+        :class:`_sql.ColumnClause` or
+        :class:`_schema.Column` object.
+        """
+
+        return [(k, col) for (k, col, _) in self._collection]
+
+    def __bool__(self) -> bool:
+        return bool(self._collection)
+
+    def __len__(self) -> int:
+        return len(self._collection)
+
+    def __iter__(self) -> Iterator[_COL_co]:
+        # turn to a list first to maintain over a course of changes
+        return iter([col for _, col, _ in self._collection])
+
+    @overload
+    def __getitem__(self, key: Union[str, int]) -> _COL_co: ...
+
+    @overload
+    def __getitem__(
+        self, key: Tuple[Union[str, int], ...]
+    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
+
+    @overload
+    def __getitem__(
+        self, key: slice
+    ) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]: ...
+
+    def __getitem__(
+        self, key: Union[str, int, slice, Tuple[Union[str, int], ...]]
+    ) -> Union[ReadOnlyColumnCollection[_COLKEY, _COL_co], _COL_co]:
+        try:
+            if isinstance(key, (tuple, slice)):
+                if isinstance(key, slice):
+                    cols = (
+                        (sub_key, col)
+                        for (sub_key, col, _) in self._collection[key]
+                    )
+                else:
+                    cols = (self._index[sub_key] for sub_key in key)
+
+                return ColumnCollection(cols).as_readonly()
+            else:
+                return self._index[key][1]
+        except KeyError as err:
+            if isinstance(err.args[0], int):
+                raise IndexError(err.args[0]) from err
+            else:
+                raise
+
+    def __getattr__(self, key: str) -> _COL_co:
+        try:
+            return self._index[key][1]
+        except KeyError as err:
+            raise AttributeError(key) from err
+
+    def __contains__(self, key: str) -> bool:
+        if key not in self._index:
+            if not isinstance(key, str):
+                raise exc.ArgumentError(
+                    "__contains__ requires a string argument"
+                )
+            return False
+        else:
+            return True
+
+    def compare(self, other: ColumnCollection[Any, Any]) -> bool:
+        """Compare this :class:`_expression.ColumnCollection` to another
+        based on the names of the keys"""
+
+        for l, r in zip_longest(self, other):
+            if l is not r:
+                return False
+        else:
+            return True
+
+    def __eq__(self, other: Any) -> bool:
+        return self.compare(other)
+
+    @overload
+    def get(self, key: str, default: None = None) -> Optional[_COL_co]: ...
+
+    @overload
+    def get(self, key: str, default: _COL) -> Union[_COL_co, _COL]: ...
+
+    def get(
+        self, key: str, default: Optional[_COL] = None
+    ) -> Optional[Union[_COL_co, _COL]]:
+        """Get a :class:`_sql.ColumnClause` or :class:`_schema.Column` object
+        based on a string key name from this
+        :class:`_expression.ColumnCollection`."""
+
+        if key in self._index:
+            return self._index[key][1]
+        else:
+            return default
+
+    def __str__(self) -> str:
+        return "%s(%s)" % (
+            self.__class__.__name__,
+            ", ".join(str(c) for c in self),
+        )
+
+    def __setitem__(self, key: str, value: Any) -> NoReturn:
+        raise NotImplementedError()
+
+    def __delitem__(self, key: str) -> NoReturn:
+        raise NotImplementedError()
+
+    def __setattr__(self, key: str, obj: Any) -> NoReturn:
+        raise NotImplementedError()
+
+    def clear(self) -> NoReturn:
+        """Dictionary clear() is not implemented for
+        :class:`_sql.ColumnCollection`."""
+        raise NotImplementedError()
+
+    def remove(self, column: Any) -> None:
+        raise NotImplementedError()
+
+    def update(self, iter_: Any) -> NoReturn:
+        """Dictionary update() is not implemented for
+        :class:`_sql.ColumnCollection`."""
+        raise NotImplementedError()
+
+    # https://github.com/python/mypy/issues/4266
+    __hash__ = None  # type: ignore
+
+    def _populate_separate_keys(
+        self, iter_: Iterable[Tuple[_COLKEY, _COL_co]]
+    ) -> None:
+        """populate from an iterator of (key, column)"""
+
+        self._collection[:] = collection = [
+            (k, c, _ColumnMetrics(self, c)) for k, c in iter_
+        ]
+        self._colset.update(c._deannotate() for _, c, _ in collection)
+        self._index.update(
+            {idx: (k, c) for idx, (k, c, _) in enumerate(collection)}
+        )
+        self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
+
+    def add(
+        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
+    ) -> None:
+        """Add a column to this :class:`_sql.ColumnCollection`.
+
+        .. note::
+
+            This method is **not normally used by user-facing code**, as the
+            :class:`_sql.ColumnCollection` is usually part of an existing
+            object such as a :class:`_schema.Table`. To add a
+            :class:`_schema.Column` to an existing :class:`_schema.Table`
+            object, use the :meth:`_schema.Table.append_column` method.
+
+        """
+        colkey: _COLKEY
+
+        if key is None:
+            colkey = column.key  # type: ignore
+        else:
+            colkey = key
+
+        l = len(self._collection)
+
+        # don't really know how this part is supposed to work w/ the
+        # covariant thing
+
+        _column = cast(_COL_co, column)
+
+        self._collection.append(
+            (colkey, _column, _ColumnMetrics(self, _column))
+        )
+        self._colset.add(_column._deannotate())
+        self._index[l] = (colkey, _column)
+        if colkey not in self._index:
+            self._index[colkey] = (colkey, _column)
+
+    def __getstate__(self) -> Dict[str, Any]:
+        return {
+            "_collection": [(k, c) for k, c, _ in self._collection],
+            "_index": self._index,
+        }
+
+    def __setstate__(self, state: Dict[str, Any]) -> None:
+        object.__setattr__(self, "_index", state["_index"])
+        object.__setattr__(
+            self, "_proxy_index", collections.defaultdict(util.OrderedSet)
+        )
+        object.__setattr__(
+            self,
+            "_collection",
+            [
+                (k, c, _ColumnMetrics(self, c))
+                for (k, c) in state["_collection"]
+            ],
+        )
+        object.__setattr__(
+            self, "_colset", {col for k, col, _ in self._collection}
+        )
+
+    def contains_column(self, col: ColumnElement[Any]) -> bool:
+        """Checks if a column object exists in this collection"""
+        if col not in self._colset:
+            if isinstance(col, str):
+                raise exc.ArgumentError(
+                    "contains_column cannot be used with string arguments. "
+                    "Use ``col_name in table.c`` instead."
+                )
+            return False
+        else:
+            return True
+
+    def as_readonly(self) -> ReadOnlyColumnCollection[_COLKEY, _COL_co]:
+        """Return a "read only" form of this
+        :class:`_sql.ColumnCollection`."""
+
+        return ReadOnlyColumnCollection(self)
+
+    def _init_proxy_index(self):
+        """populate the "proxy index", if empty.
+
+        proxy index is added in 2.0 to provide more efficient operation
+        for the corresponding_column() method.
+
+        For reasons of both time to construct new .c collections as well as
+        memory conservation for large numbers of large .c collections, the
+        proxy_index is only filled if corresponding_column() is called. once
+        filled it stays that way, and new _ColumnMetrics objects created after
+        that point will populate it with new data. Note this case would be
+        unusual, if not nonexistent, as it means a .c collection is being
+        mutated after corresponding_column() were used, however it is tested in
+        test/base/test_utils.py.
+
+        """
+        pi = self._proxy_index
+        if pi:
+            return
+
+        for _, _, metrics in self._collection:
+            eps = metrics.column._expanded_proxy_set
+
+            for eps_col in eps:
+                pi[eps_col].add(metrics)
+
+    def corresponding_column(
+        self, column: _COL, require_embedded: bool = False
+    ) -> Optional[Union[_COL, _COL_co]]:
+        """Given a :class:`_expression.ColumnElement`, return the exported
+        :class:`_expression.ColumnElement` object from this
+        :class:`_expression.ColumnCollection`
+        which corresponds to that original :class:`_expression.ColumnElement`
+        via a common
+        ancestor column.
+
+        :param column: the target :class:`_expression.ColumnElement`
+                      to be matched.
+
+        :param require_embedded: only return corresponding columns for
+         the given :class:`_expression.ColumnElement`, if the given
+         :class:`_expression.ColumnElement`
+         is actually present within a sub-element
+         of this :class:`_expression.Selectable`.
+         Normally the column will match if
+         it merely shares a common ancestor with one of the exported
+         columns of this :class:`_expression.Selectable`.
+
+        .. seealso::
+
+            :meth:`_expression.Selectable.corresponding_column`
+            - invokes this method
+            against the collection returned by
+            :attr:`_expression.Selectable.exported_columns`.
+
+        .. versionchanged:: 1.4 the implementation for ``corresponding_column``
+           was moved onto the :class:`_expression.ColumnCollection` itself.
+
+        """
+        # TODO: cython candidate
+
+        # don't dig around if the column is locally present
+        if column in self._colset:
+            return column
+
+        selected_intersection, selected_metrics = None, None
+        target_set = column.proxy_set
+
+        pi = self._proxy_index
+        if not pi:
+            self._init_proxy_index()
+
+        for current_metrics in (
+            mm for ts in target_set if ts in pi for mm in pi[ts]
+        ):
+            if not require_embedded or current_metrics.embedded(target_set):
+                if selected_metrics is None:
+                    # no corresponding column yet, pick this one.
+                    selected_metrics = current_metrics
+                    continue
+
+                current_intersection = target_set.intersection(
+                    current_metrics.column._expanded_proxy_set
+                )
+                if selected_intersection is None:
+                    selected_intersection = target_set.intersection(
+                        selected_metrics.column._expanded_proxy_set
+                    )
+
+                if len(current_intersection) > len(selected_intersection):
+                    # 'current' has a larger field of correspondence than
+                    # 'selected'. i.e. selectable.c.a1_x->a1.c.x->table.c.x
+                    # matches a1.c.x->table.c.x better than
+                    # selectable.c.x->table.c.x does.
+
+                    selected_metrics = current_metrics
+                    selected_intersection = current_intersection
+                elif current_intersection == selected_intersection:
+                    # they have the same field of correspondence. see
+                    # which proxy_set has fewer columns in it, which
+                    # indicates a closer relationship with the root
+                    # column. Also take into account the "weight"
+                    # attribute which CompoundSelect() uses to give
+                    # higher precedence to columns based on vertical
+                    # position in the compound statement, and discard
+                    # columns that have no reference to the target
+                    # column (also occurs with CompoundSelect)
+
+                    selected_col_distance = sum(
+                        [
+                            sc._annotations.get("weight", 1)
+                            for sc in (
+                                selected_metrics.column._uncached_proxy_list()
+                            )
+                            if sc.shares_lineage(column)
+                        ],
+                    )
+                    current_col_distance = sum(
+                        [
+                            sc._annotations.get("weight", 1)
+                            for sc in (
+                                current_metrics.column._uncached_proxy_list()
+                            )
+                            if sc.shares_lineage(column)
+                        ],
+                    )
+                    if current_col_distance < selected_col_distance:
+                        selected_metrics = current_metrics
+                        selected_intersection = current_intersection
+
+        return selected_metrics.column if selected_metrics else None
+
+
+_NAMEDCOL = TypeVar("_NAMEDCOL", bound="NamedColumn[Any]")
+
+
+class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
+    """A :class:`_expression.ColumnCollection`
+    that maintains deduplicating behavior.
+
+    This is useful by schema level objects such as :class:`_schema.Table` and
+    :class:`.PrimaryKeyConstraint`.    The collection includes more
+    sophisticated mutator methods as well to suit schema objects which
+    require mutable column collections.
+
+    .. versionadded:: 1.4
+
+    """
+
+    def add(  # type: ignore[override]
+        self, column: _NAMEDCOL, key: Optional[str] = None
+    ) -> None:
+        if key is not None and column.key != key:
+            raise exc.ArgumentError(
+                "DedupeColumnCollection requires columns be under "
+                "the same key as their .key"
+            )
+        key = column.key
+
+        if key is None:
+            raise exc.ArgumentError(
+                "Can't add unnamed column to column collection"
+            )
+
+        if key in self._index:
+            existing = self._index[key][1]
+
+            if existing is column:
+                return
+
+            self.replace(column)
+
+            # pop out memoized proxy_set as this
+            # operation may very well be occurring
+            # in a _make_proxy operation
+            util.memoized_property.reset(column, "proxy_set")
+        else:
+            self._append_new_column(key, column)
+
+    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
+        l = len(self._collection)
+        self._collection.append(
+            (key, named_column, _ColumnMetrics(self, named_column))
+        )
+        self._colset.add(named_column._deannotate())
+        self._index[l] = (key, named_column)
+        self._index[key] = (key, named_column)
+
+    def _populate_separate_keys(
+        self, iter_: Iterable[Tuple[str, _NAMEDCOL]]
+    ) -> None:
+        """populate from an iterator of (key, column)"""
+        cols = list(iter_)
+
+        replace_col = []
+        for k, col in cols:
+            if col.key != k:
+                raise exc.ArgumentError(
+                    "DedupeColumnCollection requires columns be under "
+                    "the same key as their .key"
+                )
+            if col.name in self._index and col.key != col.name:
+                replace_col.append(col)
+            elif col.key in self._index:
+                replace_col.append(col)
+            else:
+                self._index[k] = (k, col)
+                self._collection.append((k, col, _ColumnMetrics(self, col)))
+        self._colset.update(c._deannotate() for (k, c, _) in self._collection)
+
+        self._index.update(
+            (idx, (k, c)) for idx, (k, c, _) in enumerate(self._collection)
+        )
+        for col in replace_col:
+            self.replace(col)
+
+    def extend(self, iter_: Iterable[_NAMEDCOL]) -> None:
+        self._populate_separate_keys((col.key, col) for col in iter_)
+
+    def remove(self, column: _NAMEDCOL) -> None:
+        if column not in self._colset:
+            raise ValueError(
+                "Can't remove column %r; column is not in this collection"
+                % column
+            )
+        del self._index[column.key]
+        self._colset.remove(column)
+        self._collection[:] = [
+            (k, c, metrics)
+            for (k, c, metrics) in self._collection
+            if c is not column
+        ]
+        for metrics in self._proxy_index.get(column, ()):
+            metrics.dispose(self)
+
+        self._index.update(
+            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
+        )
+        # delete higher index
+        del self._index[len(self._collection)]
+
+    def replace(
+        self,
+        column: _NAMEDCOL,
+        extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
+    ) -> None:
+        """add the given column to this collection, removing unaliased
+        versions of this column  as well as existing columns with the
+        same key.
+
+        e.g.::
+
+            t = Table("sometable", metadata, Column("col1", Integer))
+            t.columns.replace(Column("col1", Integer, key="columnone"))
+
+        will remove the original 'col1' from the collection, and add
+        the new column under the name 'columnname'.
+
+        Used by schema.Column to override columns during table reflection.
+
+        """
+
+        if extra_remove:
+            remove_col = set(extra_remove)
+        else:
+            remove_col = set()
+        # remove up to two columns based on matches of name as well as key
+        if column.name in self._index and column.key != column.name:
+            other = self._index[column.name][1]
+            if other.name == other.key:
+                remove_col.add(other)
+
+        if column.key in self._index:
+            remove_col.add(self._index[column.key][1])
+
+        if not remove_col:
+            self._append_new_column(column.key, column)
+            return
+        new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
+        replaced = False
+        for k, col, metrics in self._collection:
+            if col in remove_col:
+                if not replaced:
+                    replaced = True
+                    new_cols.append(
+                        (column.key, column, _ColumnMetrics(self, column))
+                    )
+            else:
+                new_cols.append((k, col, metrics))
+
+        if remove_col:
+            self._colset.difference_update(remove_col)
+
+            for rc in remove_col:
+                for metrics in self._proxy_index.get(rc, ()):
+                    metrics.dispose(self)
+
+        if not replaced:
+            new_cols.append((column.key, column, _ColumnMetrics(self, column)))
+
+        self._colset.add(column._deannotate())
+        self._collection[:] = new_cols
+
+        self._index.clear()
+
+        self._index.update(
+            {idx: (k, col) for idx, (k, col, _) in enumerate(self._collection)}
+        )
+        self._index.update({k: (k, col) for (k, col, _) in self._collection})
+
+
+class ReadOnlyColumnCollection(
+    util.ReadOnlyContainer, ColumnCollection[_COLKEY, _COL_co]
+):
+    __slots__ = ("_parent",)
+
+    def __init__(self, collection):
+        object.__setattr__(self, "_parent", collection)
+        object.__setattr__(self, "_colset", collection._colset)
+        object.__setattr__(self, "_index", collection._index)
+        object.__setattr__(self, "_collection", collection._collection)
+        object.__setattr__(self, "_proxy_index", collection._proxy_index)
+
+    def __getstate__(self):
+        return {"_parent": self._parent}
+
+    def __setstate__(self, state):
+        parent = state["_parent"]
+        self.__init__(parent)  # type: ignore
+
+    def add(self, column: Any, key: Any = ...) -> Any:
+        self._readonly()
+
+    def extend(self, elements: Any) -> NoReturn:
+        self._readonly()
+
+    def remove(self, item: Any) -> NoReturn:
+        self._readonly()
+
+
+class ColumnSet(util.OrderedSet["ColumnClause[Any]"]):
+    def contains_column(self, col):
+        return col in self
+
+    def extend(self, cols):
+        for col in cols:
+            self.add(col)
+
+    def __eq__(self, other):
+        l = []
+        for c in other:
+            for local in self:
+                if c.shares_lineage(local):
+                    l.append(c == local)
+        return elements.and_(*l)
+
+    def __hash__(self):  # type: ignore[override]
+        return hash(tuple(x for x in self))
+
+
+def _entity_namespace(
+    entity: Union[_HasEntityNamespace, ExternallyTraversible]
+) -> _EntityNamespace:
+    """Return the nearest .entity_namespace for the given entity.
+
+    If not immediately available, does an iterate to find a sub-element
+    that has one, if any.
+
+    """
+    try:
+        return cast(_HasEntityNamespace, entity).entity_namespace
+    except AttributeError:
+        for elem in visitors.iterate(cast(ExternallyTraversible, entity)):
+            if _is_has_entity_namespace(elem):
+                return elem.entity_namespace
+        else:
+            raise
+
+
+def _entity_namespace_key(
+    entity: Union[_HasEntityNamespace, ExternallyTraversible],
+    key: str,
+    default: Union[SQLCoreOperations[Any], _NoArg] = NO_ARG,
+) -> SQLCoreOperations[Any]:
+    """Return an entry from an entity_namespace.
+
+
+    Raises :class:`_exc.InvalidRequestError` rather than attribute error
+    on not found.
+
+    """
+
+    try:
+        ns = _entity_namespace(entity)
+        if default is not NO_ARG:
+            return getattr(ns, key, default)
+        else:
+            return getattr(ns, key)  # type: ignore
+    except AttributeError as err:
+        raise exc.InvalidRequestError(
+            'Entity namespace for "%s" has no property "%s"' % (entity, key)
+        ) from err