Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/orm/util.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/sqlalchemy/orm/util.py  2402
1 file changed, 2402 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/orm/util.py b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/util.py
new file mode 100644
index 00000000..48282b2d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/util.py
@@ -0,0 +1,2402 @@
+# orm/util.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+from __future__ import annotations
+
+import enum
+import functools
+import re
+import types
+import typing
+from typing import AbstractSet
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import FrozenSet
+from typing import Generic
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Match
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+import weakref
+
+from . import attributes  # noqa
+from . import exc
+from ._typing import _O
+from ._typing import insp_is_aliased_class
+from ._typing import insp_is_mapper
+from ._typing import prop_is_relationship
+from .base import _class_to_mapper as _class_to_mapper
+from .base import _MappedAnnotationBase
+from .base import _never_set as _never_set  # noqa: F401
+from .base import _none_only_set as _none_only_set  # noqa: F401
+from .base import _none_set as _none_set  # noqa: F401
+from .base import attribute_str as attribute_str  # noqa: F401
+from .base import class_mapper as class_mapper
+from .base import DynamicMapped
+from .base import InspectionAttr as InspectionAttr
+from .base import instance_str as instance_str  # noqa: F401
+from .base import Mapped
+from .base import object_mapper as object_mapper
+from .base import object_state as object_state  # noqa: F401
+from .base import opt_manager_of_class
+from .base import ORMDescriptor
+from .base import state_attribute_str as state_attribute_str  # noqa: F401
+from .base import state_class_str as state_class_str  # noqa: F401
+from .base import state_str as state_str  # noqa: F401
+from .base import WriteOnlyMapped
+from .interfaces import CriteriaOption
+from .interfaces import MapperProperty as MapperProperty
+from .interfaces import ORMColumnsClauseRole
+from .interfaces import ORMEntityColumnsClauseRole
+from .interfaces import ORMFromClauseRole
+from .path_registry import PathRegistry as PathRegistry
+from .. import event
+from .. import exc as sa_exc
+from .. import inspection
+from .. import sql
+from .. import util
+from ..engine.result import result_tuple
+from ..sql import coercions
+from ..sql import expression
+from ..sql import lambdas
+from ..sql import roles
+from ..sql import util as sql_util
+from ..sql import visitors
+from ..sql._typing import is_selectable
+from ..sql.annotation import SupportsCloneAnnotations
+from ..sql.base import ColumnCollection
+from ..sql.cache_key import HasCacheKey
+from ..sql.cache_key import MemoizedHasCacheKey
+from ..sql.elements import ColumnElement
+from ..sql.elements import KeyedColumnElement
+from ..sql.selectable import FromClause
+from ..util.langhelpers import MemoizedSlots
+from ..util.typing import de_stringify_annotation as _de_stringify_annotation
+from ..util.typing import eval_name_only as _eval_name_only
+from ..util.typing import fixup_container_fwd_refs
+from ..util.typing import get_origin
+from ..util.typing import is_origin_of_cls
+from ..util.typing import Literal
+from ..util.typing import Protocol
+
+if typing.TYPE_CHECKING:
+    from ._typing import _EntityType
+    from ._typing import _IdentityKeyType
+    from ._typing import _InternalEntityType
+    from ._typing import _ORMCOLEXPR
+    from .context import _MapperEntity
+    from .context import ORMCompileState
+    from .mapper import Mapper
+    from .path_registry import AbstractEntityRegistry
+    from .query import Query
+    from .relationships import RelationshipProperty
+    from ..engine import Row
+    from ..engine import RowMapping
+    from ..sql._typing import _CE
+    from ..sql._typing import _ColumnExpressionArgument
+    from ..sql._typing import _EquivalentColumnMap
+    from ..sql._typing import _FromClauseArgument
+    from ..sql._typing import _OnClauseArgument
+    from ..sql._typing import _PropagateAttrsType
+    from ..sql.annotation import _SA
+    from ..sql.base import ReadOnlyColumnCollection
+    from ..sql.elements import BindParameter
+    from ..sql.selectable import _ColumnsClauseElement
+    from ..sql.selectable import Select
+    from ..sql.selectable import Selectable
+    from ..sql.visitors import anon_map
+    from ..util.typing import _AnnotationScanType
+
+_T = TypeVar("_T", bound=Any)
+
+all_cascades = frozenset(
+    (
+        "delete",
+        "delete-orphan",
+        "all",
+        "merge",
+        "expunge",
+        "save-update",
+        "refresh-expire",
+        "none",
+    )
+)
+
+_de_stringify_partial = functools.partial(
+    functools.partial,
+    locals_=util.immutabledict(
+        {
+            "Mapped": Mapped,
+            "WriteOnlyMapped": WriteOnlyMapped,
+            "DynamicMapped": DynamicMapped,
+        }
+    ),
+)
+
+# partial is practically useless as we have to write out the whole
+# function and maintain the signature anyway
+
+
+class _DeStringifyAnnotation(Protocol):
+    def __call__(
+        self,
+        cls: Type[Any],
+        annotation: _AnnotationScanType,
+        originating_module: str,
+        *,
+        str_cleanup_fn: Optional[Callable[[str, str], str]] = None,
+        include_generic: bool = False,
+    ) -> Type[Any]: ...
+
+
+de_stringify_annotation = cast(
+    _DeStringifyAnnotation, _de_stringify_partial(_de_stringify_annotation)
+)
+
+
+class _EvalNameOnly(Protocol):
+    def __call__(self, name: str, module_name: str) -> Any: ...
+
+
+eval_name_only = cast(_EvalNameOnly, _de_stringify_partial(_eval_name_only))
+
+
+class CascadeOptions(FrozenSet[str]):
+    """Keeps track of the options sent to
+    :paramref:`.relationship.cascade`"""
+
+    _add_w_all_cascades = all_cascades.difference(
+        ["all", "none", "delete-orphan"]
+    )
+    _allowed_cascades = all_cascades
+
+    _viewonly_cascades = ["expunge", "all", "none", "refresh-expire", "merge"]
+
+    __slots__ = (
+        "save_update",
+        "delete",
+        "refresh_expire",
+        "merge",
+        "expunge",
+        "delete_orphan",
+    )
+
+    save_update: bool
+    delete: bool
+    refresh_expire: bool
+    merge: bool
+    expunge: bool
+    delete_orphan: bool
+
+    def __new__(
+        cls, value_list: Optional[Union[Iterable[str], str]]
+    ) -> CascadeOptions:
+        if isinstance(value_list, str) or value_list is None:
+            return cls.from_string(value_list)  # type: ignore
+        values = set(value_list)
+        if values.difference(cls._allowed_cascades):
+            raise sa_exc.ArgumentError(
+                "Invalid cascade option(s): %s"
+                % ", ".join(
+                    [
+                        repr(x)
+                        for x in sorted(
+                            values.difference(cls._allowed_cascades)
+                        )
+                    ]
+                )
+            )
+
+        if "all" in values:
+            values.update(cls._add_w_all_cascades)
+        if "none" in values:
+            values.clear()
+        values.discard("all")
+
+        self = super().__new__(cls, values)
+        self.save_update = "save-update" in values
+        self.delete = "delete" in values
+        self.refresh_expire = "refresh-expire" in values
+        self.merge = "merge" in values
+        self.expunge = "expunge" in values
+        self.delete_orphan = "delete-orphan" in values
+
+        if self.delete_orphan and not self.delete:
+            util.warn("The 'delete-orphan' cascade option requires 'delete'.")
+        return self
+
+    def __repr__(self):
+        return "CascadeOptions(%r)" % (",".join([x for x in sorted(self)]))
+
+    @classmethod
+    def from_string(cls, arg):
+        values = [c for c in re.split(r"\s*,\s*", arg or "") if c]
+        return cls(values)
+
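+# A minimal sketch of how CascadeOptions behaves once constructed (it is
+# normally built from the relationship(cascade=...) parameter rather than
+# directly): "all" expands to the individual cascades except "delete-orphan",
+# and boolean flags are set for quick lookup::
+#
+#     opts = CascadeOptions("all, delete-orphan")
+#     assert opts.save_update and opts.delete and opts.merge
+#     assert opts.delete_orphan and "refresh-expire" in opts
+#     assert "all" not in opts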
+
+def _validator_events(desc, key, validator, include_removes, include_backrefs):
+    """Runs a validation method on an attribute value to be set or
+    appended.
+    """
+
+    if not include_backrefs:
+
+        def detect_is_backref(state, initiator):
+            impl = state.manager[key].impl
+            return initiator.impl is not impl
+
+    if include_removes:
+
+        def append(state, value, initiator):
+            if initiator.op is not attributes.OP_BULK_REPLACE and (
+                include_backrefs or not detect_is_backref(state, initiator)
+            ):
+                return validator(state.obj(), key, value, False)
+            else:
+                return value
+
+        def bulk_set(state, values, initiator):
+            if include_backrefs or not detect_is_backref(state, initiator):
+                obj = state.obj()
+                values[:] = [
+                    validator(obj, key, value, False) for value in values
+                ]
+
+        def set_(state, value, oldvalue, initiator):
+            if include_backrefs or not detect_is_backref(state, initiator):
+                return validator(state.obj(), key, value, False)
+            else:
+                return value
+
+        def remove(state, value, initiator):
+            if include_backrefs or not detect_is_backref(state, initiator):
+                validator(state.obj(), key, value, True)
+
+    else:
+
+        def append(state, value, initiator):
+            if initiator.op is not attributes.OP_BULK_REPLACE and (
+                include_backrefs or not detect_is_backref(state, initiator)
+            ):
+                return validator(state.obj(), key, value)
+            else:
+                return value
+
+        def bulk_set(state, values, initiator):
+            if include_backrefs or not detect_is_backref(state, initiator):
+                obj = state.obj()
+                values[:] = [validator(obj, key, value) for value in values]
+
+        def set_(state, value, oldvalue, initiator):
+            if include_backrefs or not detect_is_backref(state, initiator):
+                return validator(state.obj(), key, value)
+            else:
+                return value
+
+    event.listen(desc, "append", append, raw=True, retval=True)
+    event.listen(desc, "bulk_replace", bulk_set, raw=True)
+    event.listen(desc, "set", set_, raw=True, retval=True)
+    if include_removes:
+        event.listen(desc, "remove", remove, raw=True, retval=True)
+
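+# _validator_events() is the plumbing behind the public ``validates()``
+# decorator; a sketch of the user-facing form that ends up installing the
+# listeners above (``Base``, ``User`` and the attribute names are
+# hypothetical)::
+#
+#     from sqlalchemy.orm import validates
+#
+#     class User(Base):
+#         __tablename__ = "user"
+#         ...
+#
+#         @validates("email")
+#         def validate_email(self, key, value):
+#             assert "@" in value
+#             return value
+#
+#         @validates("addresses", include_removes=True)
+#         def validate_address(self, key, value, is_remove):
+#             # receives is_remove=True when an item is removed
+#             return value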
+
+def polymorphic_union(
+    table_map, typecolname, aliasname="p_union", cast_nulls=True
+):
+    """Create a ``UNION`` statement used by a polymorphic mapper.
+
+    See :ref:`concrete_inheritance` for an example of how
+    this is used.
+
+    :param table_map: mapping of polymorphic identities to
+     :class:`_schema.Table` objects.
+    :param typecolname: string name of a "discriminator" column, which will be
+     derived from the query, producing the polymorphic identity for
+     each row.  If ``None``, no polymorphic discriminator is generated.
+    :param aliasname: name of the :func:`~sqlalchemy.sql.expression.alias()`
+     construct generated.
+    :param cast_nulls: if True, non-existent columns, which are represented
+     as labeled NULLs, will be passed into CAST.  This is a legacy behavior
+     that is problematic on some backends such as Oracle, in which case it
+     can be set to False.
+
+    """
+
+    colnames: util.OrderedSet[str] = util.OrderedSet()
+    colnamemaps = {}
+    types = {}
+    for key in table_map:
+        table = table_map[key]
+
+        table = coercions.expect(
+            roles.StrictFromClauseRole, table, allow_select=True
+        )
+        table_map[key] = table
+
+        m = {}
+        for c in table.c:
+            if c.key == typecolname:
+                raise sa_exc.InvalidRequestError(
+                    "Polymorphic union can't use '%s' as the discriminator "
+                    "column due to mapped column %r; please apply the "
+                    "'typecolname' "
+                    "argument; this is available on "
+                    "ConcreteBase as '_concrete_discriminator_name'"
+                    % (typecolname, c)
+                )
+            colnames.add(c.key)
+            m[c.key] = c
+            types[c.key] = c.type
+        colnamemaps[table] = m
+
+    def col(name, table):
+        try:
+            return colnamemaps[table][name]
+        except KeyError:
+            if cast_nulls:
+                return sql.cast(sql.null(), types[name]).label(name)
+            else:
+                return sql.type_coerce(sql.null(), types[name]).label(name)
+
+    result = []
+    for type_, table in table_map.items():
+        if typecolname is not None:
+            result.append(
+                sql.select(
+                    *(
+                        [col(name, table) for name in colnames]
+                        + [
+                            sql.literal_column(
+                                sql_util._quote_ddl_expr(type_)
+                            ).label(typecolname)
+                        ]
+                    )
+                ).select_from(table)
+            )
+        else:
+            result.append(
+                sql.select(
+                    *[col(name, table) for name in colnames]
+                ).select_from(table)
+            )
+    return sql.union_all(*result).alias(aliasname)
+
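+# A sketch of polymorphic_union() with two hypothetical concrete tables;
+# "type" becomes the discriminator column, and columns missing from a given
+# table are rendered as labeled NULLs (wrapped in CAST by default)::
+#
+#     punion = polymorphic_union(
+#         {"engineer": engineers_table, "manager": managers_table},
+#         "type",
+#         "pjoin",
+#     )
+#     # punion is an alias of a UNION ALL with one SELECT per table, each
+#     # selecting the full set of column names seen across both tables.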
+
+def identity_key(
+    class_: Optional[Type[_T]] = None,
+    ident: Union[Any, Tuple[Any, ...]] = None,
+    *,
+    instance: Optional[_T] = None,
+    row: Optional[Union[Row[Any], RowMapping]] = None,
+    identity_token: Optional[Any] = None,
+) -> _IdentityKeyType[_T]:
+    r"""Generate "identity key" tuples, as are used as keys in the
+    :attr:`.Session.identity_map` dictionary.
+
+    This function has several call styles:
+
+    * ``identity_key(class, ident, identity_token=token)``
+
+      This form receives a mapped class and a primary key scalar or
+      tuple as an argument.
+
+      E.g.::
+
+        >>> identity_key(MyClass, (1, 2))
+        (<class '__main__.MyClass'>, (1, 2), None)
+
+      :param class: mapped class (must be a positional argument)
+      :param ident: primary key, may be a scalar or tuple argument.
+      :param identity_token: optional identity token
+
+        .. versionadded:: 1.2 added identity_token
+
+
+    * ``identity_key(instance=instance)``
+
+      This form will produce the identity key for a given instance.  The
+      instance need not be persistent; it only needs to have its primary key
+      attributes populated (otherwise the key will contain ``None`` for those
+      missing values).
+
+      E.g.::
+
+        >>> instance = MyClass(1, 2)
+        >>> identity_key(instance=instance)
+        (<class '__main__.MyClass'>, (1, 2), None)
+
+      In this form, the given instance is ultimately run through
+      :meth:`_orm.Mapper.identity_key_from_instance`, which will have the
+      effect of performing a database check for the corresponding row
+      if the object is expired.
+
+      :param instance: object instance (must be given as a keyword arg)
+
+    * ``identity_key(class, row=row, identity_token=token)``
+
+      This form is similar to the class/tuple form, except is passed a
+      database result row as a :class:`.Row` or :class:`.RowMapping` object.
+
+      E.g.::
+
+        >>> row = engine.connect().execute(text("select * from table where a=1 and b=2")).first()
+        >>> identity_key(MyClass, row=row)
+        (<class '__main__.MyClass'>, (1, 2), None)
+
+      :param class: mapped class (must be a positional argument)
+      :param row: :class:`.Row` row returned by a :class:`_engine.CursorResult`
+       (must be given as a keyword arg)
+      :param identity_token: optional identity token
+
+        .. versionadded:: 1.2 added identity_token
+
+    """  # noqa: E501
+    if class_ is not None:
+        mapper = class_mapper(class_)
+        if row is None:
+            if ident is None:
+                raise sa_exc.ArgumentError("ident or row is required")
+            return mapper.identity_key_from_primary_key(
+                tuple(util.to_list(ident)), identity_token=identity_token
+            )
+        else:
+            return mapper.identity_key_from_row(
+                row, identity_token=identity_token
+            )
+    elif instance is not None:
+        mapper = object_mapper(instance)
+        return mapper.identity_key_from_instance(instance)
+    else:
+        raise sa_exc.ArgumentError("class or instance is required")
+
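+# The tuples produced by identity_key() are the same keys used by the
+# Session's identity map, so they can be used for direct lookups (``User``
+# and ``session`` are hypothetical)::
+#
+#     key = identity_key(User, 5)           # (User, (5,), None)
+#     obj = session.identity_map.get(key)   # None if not already loaded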
+
+class _TraceAdaptRole(enum.Enum):
+    """Enumeration of all the use cases for ORMAdapter.
+
+    ORMAdapter remains one of the most complicated aspects of the ORM, as it
+    is used for in-place adaptation of column expressions to be applied to a
+    SELECT, replacing :class:`.Table` and other objects that are mapped to
+    classes with aliases of those tables in the case of joined eager loading,
+    or, in the case of polymorphic loading with concrete mappings or other
+    custom "with polymorphic" parameters, with whole user-defined subqueries.
+    The enumeration provides an overview of all the ORMAdapter use cases, a
+    layer of formality around the introduction of new use cases (of which
+    none are anticipated), as well as a means to trace the origins of a
+    particular ORMAdapter during runtime debugging.
+
+    SQLAlchemy 2.0 has greatly scaled back the ORM features which relied
+    heavily on open-ended statement adaptation, including the
+    ``Query.with_polymorphic()`` and ``Query.select_from_entity()`` methods,
+    in favor of user-explicit aliasing schemes using the ``aliased()`` and
+    ``with_polymorphic()`` standalone constructs; these still use adaptation,
+    but the adaptation is applied in a narrower scope.
+
+    """
+
+    # aliased() use that is used to adapt individual attributes at query
+    # construction time
+    ALIASED_INSP = enum.auto()
+
+    # joinedload cases; typically adapt an ON clause of a relationship
+    # join
+    JOINEDLOAD_USER_DEFINED_ALIAS = enum.auto()
+    JOINEDLOAD_PATH_WITH_POLYMORPHIC = enum.auto()
+    JOINEDLOAD_MEMOIZED_ADAPTER = enum.auto()
+
+    # polymorphic cases - these are complex ones that replace FROM
+    # clauses, replacing tables with subqueries
+    MAPPER_POLYMORPHIC_ADAPTER = enum.auto()
+    WITH_POLYMORPHIC_ADAPTER = enum.auto()
+    WITH_POLYMORPHIC_ADAPTER_RIGHT_JOIN = enum.auto()
+    DEPRECATED_JOIN_ADAPT_RIGHT_SIDE = enum.auto()
+
+    # the from_statement() case, used only to adapt individual attributes
+    # from a given statement to local ORM attributes at result fetching
+    # time.  assigned to ORMCompileState._from_obj_alias
+    ADAPT_FROM_STATEMENT = enum.auto()
+
+    # the joinedload for queries that have LIMIT/OFFSET/DISTINCT case;
+    # the query is placed inside of a subquery with the LIMIT/OFFSET/etc.,
+    # joinedloads are then placed on the outside.
+    # assigned to ORMCompileState.compound_eager_adapter
+    COMPOUND_EAGER_STATEMENT = enum.auto()
+
+    # the legacy Query._set_select_from() case.
+    # this is needed for Query's set operations (i.e. UNION, etc. )
+    # as well as "legacy from_self()", which while removed from 2.0 as
+    # public API, is used for the Query.count() method.  this one
+    # still does full statement traversal
+    # assigned to ORMCompileState._from_obj_alias
+    LEGACY_SELECT_FROM_ALIAS = enum.auto()
+
+
+class ORMStatementAdapter(sql_util.ColumnAdapter):
+    """ColumnAdapter which includes a role attribute."""
+
+    __slots__ = ("role",)
+
+    def __init__(
+        self,
+        role: _TraceAdaptRole,
+        selectable: Selectable,
+        *,
+        equivalents: Optional[_EquivalentColumnMap] = None,
+        adapt_required: bool = False,
+        allow_label_resolve: bool = True,
+        anonymize_labels: bool = False,
+        adapt_on_names: bool = False,
+        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
+    ):
+        self.role = role
+        super().__init__(
+            selectable,
+            equivalents=equivalents,
+            adapt_required=adapt_required,
+            allow_label_resolve=allow_label_resolve,
+            anonymize_labels=anonymize_labels,
+            adapt_on_names=adapt_on_names,
+            adapt_from_selectables=adapt_from_selectables,
+        )
+
+
+class ORMAdapter(sql_util.ColumnAdapter):
+    """ColumnAdapter subclass which excludes adaptation of entities from
+    non-matching mappers.
+
+    """
+
+    __slots__ = ("role", "mapper", "is_aliased_class", "aliased_insp")
+
+    is_aliased_class: bool
+    aliased_insp: Optional[AliasedInsp[Any]]
+
+    def __init__(
+        self,
+        role: _TraceAdaptRole,
+        entity: _InternalEntityType[Any],
+        *,
+        equivalents: Optional[_EquivalentColumnMap] = None,
+        adapt_required: bool = False,
+        allow_label_resolve: bool = True,
+        anonymize_labels: bool = False,
+        selectable: Optional[Selectable] = None,
+        limit_on_entity: bool = True,
+        adapt_on_names: bool = False,
+        adapt_from_selectables: Optional[AbstractSet[FromClause]] = None,
+    ):
+        self.role = role
+        self.mapper = entity.mapper
+        if selectable is None:
+            selectable = entity.selectable
+        if insp_is_aliased_class(entity):
+            self.is_aliased_class = True
+            self.aliased_insp = entity
+        else:
+            self.is_aliased_class = False
+            self.aliased_insp = None
+
+        super().__init__(
+            selectable,
+            equivalents,
+            adapt_required=adapt_required,
+            allow_label_resolve=allow_label_resolve,
+            anonymize_labels=anonymize_labels,
+            include_fn=self._include_fn if limit_on_entity else None,
+            adapt_on_names=adapt_on_names,
+            adapt_from_selectables=adapt_from_selectables,
+        )
+
+    def _include_fn(self, elem):
+        entity = elem._annotations.get("parentmapper", None)
+
+        return not entity or entity.isa(self.mapper) or self.mapper.isa(entity)
+
+
+class AliasedClass(
+    inspection.Inspectable["AliasedInsp[_O]"], ORMColumnsClauseRole[_O]
+):
+    r"""Represents an "aliased" form of a mapped class for usage with Query.
+
+    The ORM equivalent of a :func:`~sqlalchemy.sql.expression.alias`
+    construct, this object mimics the mapped class using a
+    ``__getattr__`` scheme and maintains a reference to a
+    real :class:`~sqlalchemy.sql.expression.Alias` object.
+
+    A primary purpose of :class:`.AliasedClass` is to serve as an alternate
+    entity within a SQL statement generated by the ORM, such that an existing
+    mapped entity can be used in multiple contexts.  A simple example::
+
+        # find all pairs of users with the same name
+        user_alias = aliased(User)
+        session.query(User, user_alias).join(
+            user_alias, User.id > user_alias.id
+        ).filter(User.name == user_alias.name)
+
+    :class:`.AliasedClass` is also capable of mapping an existing mapped
+    class to an entirely new selectable, provided this selectable is column-
+    compatible with the existing mapped selectable, and it can also be
+    configured in a mapping as the target of a :func:`_orm.relationship`.
+    See the links below for examples.
+
+    The :class:`.AliasedClass` object is typically constructed using the
+    :func:`_orm.aliased` function.  It is also produced with additional
+    configuration when using the :func:`_orm.with_polymorphic` function.
+
+    The resulting object is an instance of :class:`.AliasedClass`.
+    This object implements an attribute scheme which produces the
+    same attribute and method interface as the original mapped
+    class, allowing :class:`.AliasedClass` to be compatible
+    with any attribute technique which works on the original class,
+    including hybrid attributes (see :ref:`hybrids_toplevel`).
+
+    The :class:`.AliasedClass` can be inspected for its underlying
+    :class:`_orm.Mapper`, aliased selectable, and other information
+    using :func:`_sa.inspect`::
+
+        from sqlalchemy import inspect
+
+        my_alias = aliased(MyClass)
+        insp = inspect(my_alias)
+
+    The resulting inspection object is an instance of :class:`.AliasedInsp`.
+
+
+    .. seealso::
+
+        :func:`.aliased`
+
+        :func:`.with_polymorphic`
+
+        :ref:`relationship_aliased_class`
+
+        :ref:`relationship_to_window_function`
+
+
+    """
+
+    __name__: str
+
+    def __init__(
+        self,
+        mapped_class_or_ac: _EntityType[_O],
+        alias: Optional[FromClause] = None,
+        name: Optional[str] = None,
+        flat: bool = False,
+        adapt_on_names: bool = False,
+        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]] = None,
+        with_polymorphic_discriminator: Optional[ColumnElement[Any]] = None,
+        base_alias: Optional[AliasedInsp[Any]] = None,
+        use_mapper_path: bool = False,
+        represents_outer_join: bool = False,
+    ):
+        insp = cast(
+            "_InternalEntityType[_O]", inspection.inspect(mapped_class_or_ac)
+        )
+        mapper = insp.mapper
+
+        nest_adapters = False
+
+        if alias is None:
+            if insp.is_aliased_class and insp.selectable._is_subquery:
+                alias = insp.selectable.alias()
+            else:
+                alias = (
+                    mapper._with_polymorphic_selectable._anonymous_fromclause(
+                        name=name,
+                        flat=flat,
+                    )
+                )
+        elif insp.is_aliased_class:
+            nest_adapters = True
+
+        assert alias is not None
+        self._aliased_insp = AliasedInsp(
+            self,
+            insp,
+            alias,
+            name,
+            (
+                with_polymorphic_mappers
+                if with_polymorphic_mappers
+                else mapper.with_polymorphic_mappers
+            ),
+            (
+                with_polymorphic_discriminator
+                if with_polymorphic_discriminator is not None
+                else mapper.polymorphic_on
+            ),
+            base_alias,
+            use_mapper_path,
+            adapt_on_names,
+            represents_outer_join,
+            nest_adapters,
+        )
+
+        self.__name__ = f"aliased({mapper.class_.__name__})"
+
+    @classmethod
+    def _reconstitute_from_aliased_insp(
+        cls, aliased_insp: AliasedInsp[_O]
+    ) -> AliasedClass[_O]:
+        obj = cls.__new__(cls)
+        obj.__name__ = f"aliased({aliased_insp.mapper.class_.__name__})"
+        obj._aliased_insp = aliased_insp
+
+        if aliased_insp._is_with_polymorphic:
+            for sub_aliased_insp in aliased_insp._with_polymorphic_entities:
+                if sub_aliased_insp is not aliased_insp:
+                    ent = AliasedClass._reconstitute_from_aliased_insp(
+                        sub_aliased_insp
+                    )
+                    setattr(obj, sub_aliased_insp.class_.__name__, ent)
+
+        return obj
+
+    def __getattr__(self, key: str) -> Any:
+        try:
+            _aliased_insp = self.__dict__["_aliased_insp"]
+        except KeyError:
+            raise AttributeError()
+        else:
+            target = _aliased_insp._target
+            # maintain all getattr mechanics
+            attr = getattr(target, key)
+
+        # attribute is a method, that will be invoked against a
+        # "self"; so just return a new method with the same function and
+        # new self
+        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
+            return types.MethodType(attr.__func__, self)
+
+        # attribute is a descriptor, that will be invoked against a
+        # "self"; so invoke the descriptor against this self
+        if hasattr(attr, "__get__"):
+            attr = attr.__get__(None, self)
+
+        # attributes within the QueryableAttribute system will want this
+        # to be invoked so the object can be adapted
+        if hasattr(attr, "adapt_to_entity"):
+            attr = attr.adapt_to_entity(_aliased_insp)
+            setattr(self, key, attr)
+
+        return attr
+
+    def _get_from_serialized(
+        self, key: str, mapped_class: _O, aliased_insp: AliasedInsp[_O]
+    ) -> Any:
+        # this method is only used in terms of the
+        # sqlalchemy.ext.serializer extension
+        attr = getattr(mapped_class, key)
+        if hasattr(attr, "__call__") and hasattr(attr, "__self__"):
+            return types.MethodType(attr.__func__, self)
+
+        # attribute is a descriptor, that will be invoked against a
+        # "self"; so invoke the descriptor against this self
+        if hasattr(attr, "__get__"):
+            attr = attr.__get__(None, self)
+
+        # attributes within the QueryableAttribute system will want this
+        # to be invoked so the object can be adapted
+        if hasattr(attr, "adapt_to_entity"):
+            aliased_insp._weak_entity = weakref.ref(self)
+            attr = attr.adapt_to_entity(aliased_insp)
+            setattr(self, key, attr)
+
+        return attr
+
+    def __repr__(self) -> str:
+        return "<AliasedClass at 0x%x; %s>" % (
+            id(self),
+            self._aliased_insp._target.__name__,
+        )
+
+    def __str__(self) -> str:
+        return str(self._aliased_insp)
+
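+# The self-join from the AliasedClass docstring above, as a sketch in
+# 2.0-style select() form (``User`` is a hypothetical mapped class)::
+#
+#     from sqlalchemy import select
+#     from sqlalchemy.orm import aliased
+#
+#     user_alias = aliased(User)
+#     stmt = (
+#         select(User, user_alias)
+#         .join(user_alias, User.id > user_alias.id)
+#         .where(User.name == user_alias.name)
+#     )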
+
+@inspection._self_inspects
+class AliasedInsp(
+    ORMEntityColumnsClauseRole[_O],
+    ORMFromClauseRole,
+    HasCacheKey,
+    InspectionAttr,
+    MemoizedSlots,
+    inspection.Inspectable["AliasedInsp[_O]"],
+    Generic[_O],
+):
+    """Provide an inspection interface for an
+    :class:`.AliasedClass` object.
+
+    The :class:`.AliasedInsp` object is returned when applying the
+    :func:`_sa.inspect` function to an :class:`.AliasedClass` object::
+
+        from sqlalchemy import inspect
+        from sqlalchemy.orm import aliased
+
+        my_alias = aliased(MyMappedClass)
+        insp = inspect(my_alias)
+
+    Attributes on :class:`.AliasedInsp`
+    include:
+
+    * ``entity`` - the :class:`.AliasedClass` represented.
+    * ``mapper`` - the :class:`_orm.Mapper` mapping the underlying class.
+    * ``selectable`` - the :class:`_expression.Alias`
+      construct which ultimately
+      represents an aliased :class:`_schema.Table` or
+      :class:`_expression.Select`
+      construct.
+    * ``name`` - the name of the alias.  This is also used as the attribute
+      name when returned in a result tuple from :class:`_query.Query`.
+    * ``with_polymorphic_mappers`` - collection of :class:`_orm.Mapper`
+      objects
+      indicating all those mappers expressed in the select construct
+      for the :class:`.AliasedClass`.
+    * ``polymorphic_on`` - an alternate column or SQL expression which
+      will be used as the "discriminator" for a polymorphic load.
+
+    .. seealso::
+
+        :ref:`inspection_toplevel`
+
+    """
+
+    __slots__ = (
+        "__weakref__",
+        "_weak_entity",
+        "mapper",
+        "selectable",
+        "name",
+        "_adapt_on_names",
+        "with_polymorphic_mappers",
+        "polymorphic_on",
+        "_use_mapper_path",
+        "_base_alias",
+        "represents_outer_join",
+        "persist_selectable",
+        "local_table",
+        "_is_with_polymorphic",
+        "_with_polymorphic_entities",
+        "_adapter",
+        "_target",
+        "__clause_element__",
+        "_memoized_values",
+        "_all_column_expressions",
+        "_nest_adapters",
+    )
+
+    _cache_key_traversal = [
+        ("name", visitors.ExtendedInternalTraversal.dp_string),
+        ("_adapt_on_names", visitors.ExtendedInternalTraversal.dp_boolean),
+        ("_use_mapper_path", visitors.ExtendedInternalTraversal.dp_boolean),
+        ("_target", visitors.ExtendedInternalTraversal.dp_inspectable),
+        ("selectable", visitors.ExtendedInternalTraversal.dp_clauseelement),
+        (
+            "with_polymorphic_mappers",
+            visitors.InternalTraversal.dp_has_cache_key_list,
+        ),
+        ("polymorphic_on", visitors.InternalTraversal.dp_clauseelement),
+    ]
+
+    mapper: Mapper[_O]
+    selectable: FromClause
+    _adapter: ORMAdapter
+    with_polymorphic_mappers: Sequence[Mapper[Any]]
+    _with_polymorphic_entities: Sequence[AliasedInsp[Any]]
+
+    _weak_entity: weakref.ref[AliasedClass[_O]]
+    """the AliasedClass that refers to this AliasedInsp"""
+
+    _target: Union[Type[_O], AliasedClass[_O]]
+    """the thing referenced by the AliasedClass/AliasedInsp.
+
+    In the vast majority of cases, this is the mapped class.  However
+    it may also be another AliasedClass (alias of alias).
+
+    """
+
+    def __init__(
+        self,
+        entity: AliasedClass[_O],
+        inspected: _InternalEntityType[_O],
+        selectable: FromClause,
+        name: Optional[str],
+        with_polymorphic_mappers: Optional[Sequence[Mapper[Any]]],
+        polymorphic_on: Optional[ColumnElement[Any]],
+        _base_alias: Optional[AliasedInsp[Any]],
+        _use_mapper_path: bool,
+        adapt_on_names: bool,
+        represents_outer_join: bool,
+        nest_adapters: bool,
+    ):
+        mapped_class_or_ac = inspected.entity
+        mapper = inspected.mapper
+
+        self._weak_entity = weakref.ref(entity)
+        self.mapper = mapper
+        self.selectable = self.persist_selectable = self.local_table = (
+            selectable
+        )
+        self.name = name
+        self.polymorphic_on = polymorphic_on
+        self._base_alias = weakref.ref(_base_alias or self)
+        self._use_mapper_path = _use_mapper_path
+        self.represents_outer_join = represents_outer_join
+        self._nest_adapters = nest_adapters
+
+        if with_polymorphic_mappers:
+            self._is_with_polymorphic = True
+            self.with_polymorphic_mappers = with_polymorphic_mappers
+            self._with_polymorphic_entities = []
+            for poly in self.with_polymorphic_mappers:
+                if poly is not mapper:
+                    ent = AliasedClass(
+                        poly.class_,
+                        selectable,
+                        base_alias=self,
+                        adapt_on_names=adapt_on_names,
+                        use_mapper_path=_use_mapper_path,
+                    )
+
+                    setattr(self.entity, poly.class_.__name__, ent)
+                    self._with_polymorphic_entities.append(ent._aliased_insp)
+
+        else:
+            self._is_with_polymorphic = False
+            self.with_polymorphic_mappers = [mapper]
+
+        self._adapter = ORMAdapter(
+            _TraceAdaptRole.ALIASED_INSP,
+            mapper,
+            selectable=selectable,
+            equivalents=mapper._equivalent_columns,
+            adapt_on_names=adapt_on_names,
+            anonymize_labels=True,
+            # make sure the adapter doesn't try to grab other tables that
+            # are not even the thing we are mapping, such as embedded
+            # selectables in subqueries or CTEs.  See issue #6060
+            adapt_from_selectables={
+                m.selectable
+                for m in self.with_polymorphic_mappers
+                if not adapt_on_names
+            },
+            limit_on_entity=False,
+        )
+
+        if nest_adapters:
+            # supports "aliased class of aliased class" use case
+            assert isinstance(inspected, AliasedInsp)
+            self._adapter = inspected._adapter.wrap(self._adapter)
+
+        self._adapt_on_names = adapt_on_names
+        self._target = mapped_class_or_ac
+
+    @classmethod
+    def _alias_factory(
+        cls,
+        element: Union[_EntityType[_O], FromClause],
+        alias: Optional[FromClause] = None,
+        name: Optional[str] = None,
+        flat: bool = False,
+        adapt_on_names: bool = False,
+    ) -> Union[AliasedClass[_O], FromClause]:
+        if isinstance(element, FromClause):
+            if adapt_on_names:
+                raise sa_exc.ArgumentError(
+                    "adapt_on_names only applies to ORM elements"
+                )
+            if name:
+                return element.alias(name=name, flat=flat)
+            else:
+                return coercions.expect(
+                    roles.AnonymizedFromClauseRole, element, flat=flat
+                )
+        else:
+            return AliasedClass(
+                element,
+                alias=alias,
+                flat=flat,
+                name=name,
+                adapt_on_names=adapt_on_names,
+            )
+
+    @classmethod
+    def _with_polymorphic_factory(
+        cls,
+        base: Union[Type[_O], Mapper[_O]],
+        classes: Union[Literal["*"], Iterable[_EntityType[Any]]],
+        selectable: Union[Literal[False, None], FromClause] = False,
+        flat: bool = False,
+        polymorphic_on: Optional[ColumnElement[Any]] = None,
+        aliased: bool = False,
+        innerjoin: bool = False,
+        adapt_on_names: bool = False,
+        name: Optional[str] = None,
+        _use_mapper_path: bool = False,
+    ) -> AliasedClass[_O]:
+        primary_mapper = _class_to_mapper(base)
+
+        if selectable not in (None, False) and flat:
+            raise sa_exc.ArgumentError(
+                "the 'flat' and 'selectable' arguments cannot be passed "
+                "simultaneously to with_polymorphic()"
+            )
+
+        mappers, selectable = primary_mapper._with_polymorphic_args(
+            classes, selectable, innerjoin=innerjoin
+        )
+        if aliased or flat:
+            assert selectable is not None
+            selectable = selectable._anonymous_fromclause(flat=flat)
+
+        return AliasedClass(
+            base,
+            selectable,
+            name=name,
+            with_polymorphic_mappers=mappers,
+            adapt_on_names=adapt_on_names,
+            with_polymorphic_discriminator=polymorphic_on,
+            use_mapper_path=_use_mapper_path,
+            represents_outer_join=not innerjoin,
+        )
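+    # _with_polymorphic_factory() backs the public with_polymorphic()
+    # function; a sketch using hypothetical Employee / Engineer / Manager
+    # mappings, where sub-entities are reached via the attributes installed
+    # by AliasedInsp.__init__ above::
+    #
+    #     from sqlalchemy import select
+    #     from sqlalchemy.orm import with_polymorphic
+    #
+    #     emp_poly = with_polymorphic(Employee, [Engineer, Manager])
+    #     stmt = select(emp_poly).where(
+    #         emp_poly.Engineer.engineer_info == "vista"
+    #     )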
+
+    @property
+    def entity(self) -> AliasedClass[_O]:
+        # to eliminate reference cycles, the AliasedClass is held weakly.
+        # this produces some situations where the AliasedClass gets lost,
+        # particularly when one is created internally and only the AliasedInsp
+        # is passed around.
+        # to work around this case, we just generate a new one when we need
+        # it, as it is a simple class with very little initial state on it.
+        ent = self._weak_entity()
+        if ent is None:
+            ent = AliasedClass._reconstitute_from_aliased_insp(self)
+            self._weak_entity = weakref.ref(ent)
+        return ent
+
+    is_aliased_class = True
+    "always returns True"
+
+    def _memoized_method___clause_element__(self) -> FromClause:
+        return self.selectable._annotate(
+            {
+                "parentmapper": self.mapper,
+                "parententity": self,
+                "entity_namespace": self,
+            }
+        )._set_propagate_attrs(
+            {"compile_state_plugin": "orm", "plugin_subject": self}
+        )
+
+    @property
+    def entity_namespace(self) -> AliasedClass[_O]:
+        return self.entity
+
+    @property
+    def class_(self) -> Type[_O]:
+        """Return the mapped class ultimately represented by this
+        :class:`.AliasedInsp`."""
+        return self.mapper.class_
+
+    @property
+    def _path_registry(self) -> AbstractEntityRegistry:
+        if self._use_mapper_path:
+            return self.mapper._path_registry
+        else:
+            return PathRegistry.per_mapper(self)
+
+    def __getstate__(self) -> Dict[str, Any]:
+        return {
+            "entity": self.entity,
+            "mapper": self.mapper,
+            "alias": self.selectable,
+            "name": self.name,
+            "adapt_on_names": self._adapt_on_names,
+            "with_polymorphic_mappers": self.with_polymorphic_mappers,
+            "with_polymorphic_discriminator": self.polymorphic_on,
+            "base_alias": self._base_alias(),
+            "use_mapper_path": self._use_mapper_path,
+            "represents_outer_join": self.represents_outer_join,
+            "nest_adapters": self._nest_adapters,
+        }
+
+    def __setstate__(self, state: Dict[str, Any]) -> None:
+        self.__init__(  # type: ignore
+            state["entity"],
+            state["mapper"],
+            state["alias"],
+            state["name"],
+            state["with_polymorphic_mappers"],
+            state["with_polymorphic_discriminator"],
+            state["base_alias"],
+            state["use_mapper_path"],
+            state["adapt_on_names"],
+            state["represents_outer_join"],
+            state["nest_adapters"],
+        )
+
+    def _merge_with(self, other: AliasedInsp[_O]) -> AliasedInsp[_O]:
+        # assert self._is_with_polymorphic
+        # assert other._is_with_polymorphic
+
+        primary_mapper = other.mapper
+
+        assert self.mapper is primary_mapper
+
+        our_classes = util.to_set(
+            mp.class_ for mp in self.with_polymorphic_mappers
+        )
+        new_classes = {mp.class_ for mp in other.with_polymorphic_mappers}
+        if our_classes == new_classes:
+            return other
+        else:
+            classes = our_classes.union(new_classes)
+
+        mappers, selectable = primary_mapper._with_polymorphic_args(
+            classes, None, innerjoin=not other.represents_outer_join
+        )
+        selectable = selectable._anonymous_fromclause(flat=True)
+        return AliasedClass(
+            primary_mapper,
+            selectable,
+            with_polymorphic_mappers=mappers,
+            with_polymorphic_discriminator=other.polymorphic_on,
+            use_mapper_path=other._use_mapper_path,
+            represents_outer_join=other.represents_outer_join,
+        )._aliased_insp
+
+    def _adapt_element(
+        self, expr: _ORMCOLEXPR, key: Optional[str] = None
+    ) -> _ORMCOLEXPR:
+        assert isinstance(expr, ColumnElement)
+        d: Dict[str, Any] = {
+            "parententity": self,
+            "parentmapper": self.mapper,
+        }
+        if key:
+            d["proxy_key"] = key
+
+        # IMO mypy should see this one also as returning the same type
+        # we put into it, but it doesn't
+        return (
+            self._adapter.traverse(expr)
+            ._annotate(d)
+            ._set_propagate_attrs(
+                {"compile_state_plugin": "orm", "plugin_subject": self}
+            )
+        )
+
+    if TYPE_CHECKING:
+        # establish compatibility with the _ORMAdapterProto protocol,
+        # which in turn is compatible with _CoreAdapterProto.
+
+        def _orm_adapt_element(
+            self,
+            obj: _CE,
+            key: Optional[str] = None,
+        ) -> _CE: ...
+
+    else:
+        _orm_adapt_element = _adapt_element
+
+    def _entity_for_mapper(self, mapper):
+        self_poly = self.with_polymorphic_mappers
+        if mapper in self_poly:
+            if mapper is self.mapper:
+                return self
+            else:
+                return getattr(
+                    self.entity, mapper.class_.__name__
+                )._aliased_insp
+        elif mapper.isa(self.mapper):
+            return self
+        else:
+            assert False, "mapper %s doesn't correspond to %s" % (mapper, self)
+
+    def _memoized_attr__get_clause(self):
+        onclause, replacemap = self.mapper._get_clause
+        return (
+            self._adapter.traverse(onclause),
+            {
+                self._adapter.traverse(col): param
+                for col, param in replacemap.items()
+            },
+        )
+
+    def _memoized_attr__memoized_values(self):
+        return {}
+
+    def _memoized_attr__all_column_expressions(self):
+        if self._is_with_polymorphic:
+            cols_plus_keys = self.mapper._columns_plus_keys(
+                [ent.mapper for ent in self._with_polymorphic_entities]
+            )
+        else:
+            cols_plus_keys = self.mapper._columns_plus_keys()
+
+        cols_plus_keys = [
+            (key, self._adapt_element(col)) for key, col in cols_plus_keys
+        ]
+
+        return ColumnCollection(cols_plus_keys)
+
+    def _memo(self, key, callable_, *args, **kw):
+        if key in self._memoized_values:
+            return self._memoized_values[key]
+        else:
+            self._memoized_values[key] = value = callable_(*args, **kw)
+            return value
+
+    def __repr__(self):
+        if self.with_polymorphic_mappers:
+            with_poly = "(%s)" % ", ".join(
+                mp.class_.__name__ for mp in self.with_polymorphic_mappers
+            )
+        else:
+            with_poly = ""
+        return "<AliasedInsp at 0x%x; %s%s>" % (
+            id(self),
+            self.class_.__name__,
+            with_poly,
+        )
+
+    def __str__(self):
+        if self._is_with_polymorphic:
+            return "with_polymorphic(%s, [%s])" % (
+                self._target.__name__,
+                ", ".join(
+                    mp.class_.__name__
+                    for mp in self.with_polymorphic_mappers
+                    if mp is not self.mapper
+                ),
+            )
+        else:
+            return "aliased(%s)" % (self._target.__name__,)
+
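+# A sketch of the inspection interface described in the AliasedInsp
+# docstring (``User`` is a hypothetical mapped class)::
+#
+#     from sqlalchemy import inspect
+#     from sqlalchemy.orm import aliased
+#
+#     user_alias = aliased(User, name="u2")
+#     insp = inspect(user_alias)
+#     insp.mapper       # the Mapper for User
+#     insp.selectable   # the aliased form of the mapped selectable
+#     insp.name         # "u2"
+#     str(insp)         # "aliased(User)"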
+
+class _WrapUserEntity:
+    """A wrapper used within the loader_criteria lambda caller so that
+    we can bypass declared_attr descriptors on unmapped mixins, which
+    normally emit a warning for such use.
+
+    It might also be useful for other per-lambda instrumentations should
+    the need arise.
+
+    """
+
+    __slots__ = ("subject",)
+
+    def __init__(self, subject):
+        self.subject = subject
+
+    @util.preload_module("sqlalchemy.orm.decl_api")
+    def __getattribute__(self, name):
+        decl_api = util.preloaded.orm.decl_api
+
+        subject = object.__getattribute__(self, "subject")
+        if name in subject.__dict__ and isinstance(
+            subject.__dict__[name], decl_api.declared_attr
+        ):
+            return subject.__dict__[name].fget(subject)
+        else:
+            return getattr(subject, name)
+
+
+class LoaderCriteriaOption(CriteriaOption):
+    """Add additional WHERE criteria to the load for all occurrences of
+    a particular entity.
+
+    :class:`_orm.LoaderCriteriaOption` is invoked using the
+    :func:`_orm.with_loader_criteria` function; see that function for
+    details.
+
+    .. versionadded:: 1.4
+
+    """
+
+    __slots__ = (
+        "root_entity",
+        "entity",
+        "deferred_where_criteria",
+        "where_criteria",
+        "_where_crit_orig",
+        "include_aliases",
+        "propagate_to_loaders",
+    )
+
+    _traverse_internals = [
+        ("root_entity", visitors.ExtendedInternalTraversal.dp_plain_obj),
+        ("entity", visitors.ExtendedInternalTraversal.dp_has_cache_key),
+        ("where_criteria", visitors.InternalTraversal.dp_clauseelement),
+        ("include_aliases", visitors.InternalTraversal.dp_boolean),
+        ("propagate_to_loaders", visitors.InternalTraversal.dp_boolean),
+    ]
+
+    root_entity: Optional[Type[Any]]
+    entity: Optional[_InternalEntityType[Any]]
+    where_criteria: Union[ColumnElement[bool], lambdas.DeferredLambdaElement]
+    deferred_where_criteria: bool
+    include_aliases: bool
+    propagate_to_loaders: bool
+
+    _where_crit_orig: Any
+
+    def __init__(
+        self,
+        entity_or_base: _EntityType[Any],
+        where_criteria: Union[
+            _ColumnExpressionArgument[bool],
+            Callable[[Any], _ColumnExpressionArgument[bool]],
+        ],
+        loader_only: bool = False,
+        include_aliases: bool = False,
+        propagate_to_loaders: bool = True,
+        track_closure_variables: bool = True,
+    ):
+        entity = cast(
+            "_InternalEntityType[Any]",
+            inspection.inspect(entity_or_base, False),
+        )
+        if entity is None:
+            self.root_entity = cast("Type[Any]", entity_or_base)
+            self.entity = None
+        else:
+            self.root_entity = None
+            self.entity = entity
+
+        self._where_crit_orig = where_criteria
+        if callable(where_criteria):
+            if self.root_entity is not None:
+                wrap_entity = self.root_entity
+            else:
+                assert entity is not None
+                wrap_entity = entity.entity
+
+            self.deferred_where_criteria = True
+            self.where_criteria = lambdas.DeferredLambdaElement(
+                where_criteria,
+                roles.WhereHavingRole,
+                lambda_args=(_WrapUserEntity(wrap_entity),),
+                opts=lambdas.LambdaOptions(
+                    track_closure_variables=track_closure_variables
+                ),
+            )
+        else:
+            self.deferred_where_criteria = False
+            self.where_criteria = coercions.expect(
+                roles.WhereHavingRole, where_criteria
+            )
+
+        self.include_aliases = include_aliases
+        self.propagate_to_loaders = propagate_to_loaders
+
+    @classmethod
+    def _unreduce(
+        cls, entity, where_criteria, include_aliases, propagate_to_loaders
+    ):
+        return LoaderCriteriaOption(
+            entity,
+            where_criteria,
+            include_aliases=include_aliases,
+            propagate_to_loaders=propagate_to_loaders,
+        )
+
+    def __reduce__(self):
+        return (
+            LoaderCriteriaOption._unreduce,
+            (
+                self.entity.class_ if self.entity else self.root_entity,
+                self._where_crit_orig,
+                self.include_aliases,
+                self.propagate_to_loaders,
+            ),
+        )
+
+    def _all_mappers(self) -> Iterator[Mapper[Any]]:
+        if self.entity:
+            yield from self.entity.mapper.self_and_descendants
+        else:
+            assert self.root_entity
+            stack = list(self.root_entity.__subclasses__())
+            while stack:
+                subclass = stack.pop(0)
+                ent = cast(
+                    "_InternalEntityType[Any]",
+                    inspection.inspect(subclass, raiseerr=False),
+                )
+                if ent:
+                    yield from ent.mapper.self_and_descendants
+                else:
+                    stack.extend(subclass.__subclasses__())
+
+    def _should_include(self, compile_state: ORMCompileState) -> bool:
+        if (
+            compile_state.select_statement._annotations.get(
+                "for_loader_criteria", None
+            )
+            is self
+        ):
+            return False
+        return True
+
+    def _resolve_where_criteria(
+        self, ext_info: _InternalEntityType[Any]
+    ) -> ColumnElement[bool]:
+        if self.deferred_where_criteria:
+            crit = cast(
+                "ColumnElement[bool]",
+                self.where_criteria._resolve_with_args(ext_info.entity),
+            )
+        else:
+            crit = self.where_criteria  # type: ignore
+        assert isinstance(crit, ColumnElement)
+        return sql_util._deep_annotate(
+            crit,
+            {"for_loader_criteria": self},
+            detect_subquery_cols=True,
+            ind_cols_on_fromclause=True,
+        )
+
+    def process_compile_state_replaced_entities(
+        self,
+        compile_state: ORMCompileState,
+        mapper_entities: Iterable[_MapperEntity],
+    ) -> None:
+        self.process_compile_state(compile_state)
+
+    def process_compile_state(self, compile_state: ORMCompileState) -> None:
+        """Apply a modification to a given :class:`.CompileState`."""
+
+        # if options to limit the criteria to immediate query only,
+        # use compile_state.attributes instead
+
+        self.get_global_criteria(compile_state.global_attributes)
+
+    def get_global_criteria(self, attributes: Dict[Any, Any]) -> None:
+        for mp in self._all_mappers():
+            load_criteria = attributes.setdefault(
+                ("additional_entity_criteria", mp), []
+            )
+
+            load_criteria.append(self)
+
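+# LoaderCriteriaOption is normally constructed via with_loader_criteria();
+# a sketch of the common "global filter" pattern, applying criteria to every
+# SELECT in a session (``User`` and its ``deleted`` column are
+# hypothetical)::
+#
+#     from sqlalchemy import event
+#     from sqlalchemy.orm import with_loader_criteria
+#
+#     @event.listens_for(session, "do_orm_execute")
+#     def _add_filtering_criteria(execute_state):
+#         if execute_state.is_select:
+#             execute_state.statement = execute_state.statement.options(
+#                 with_loader_criteria(User, User.deleted == False)
+#             )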
+
+inspection._inspects(AliasedClass)(lambda target: target._aliased_insp)
+
+
+@inspection._inspects(type)
+def _inspect_mc(
+    class_: Type[_O],
+) -> Optional[Mapper[_O]]:
+    try:
+        class_manager = opt_manager_of_class(class_)
+        if class_manager is None or not class_manager.is_mapped:
+            return None
+        mapper = class_manager.mapper
+    except exc.NO_STATE:
+        return None
+    else:
+        return mapper
+
+
+GenericAlias = type(List[Any])
+
+
+@inspection._inspects(GenericAlias)
+def _inspect_generic_alias(
+    class_: Type[_O],
+) -> Optional[Mapper[_O]]:
+    origin = cast("Type[_O]", get_origin(class_))
+    return _inspect_mc(origin)
+
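+# These inspection hooks are what make inspect() return a Mapper for a
+# mapped class; the GenericAlias variant unwraps a parameterized generic via
+# get_origin() and inspects its origin class.  A sketch with a hypothetical
+# mapped class ``User``::
+#
+#     from sqlalchemy import inspect
+#
+#     inspect(User)   # the Mapper for User, via _inspect_mc()
+#     inspect(str)    # not mapped -> raises NoInspectionAvailable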
+
+@inspection._self_inspects
+class Bundle(
+    ORMColumnsClauseRole[_T],
+    SupportsCloneAnnotations,
+    MemoizedHasCacheKey,
+    inspection.Inspectable["Bundle[_T]"],
+    InspectionAttr,
+):
+    """A grouping of SQL expressions that are returned by a :class:`.Query`
+    under one namespace.
+
+    The :class:`.Bundle` essentially allows nesting of the tuple-based
+    results returned by a column-oriented :class:`_query.Query` object.
+    It is also
+    extensible via simple subclassing, where the primary capability
+    to override is how the set of expressions should be returned,
+    allowing post-processing as well as custom return types, without
+    involving ORM identity-mapped classes.
+
+    .. seealso::
+
+        :ref:`bundles`
+
+
+    """
+
+    single_entity = False
+    """If True, queries for a single Bundle will be returned as a single
+    entity, rather than an element within a keyed tuple."""
+
+    is_clause_element = False
+
+    is_mapper = False
+
+    is_aliased_class = False
+
+    is_bundle = True
+
+    _propagate_attrs: _PropagateAttrsType = util.immutabledict()
+
+    proxy_set = util.EMPTY_SET  # type: ignore
+
+    exprs: List[_ColumnsClauseElement]
+
+    def __init__(
+        self, name: str, *exprs: _ColumnExpressionArgument[Any], **kw: Any
+    ):
+        r"""Construct a new :class:`.Bundle`.
+
+        e.g.::
+
+            bn = Bundle("mybundle", MyClass.x, MyClass.y)
+
+            for row in session.query(bn).filter(bn.c.x == 5).filter(bn.c.y == 4):
+                print(row.mybundle.x, row.mybundle.y)
+
+        :param name: name of the bundle.
+        :param \*exprs: columns or SQL expressions comprising the bundle.
+        :param single_entity=False: if True, rows for this :class:`.Bundle`
+         can be returned as a "single entity" outside of any enclosing tuple
+         in the same manner as a mapped entity.
+
+        """  # noqa: E501
+        self.name = self._label = name
+        coerced_exprs = [
+            coercions.expect(
+                roles.ColumnsClauseRole, expr, apply_propagate_attrs=self
+            )
+            for expr in exprs
+        ]
+        self.exprs = coerced_exprs
+
+        self.c = self.columns = ColumnCollection(
+            (getattr(col, "key", col._label), col)
+            for col in [e._annotations.get("bundle", e) for e in coerced_exprs]
+        ).as_readonly()
+        self.single_entity = kw.pop("single_entity", self.single_entity)
+
+    def _gen_cache_key(
+        self, anon_map: anon_map, bindparams: List[BindParameter[Any]]
+    ) -> Tuple[Any, ...]:
+        return (self.__class__, self.name, self.single_entity) + tuple(
+            [expr._gen_cache_key(anon_map, bindparams) for expr in self.exprs]
+        )
+
+    @property
+    def mapper(self) -> Optional[Mapper[Any]]:
+        mp: Optional[Mapper[Any]] = self.exprs[0]._annotations.get(
+            "parentmapper", None
+        )
+        return mp
+
+    @property
+    def entity(self) -> Optional[_InternalEntityType[Any]]:
+        ie: Optional[_InternalEntityType[Any]] = self.exprs[
+            0
+        ]._annotations.get("parententity", None)
+        return ie
+
+    @property
+    def entity_namespace(
+        self,
+    ) -> ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]:
+        return self.c
+
+    columns: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
+
+    """A namespace of SQL expressions referred to by this :class:`.Bundle`.
+
+        e.g.::
+
+            bn = Bundle("mybundle", MyClass.x, MyClass.y)
+
+            q = sess.query(bn).filter(bn.c.x == 5)
+
+        Nesting of bundles is also supported::
+
+            b1 = Bundle(
+                "b1",
+                Bundle("b2", MyClass.a, MyClass.b),
+                Bundle("b3", MyClass.x, MyClass.y),
+            )
+
+            q = sess.query(b1).filter(b1.c.b2.c.a == 5).filter(b1.c.b3.c.y == 9)
+
+    .. seealso::
+
+        :attr:`.Bundle.c`
+
+    """  # noqa: E501
+
+    c: ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]
+    """An alias for :attr:`.Bundle.columns`."""
+
+    def _clone(self, **kw):
+        cloned = self.__class__.__new__(self.__class__)
+        cloned.__dict__.update(self.__dict__)
+        return cloned
+
+    def __clause_element__(self):
+        # ensure existing entity_namespace remains
+        annotations = {"bundle": self, "entity_namespace": self}
+        annotations.update(self._annotations)
+
+        plugin_subject = self.exprs[0]._propagate_attrs.get(
+            "plugin_subject", self.entity
+        )
+        return (
+            expression.ClauseList(
+                _literal_as_text_role=roles.ColumnsClauseRole,
+                group=False,
+                *[e._annotations.get("bundle", e) for e in self.exprs],
+            )
+            ._annotate(annotations)
+            ._set_propagate_attrs(
+                # the Bundle *must* use the orm plugin no matter what.  the
+                # subject can be None but it's much better if it's not.
+                {
+                    "compile_state_plugin": "orm",
+                    "plugin_subject": plugin_subject,
+                }
+            )
+        )
+
+    @property
+    def clauses(self):
+        return self.__clause_element__().clauses
+
+    def label(self, name):
+        """Provide a copy of this :class:`.Bundle` passing a new label."""
+
+        cloned = self._clone()
+        cloned.name = name
+        return cloned
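+
+    # Illustrative sketch (editorial note, not part of the original module):
+    # relabeling only changes the key under which the bundled values appear
+    # on each result row; ``Vertex`` here is a hypothetical mapped class.
+    #
+    #     pts = Bundle("points", Vertex.x, Vertex.y)
+    #     coords = pts.label("coords")
+    #     row = session.execute(select(coords)).first()
+    #     print(row.coords.x, row.coords.y)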
+
+    def create_row_processor(
+        self,
+        query: Select[Any],
+        procs: Sequence[Callable[[Row[Any]], Any]],
+        labels: Sequence[str],
+    ) -> Callable[[Row[Any]], Any]:
+        """Produce the "row processing" function for this :class:`.Bundle`.
+
+        May be overridden by subclasses to provide custom behaviors when
+        results are fetched. The method is passed the statement object and a
+        set of "row processor" functions at query execution time; these
+        processor functions when given a result row will return the individual
+        attribute value, which can then be adapted into any kind of return data
+        structure.
+
+        The example below illustrates replacing the usual :class:`.Row`
+        return structure with a straight Python dictionary::
+
+            from sqlalchemy.orm import Bundle
+
+
+            class DictBundle(Bundle):
+                def create_row_processor(self, query, procs, labels):
+                    "Override create_row_processor to return values as dictionaries"
+
+                    def proc(row):
+                        return dict(zip(labels, (proc(row) for proc in procs)))
+
+                    return proc
+
+        A result from the above :class:`_orm.Bundle` will return dictionary
+        values::
+
+            bn = DictBundle("mybundle", MyClass.data1, MyClass.data2)
+            for row in session.execute(select(bn).where(bn.c.data1 == "d1")):
+                print(row.mybundle["data1"], row.mybundle["data2"])
+
+        """  # noqa: E501
+        keyed_tuple = result_tuple(labels, [() for _ in labels])
+
+        def proc(row: Row[Any]) -> Any:
+            return keyed_tuple([proc(row) for proc in procs])
+
+        return proc
+
+
+def _orm_annotate(element: _SA, exclude: Optional[Any] = None) -> _SA:
+    """Deep copy the given ClauseElement, annotating each element with the
+    "_orm_adapt" flag.
+
+    Elements within the exclude collection will be cloned but not annotated.
+
+    """
+    return sql_util._deep_annotate(element, {"_orm_adapt": True}, exclude)
+
+
+def _orm_deannotate(element: _SA) -> _SA:
+    """Remove annotations that link a column to a particular mapping.
+
+    Note this doesn't affect "remote" and "foreign" annotations
+    passed by the :func:`_orm.foreign` and :func:`_orm.remote`
+    annotators.
+
+    """
+
+    return sql_util._deep_deannotate(
+        element, values=("_orm_adapt", "parententity")
+    )
+
+
+def _orm_full_deannotate(element: _SA) -> _SA:
+    return sql_util._deep_deannotate(element)
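+
+# Illustrative sketch (editorial note, not part of the original module): the
+# three helpers above are thin wrappers around sql_util._deep_annotate /
+# _deep_deannotate.  Assuming a hypothetical mapped class ``User``:
+#
+#     marked = _orm_annotate(User.id == 5)
+#     # -> a deep copy of the criterion in which each element carries the
+#     #    {"_orm_adapt": True} annotation
+#
+#     plain = _orm_deannotate(marked)
+#     # -> a copy with "_orm_adapt" / "parententity" annotations removed,
+#     #    leaving foreign() / remote() annotations intact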
+
+
+class _ORMJoin(expression.Join):
+    """Extend Join to support ORM constructs as input."""
+
+    __visit_name__ = expression.Join.__visit_name__
+
+    inherit_cache = True
+
+    def __init__(
+        self,
+        left: _FromClauseArgument,
+        right: _FromClauseArgument,
+        onclause: Optional[_OnClauseArgument] = None,
+        isouter: bool = False,
+        full: bool = False,
+        _left_memo: Optional[Any] = None,
+        _right_memo: Optional[Any] = None,
+        _extra_criteria: Tuple[ColumnElement[bool], ...] = (),
+    ):
+        left_info = cast(
+            "Union[FromClause, _InternalEntityType[Any]]",
+            inspection.inspect(left),
+        )
+
+        right_info = cast(
+            "Union[FromClause, _InternalEntityType[Any]]",
+            inspection.inspect(right),
+        )
+        adapt_to = right_info.selectable
+
+        # used by joined eager loader
+        self._left_memo = _left_memo
+        self._right_memo = _right_memo
+
+        if isinstance(onclause, attributes.QueryableAttribute):
+            if TYPE_CHECKING:
+                assert isinstance(
+                    onclause.comparator, RelationshipProperty.Comparator
+                )
+            on_selectable = onclause.comparator._source_selectable()
+            prop = onclause.property
+            _extra_criteria += onclause._extra_criteria
+        elif isinstance(onclause, MapperProperty):
+            # used internally by joined eager loader...possibly not ideal
+            prop = onclause
+            on_selectable = prop.parent.selectable
+        else:
+            prop = None
+            on_selectable = None
+
+        left_selectable = left_info.selectable
+        if prop:
+            adapt_from: Optional[FromClause]
+            if sql_util.clause_is_present(on_selectable, left_selectable):
+                adapt_from = on_selectable
+            else:
+                assert isinstance(left_selectable, FromClause)
+                adapt_from = left_selectable
+
+            (
+                pj,
+                sj,
+                source,
+                dest,
+                secondary,
+                target_adapter,
+            ) = prop._create_joins(
+                source_selectable=adapt_from,
+                dest_selectable=adapt_to,
+                source_polymorphic=True,
+                of_type_entity=right_info,
+                alias_secondary=True,
+                extra_criteria=_extra_criteria,
+            )
+
+            if sj is not None:
+                if isouter:
+                    # note this is an inner join from secondary->right
+                    right = sql.join(secondary, right, sj)
+                    onclause = pj
+                else:
+                    left = sql.join(left, secondary, pj, isouter)
+                    onclause = sj
+            else:
+                onclause = pj
+
+            self._target_adapter = target_adapter
+
+        # we don't use the normal coercions logic for _ORMJoin
+        # (probably should), so do some gymnastics to get the entity.
+        # logic here is for #8721, which was a major bug in 1.4
+        # for almost two years, not reported/fixed until 1.4.43 (!)
+        if is_selectable(left_info):
+            parententity = left_selectable._annotations.get(
+                "parententity", None
+            )
+        elif insp_is_mapper(left_info) or insp_is_aliased_class(left_info):
+            parententity = left_info
+        else:
+            parententity = None
+
+        if parententity is not None:
+            self._annotations = self._annotations.union(
+                {"parententity": parententity}
+            )
+
+        augment_onclause = bool(_extra_criteria) and not prop
+        expression.Join.__init__(self, left, right, onclause, isouter, full)
+
+        assert self.onclause is not None
+
+        if augment_onclause:
+            self.onclause &= sql.and_(*_extra_criteria)
+
+        if (
+            not prop
+            and getattr(right_info, "mapper", None)
+            and right_info.mapper.single  # type: ignore
+        ):
+            right_info = cast("_InternalEntityType[Any]", right_info)
+            # if single inheritance target and we are using a manual
+            # or implicit ON clause, augment it the same way we'd augment the
+            # WHERE.
+            single_crit = right_info.mapper._single_table_criterion
+            if single_crit is not None:
+                if insp_is_aliased_class(right_info):
+                    single_crit = right_info._adapter.traverse(single_crit)
+                self.onclause = self.onclause & single_crit
+
+    def _splice_into_center(self, other):
+        """Splice a join into the center.
+
+        Given join(a, b) and join(b, c), return join(a, b).join(c)
+
+        """
+        leftmost = other
+        while isinstance(leftmost, sql.Join):
+            leftmost = leftmost.left
+
+        assert self.right is leftmost
+
+        left = _ORMJoin(
+            self.left,
+            other.left,
+            self.onclause,
+            isouter=self.isouter,
+            _left_memo=self._left_memo,
+            _right_memo=other._left_memo._path_registry,
+        )
+
+        return _ORMJoin(
+            left,
+            other.right,
+            other.onclause,
+            isouter=other.isouter,
+            _right_memo=other._right_memo,
+        )
+
+    def join(
+        self,
+        right: _FromClauseArgument,
+        onclause: Optional[_OnClauseArgument] = None,
+        isouter: bool = False,
+        full: bool = False,
+    ) -> _ORMJoin:
+        return _ORMJoin(self, right, onclause, full=full, isouter=isouter)
+
+    def outerjoin(
+        self,
+        right: _FromClauseArgument,
+        onclause: Optional[_OnClauseArgument] = None,
+        full: bool = False,
+    ) -> _ORMJoin:
+        return _ORMJoin(self, right, onclause, isouter=True, full=full)
+
+
+def with_parent(
+    instance: object,
+    prop: attributes.QueryableAttribute[Any],
+    from_entity: Optional[_EntityType[Any]] = None,
+) -> ColumnElement[bool]:
+    """Create filtering criterion that relates this query's primary entity
+    to the given related instance, using established
+    :func:`_orm.relationship()`
+    configuration.
+
+    E.g.::
+
+        stmt = select(Address).where(with_parent(some_user, User.addresses))
+
+    The SQL rendered is the same as that rendered when a lazy loader
+    would fire off from the given parent on that attribute, meaning
+    that the appropriate state is taken from the parent object in
+    Python without the need to render joins to the parent table
+    in the rendered statement.
+
+    The given property may also make use of :meth:`_orm.PropComparator.of_type`
+    to indicate the left side of the criteria::
+
+
+        a1 = aliased(Address)
+        a2 = aliased(Address)
+        stmt = select(a1, a2).where(with_parent(u1, User.addresses.of_type(a2)))
+
+    The above use is equivalent to using the
+    :func:`_orm.with_parent.from_entity` argument::
+
+        a1 = aliased(Address)
+        a2 = aliased(Address)
+        stmt = select(a1, a2).where(
+            with_parent(u1, User.addresses, from_entity=a2)
+        )
+
+    :param instance:
+      An instance which has some :func:`_orm.relationship`.
+
+    :param prop:
+      Class-bound attribute, which indicates
+      what relationship from the instance should be used to reconcile the
+      parent/child relationship.
+
+    :param from_entity:
+      Entity to consider as the left side.  This defaults to the
+      "zero" entity of the :class:`_query.Query` itself.
+
+      .. versionadded:: 1.2
+
+    """  # noqa: E501
+    prop_t: RelationshipProperty[Any]
+
+    if isinstance(prop, str):
+        raise sa_exc.ArgumentError(
+            "with_parent() accepts class-bound mapped attributes, not strings"
+        )
+    elif isinstance(prop, attributes.QueryableAttribute):
+        if prop._of_type:
+            from_entity = prop._of_type
+        mapper_property = prop.property
+        if mapper_property is None or not prop_is_relationship(
+            mapper_property
+        ):
+            raise sa_exc.ArgumentError(
+                f"Expected relationship property for with_parent(), "
+                f"got {mapper_property}"
+            )
+        prop_t = mapper_property
+    else:
+        prop_t = prop
+
+    return prop_t._with_parent(instance, from_entity=from_entity)
+
+
+def has_identity(object_: object) -> bool:
+    """Return True if the given object has a database
+    identity.
+
+    This typically corresponds to the object being
+    in either the persistent or detached state.
+
+    .. seealso::
+
+        :func:`.was_deleted`
+
+    """
+    state = attributes.instance_state(object_)
+    return state.has_identity
+
+
+def was_deleted(object_: object) -> bool:
+    """Return True if the given object was deleted
+    within a session flush.
+
+    This is regardless of whether the object is
+    persistent or detached.
+
+    .. seealso::
+
+        :attr:`.InstanceState.was_deleted`
+
+    """
+
+    state = attributes.instance_state(object_)
+    return state.was_deleted
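+
+# Illustrative sketch (editorial note, not part of the original module):
+# both helpers above read flags from the object's InstanceState.  Assuming
+# an existing Session ``session`` and mapped class ``User``:
+#
+#     user = session.get(User, 5)
+#     has_identity(user)   # True - persistent objects have a DB identity
+#
+#     session.delete(user)
+#     session.flush()
+#     was_deleted(user)    # True - the object was deleted in a flush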
+
+
+def _entity_corresponds_to(
+    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
+) -> bool:
+    """determine if 'given' corresponds to 'entity', in terms
+    of an entity passed to Query that would match the same entity
+    being referred to elsewhere in the query.
+
+    """
+    if insp_is_aliased_class(entity):
+        if insp_is_aliased_class(given):
+            if entity._base_alias() is given._base_alias():
+                return True
+        return False
+    elif insp_is_aliased_class(given):
+        if given._use_mapper_path:
+            return entity in given.with_polymorphic_mappers
+        else:
+            return entity is given
+
+    assert insp_is_mapper(given)
+    return entity.common_parent(given)
+
+
+def _entity_corresponds_to_use_path_impl(
+    given: _InternalEntityType[Any], entity: _InternalEntityType[Any]
+) -> bool:
+    """determine if 'given' corresponds to 'entity', in terms
+    of a path of loader options where a mapped attribute is taken to
+    be a member of a parent entity.
+
+    e.g.::
+
+        someoption(A).someoption(A.b)  # -> fn(A, A) -> True
+        someoption(A).someoption(C.d)  # -> fn(A, C) -> False
+
+        a1 = aliased(A)
+        someoption(a1).someoption(A.b)  # -> fn(a1, A) -> False
+        someoption(a1).someoption(a1.b)  # -> fn(a1, a1) -> True
+
+        wp = with_polymorphic(A, [A1, A2])
+        someoption(wp).someoption(A1.foo)  # -> fn(wp, A1) -> False
+        someoption(wp).someoption(wp.A1.foo)  # -> fn(wp, wp.A1) -> True
+
+    """
+    if insp_is_aliased_class(given):
+        return (
+            insp_is_aliased_class(entity)
+            and not entity._use_mapper_path
+            and (given is entity or entity in given._with_polymorphic_entities)
+        )
+    elif not insp_is_aliased_class(entity):
+        return given.isa(entity.mapper)
+    else:
+        return (
+            entity._use_mapper_path
+            and given in entity.with_polymorphic_mappers
+        )
+
+
+def _entity_isa(given: _InternalEntityType[Any], mapper: Mapper[Any]) -> bool:
+    """determine if 'given' "is a" mapper, in terms of the given
+    would load rows of type 'mapper'.
+
+    """
+    if given.is_aliased_class:
+        return mapper in given.with_polymorphic_mappers or given.mapper.isa(
+            mapper
+        )
+    elif given.with_polymorphic_mappers:
+        return mapper in given.with_polymorphic_mappers or given.isa(mapper)
+    else:
+        return given.isa(mapper)
+
+
+def _getitem(iterable_query: Query[Any], item: Any) -> Any:
+    """calculate __getitem__ in terms of an iterable query object
+    that also has a slice() method.
+
+    """
+
+    def _no_negative_indexes():
+        raise IndexError(
+            "negative indexes are not accepted by SQL "
+            "index / slice operators"
+        )
+
+    if isinstance(item, slice):
+        start, stop, step = util.decode_slice(item)
+
+        if (
+            isinstance(stop, int)
+            and isinstance(start, int)
+            and stop - start <= 0
+        ):
+            return []
+
+        elif (isinstance(start, int) and start < 0) or (
+            isinstance(stop, int) and stop < 0
+        ):
+            _no_negative_indexes()
+
+        res = iterable_query.slice(start, stop)
+        if step is not None:
+            return list(res)[None : None : item.step]
+        else:
+            return list(res)
+    else:
+        if item == -1:
+            _no_negative_indexes()
+        else:
+            return list(iterable_query[item : item + 1])[0]
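+
+# Illustrative sketch (editorial note, not part of the original module): the
+# helper above backs Query.__getitem__, so, roughly:
+#
+#     query[1:3]   # -> list(query.slice(1, 3)), i.e. OFFSET 1 / LIMIT 2
+#     query[2]     # -> list(query[2:3])[0]; raises IndexError if no row
+#     query[-1]    # -> raises IndexError; negative indexes are rejected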
+
+
+def _is_mapped_annotation(
+    raw_annotation: _AnnotationScanType,
+    cls: Type[Any],
+    originating_cls: Type[Any],
+) -> bool:
+    try:
+        annotated = de_stringify_annotation(
+            cls, raw_annotation, originating_cls.__module__
+        )
+    except NameError:
+        # in most cases, at least within our own tests, we can raise
+        # here, which is more accurate as it prevents us from returning
+        # false negatives.  However, in the real world, try to avoid getting
+        # involved with end-user annotations that have nothing to do with us.
+        # see issue #8888 where we bypass using this function in the case
+        # that we want to detect an unresolvable Mapped[] type.
+        return False
+    else:
+        return is_origin_of_cls(annotated, _MappedAnnotationBase)
+
+
+class _CleanupError(Exception):
+    pass
+
+
+def _cleanup_mapped_str_annotation(
+    annotation: str, originating_module: str
+) -> str:
+    # fix up an annotation that comes in the form:
+    # 'Mapped[List[Address]]' so that it instead looks like:
+    # 'Mapped[List["Address"]]', which will allow us to get
+    # "Address" as a string
+
+    # additionally, resolve symbols for these names since this is where
+    # we'd have to do it
+
+    inner: Optional[Match[str]]
+
+    mm = re.match(r"^([^ \|]+?)\[(.+)\]$", annotation)
+
+    if not mm:
+        return annotation
+
+    # ticket #8759.  Resolve the Mapped name to a real symbol.
+    # originally this just checked the name.
+    try:
+        obj = eval_name_only(mm.group(1), originating_module)
+    except NameError as ne:
+        raise _CleanupError(
+            f'For annotation "{annotation}", could not resolve '
+            f'container type "{mm.group(1)}".  '
+            "Please ensure this type is imported at the module level "
+            "outside of TYPE_CHECKING blocks"
+        ) from ne
+
+    if obj is typing.ClassVar:
+        real_symbol = "ClassVar"
+    else:
+        try:
+            if issubclass(obj, _MappedAnnotationBase):
+                real_symbol = obj.__name__
+            else:
+                return annotation
+        except TypeError:
+            # avoid isinstance(obj, type) check, just catch TypeError
+            return annotation
+
+    # note: if one of the codepaths above didn't define real_symbol and
+    # also didn't return, then referencing real_symbol below raises
+    # UnboundLocalError, which is actually a NameError, and the calling
+    # routines don't notice this since they are catching NameError anyway.
+    # Just in case this is being modified in the future, something to be
+    # aware of.
+
+    stack = []
+    inner = mm
+    while True:
+        stack.append(real_symbol if mm is inner else inner.group(1))
+        g2 = inner.group(2)
+        inner = re.match(r"^([^ \|]+?)\[(.+)\]$", g2)
+        if inner is None:
+            stack.append(g2)
+            break
+
+    # stacks we want to rewrite, that is, quote the last entry which
+    # we think is a relationship class name:
+    #
+    #   ['Mapped', 'List', 'Address']
+    #   ['Mapped', 'A']
+    #
+    # stacks we don't want to rewrite, which are generally MappedColumn
+    # use cases:
+    #
+    # ['Mapped', "'Optional[Dict[str, str]]'"]
+    # ['Mapped', 'dict[str, str] | None']
+
+    if (
+        # avoid already quoted symbols such as
+        # ['Mapped', "'Optional[Dict[str, str]]'"]
+        not re.match(r"""^["'].*["']$""", stack[-1])
+        # avoid further generics like Dict[] such as
+        # ['Mapped', 'dict[str, str] | None'],
+        # ['Mapped', 'list[int] | list[str]'],
+        # ['Mapped', 'Union[list[int], list[str]]'],
+        and not re.search(r"[\[\]]", stack[-1])
+    ):
+        stripchars = "\"' "
+        stack[-1] = ", ".join(
+            f'"{elem.strip(stripchars)}"' for elem in stack[-1].split(",")
+        )
+
+        annotation = "[".join(stack) + ("]" * (len(stack) - 1))
+
+    return annotation
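+
+# Illustrative sketch (editorial note, not part of the original module):
+# assuming the leading name resolves to ``Mapped`` in the originating
+# module, the rewriting above behaves roughly as:
+#
+#     'Mapped[List[Address]]'               -> 'Mapped[List["Address"]]'
+#     'Mapped[A]'                           -> 'Mapped["A"]'
+#     "Mapped['Optional[Dict[str, str]]']"  -> unchanged (already quoted)
+#     'Mapped[dict[str, str] | None]'       -> unchanged (generic column type)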
+
+
+def _extract_mapped_subtype(
+    raw_annotation: Optional[_AnnotationScanType],
+    cls: type,
+    originating_module: str,
+    key: str,
+    attr_cls: Type[Any],
+    required: bool,
+    is_dataclass_field: bool,
+    expect_mapped: bool = True,
+    raiseerr: bool = True,
+) -> Optional[Tuple[Union[_AnnotationScanType, str], Optional[type]]]:
+    """given an annotation, figure out if it's ``Mapped[something]`` and if
+    so, return the ``something`` part.
+
+    Includes error raise scenarios and other options.
+
+    """
+
+    if raw_annotation is None:
+        if required:
+            raise sa_exc.ArgumentError(
+                f"Python typing annotation is required for attribute "
+                f'"{cls.__name__}.{key}" when primary argument(s) for '
+                f'"{attr_cls.__name__}" construct are None or not present'
+            )
+        return None
+
+    try:
+        # destringify the "outside" of the annotation.  note we are not
+        # adding include_generic so it will *not* dig into generic contents,
+        # which will remain as ForwardRef or plain str under future annotations
+        # mode.  The full destringify happens later when mapped_column goes
+        # to do a full lookup in the registry type_annotations_map.
+        annotated = de_stringify_annotation(
+            cls,
+            raw_annotation,
+            originating_module,
+            str_cleanup_fn=_cleanup_mapped_str_annotation,
+        )
+    except _CleanupError as ce:
+        raise sa_exc.ArgumentError(
+            f"Could not interpret annotation {raw_annotation}.  "
+            "Check that it uses names that are correctly imported at the "
+            "module level. See chained stack trace for more hints."
+        ) from ce
+    except NameError as ne:
+        if raiseerr and "Mapped[" in raw_annotation:  # type: ignore
+            raise sa_exc.ArgumentError(
+                f"Could not interpret annotation {raw_annotation}.  "
+                "Check that it uses names that are correctly imported at the "
+                "module level. See chained stack trace for more hints."
+            ) from ne
+
+        annotated = raw_annotation  # type: ignore
+
+    if is_dataclass_field:
+        return annotated, None
+    else:
+        if not hasattr(annotated, "__origin__") or not is_origin_of_cls(
+            annotated, _MappedAnnotationBase
+        ):
+            if expect_mapped:
+                if not raiseerr:
+                    return None
+
+                origin = getattr(annotated, "__origin__", None)
+                if origin is typing.ClassVar:
+                    return None
+
+                # check for other kind of ORM descriptor like AssociationProxy,
+                # don't raise for that (issue #9957)
+                elif isinstance(origin, type) and issubclass(
+                    origin, ORMDescriptor
+                ):
+                    return None
+
+                raise sa_exc.ArgumentError(
+                    f'Type annotation for "{cls.__name__}.{key}" '
+                    "can't be correctly interpreted for "
+                    "Annotated Declarative Table form.  ORM annotations "
+                    "should normally make use of the ``Mapped[]`` generic "
+                    "type, or other ORM-compatible generic type, as a "
+                    "container for the actual type, which indicates the "
+                    "intent that the attribute is mapped. "
+                    "Class variables that are not intended to be mapped "
+                    "by the ORM should use ClassVar[].  "
+                    "To allow Annotated Declarative to disregard legacy "
+                    "annotations which don't use Mapped[] to pass, set "
+                    '"__allow_unmapped__ = True" on the class or a '
+                    "superclass this class.",
+                    code="zlpr",
+                )
+
+            else:
+                return annotated, None
+
+        if len(annotated.__args__) != 1:
+            raise sa_exc.ArgumentError(
+                "Expected sub-type for Mapped[] annotation"
+            )
+
+        return (
+            # fix dict/list/set args to be ForwardRef, see #11814
+            fixup_container_fwd_refs(annotated.__args__[0]),
+            annotated.__origin__,
+        )
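+
+# Illustrative sketch (editorial note, not part of the original module): for
+# a declarative attribute annotated ``Mapped[List["Address"]]``, the helper
+# above returns roughly ``(List["Address"], Mapped)``: the inner type plus
+# the Mapped-style container it was wrapped in.  Non-Mapped annotations
+# either pass through, return None, or raise, depending on the flags given.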
+
+
+def _mapper_property_as_plain_name(prop: Type[Any]) -> str:
+    if hasattr(prop, "_mapper_property_name"):
+        name = prop._mapper_property_name()
+    else:
+        name = None
+    return util.clsname_as_plain_name(prop, name)