about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py2835
1 files changed, 2835 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py
new file mode 100644
index 00000000..8207b4ca
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/attributes.py
@@ -0,0 +1,2835 @@
+# orm/attributes.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Defines instrumentation for class attributes and their interaction
+with instances.
+
+This module is usually not directly visible to user applications, but
+defines a large part of the ORM's interactivity.
+
+
+"""
+
+from __future__ import annotations
+
+import dataclasses
+import operator
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import ClassVar
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import NamedTuple
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import collections
+from . import exc as orm_exc
+from . import interfaces
+from ._typing import insp_is_aliased_class
+from .base import _DeclarativeMapped
+from .base import ATTR_EMPTY
+from .base import ATTR_WAS_SET
+from .base import CALLABLES_OK
+from .base import DEFERRED_HISTORY_LOAD
+from .base import INCLUDE_PENDING_MUTATIONS  # noqa
+from .base import INIT_OK
+from .base import instance_dict as instance_dict
+from .base import instance_state as instance_state
+from .base import instance_str
+from .base import LOAD_AGAINST_COMMITTED
+from .base import LoaderCallableStatus
+from .base import manager_of_class as manager_of_class
+from .base import Mapped as Mapped  # noqa
+from .base import NEVER_SET  # noqa
+from .base import NO_AUTOFLUSH
+from .base import NO_CHANGE  # noqa
+from .base import NO_KEY
+from .base import NO_RAISE
+from .base import NO_VALUE
+from .base import NON_PERSISTENT_OK  # noqa
+from .base import opt_manager_of_class as opt_manager_of_class
+from .base import PASSIVE_CLASS_MISMATCH  # noqa
+from .base import PASSIVE_NO_FETCH
+from .base import PASSIVE_NO_FETCH_RELATED  # noqa
+from .base import PASSIVE_NO_INITIALIZE
+from .base import PASSIVE_NO_RESULT
+from .base import PASSIVE_OFF
+from .base import PASSIVE_ONLY_PERSISTENT
+from .base import PASSIVE_RETURN_NO_VALUE
+from .base import PassiveFlag
+from .base import RELATED_OBJECT_OK  # noqa
+from .base import SQL_OK  # noqa
+from .base import SQLORMExpression
+from .base import state_str
+from .. import event
+from .. import exc
+from .. import inspection
+from .. import util
+from ..event import dispatcher
+from ..event import EventTarget
+from ..sql import base as sql_base
+from ..sql import cache_key
+from ..sql import coercions
+from ..sql import roles
+from ..sql import visitors
+from ..sql.cache_key import HasCacheKey
+from ..sql.visitors import _TraverseInternalsType
+from ..sql.visitors import InternalTraversal
+from ..util.typing import Literal
+from ..util.typing import Self
+from ..util.typing import TypeGuard
+
+if TYPE_CHECKING:
+    from ._typing import _EntityType
+    from ._typing import _ExternalEntityType
+    from ._typing import _InstanceDict
+    from ._typing import _InternalEntityType
+    from ._typing import _LoaderCallable
+    from ._typing import _O
+    from .collections import _AdaptedCollectionProtocol
+    from .collections import CollectionAdapter
+    from .interfaces import MapperProperty
+    from .relationships import RelationshipProperty
+    from .state import InstanceState
+    from .util import AliasedInsp
+    from .writeonly import WriteOnlyAttributeImpl
+    from ..event.base import _Dispatch
+    from ..sql._typing import _ColumnExpressionArgument
+    from ..sql._typing import _DMLColumnArgument
+    from ..sql._typing import _InfoType
+    from ..sql._typing import _PropagateAttrsType
+    from ..sql.annotation import _AnnotationDict
+    from ..sql.elements import ColumnElement
+    from ..sql.elements import Label
+    from ..sql.operators import OperatorType
+    from ..sql.selectable import FromClause
+
+
+_T = TypeVar("_T")
+_T_co = TypeVar("_T_co", bound=Any, covariant=True)
+
+
+_AllPendingType = Sequence[
+    Tuple[Optional["InstanceState[Any]"], Optional[object]]
+]
+
+
+_UNKNOWN_ATTR_KEY = object()
+
+
+@inspection._self_inspects
+class QueryableAttribute(
+    _DeclarativeMapped[_T_co],
+    SQLORMExpression[_T_co],
+    interfaces.InspectionAttr,
+    interfaces.PropComparator[_T_co],
+    roles.JoinTargetRole,
+    roles.OnClauseRole,
+    sql_base.Immutable,
+    cache_key.SlotsMemoizedHasCacheKey,
+    util.MemoizedSlots,
+    EventTarget,
+):
+    """Base class for :term:`descriptor` objects that intercept
+    attribute events on behalf of a :class:`.MapperProperty`
+    object.  The actual :class:`.MapperProperty` is accessible
+    via the :attr:`.QueryableAttribute.property`
+    attribute.
+
+
+    .. seealso::
+
+        :class:`.InstrumentedAttribute`
+
+        :class:`.MapperProperty`
+
+        :attr:`_orm.Mapper.all_orm_descriptors`
+
+        :attr:`_orm.Mapper.attrs`
+    """
+
+    __slots__ = (
+        "class_",
+        "key",
+        "impl",
+        "comparator",
+        "property",
+        "parent",
+        "expression",
+        "_of_type",
+        "_extra_criteria",
+        "_slots_dispatch",
+        "_propagate_attrs",
+        "_doc",
+    )
+
+    is_attribute = True
+
+    dispatch: dispatcher[QueryableAttribute[_T_co]]
+
+    class_: _ExternalEntityType[Any]
+    key: str
+    parententity: _InternalEntityType[Any]
+    impl: AttributeImpl
+    comparator: interfaces.PropComparator[_T_co]
+    _of_type: Optional[_InternalEntityType[Any]]
+    _extra_criteria: Tuple[ColumnElement[bool], ...]
+    _doc: Optional[str]
+
+    # PropComparator has a __visit_name__ to participate within
+    # traversals.   Disambiguate the attribute vs. a comparator.
+    __visit_name__ = "orm_instrumented_attribute"
+
+    def __init__(
+        self,
+        class_: _ExternalEntityType[_O],
+        key: str,
+        parententity: _InternalEntityType[_O],
+        comparator: interfaces.PropComparator[_T_co],
+        impl: Optional[AttributeImpl] = None,
+        of_type: Optional[_InternalEntityType[Any]] = None,
+        extra_criteria: Tuple[ColumnElement[bool], ...] = (),
+    ):
+        self.class_ = class_
+        self.key = key
+
+        self._parententity = self.parent = parententity
+
+        # this attribute is non-None after mappers are set up, however in the
+        # interim class manager setup, there's a check for None to see if it
+        # needs to be populated, so we assign None here leaving the attribute
+        # in a temporarily not-type-correct state
+        self.impl = impl  # type: ignore
+
+        assert comparator is not None
+        self.comparator = comparator
+        self._of_type = of_type
+        self._extra_criteria = extra_criteria
+        self._doc = None
+
+        manager = opt_manager_of_class(class_)
+        # manager is None in the case of AliasedClass
+        if manager:
+            # propagate existing event listeners from
+            # immediate superclass
+            for base in manager._bases:
+                if key in base:
+                    self.dispatch._update(base[key].dispatch)
+                    if base[key].dispatch._active_history:
+                        self.dispatch._active_history = True  # type: ignore
+
+    _cache_key_traversal = [
+        ("key", visitors.ExtendedInternalTraversal.dp_string),
+        ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
+        ("_of_type", visitors.ExtendedInternalTraversal.dp_multi),
+        ("_extra_criteria", visitors.InternalTraversal.dp_clauseelement_list),
+    ]
+
+    def __reduce__(self) -> Any:
+        # this method is only used in terms of the
+        # sqlalchemy.ext.serializer extension
+        return (
+            _queryable_attribute_unreduce,
+            (
+                self.key,
+                self._parententity.mapper.class_,
+                self._parententity,
+                self._parententity.entity,
+            ),
+        )
+
+    @property
+    def _impl_uses_objects(self) -> bool:
+        return self.impl.uses_objects
+
+    def get_history(
+        self, instance: Any, passive: PassiveFlag = PASSIVE_OFF
+    ) -> History:
+        return self.impl.get_history(
+            instance_state(instance), instance_dict(instance), passive
+        )
+
+    @property
+    def info(self) -> _InfoType:
+        """Return the 'info' dictionary for the underlying SQL element.
+
+        The behavior here is as follows:
+
+        * If the attribute is a column-mapped property, i.e.
+          :class:`.ColumnProperty`, which is mapped directly
+          to a schema-level :class:`_schema.Column` object, this attribute
+          will return the :attr:`.SchemaItem.info` dictionary associated
+          with the core-level :class:`_schema.Column` object.
+
+        * If the attribute is a :class:`.ColumnProperty` but is mapped to
+          any other kind of SQL expression other than a
+          :class:`_schema.Column`,
+          the attribute will refer to the :attr:`.MapperProperty.info`
+          dictionary associated directly with the :class:`.ColumnProperty`,
+          assuming the SQL expression itself does not have its own ``.info``
+          attribute (which should be the case, unless a user-defined SQL
+          construct has defined one).
+
+        * If the attribute refers to any other kind of
+          :class:`.MapperProperty`, including :class:`.Relationship`,
+          the attribute will refer to the :attr:`.MapperProperty.info`
+          dictionary associated with that :class:`.MapperProperty`.
+
+        * To access the :attr:`.MapperProperty.info` dictionary of the
+          :class:`.MapperProperty` unconditionally, including for a
+          :class:`.ColumnProperty` that's associated directly with a
+          :class:`_schema.Column`, the attribute can be referred to using
+          :attr:`.QueryableAttribute.property` attribute, as
+          ``MyClass.someattribute.property.info``.
+
+        .. seealso::
+
+            :attr:`.SchemaItem.info`
+
+            :attr:`.MapperProperty.info`
+
+        """
+        return self.comparator.info
+
+    parent: _InternalEntityType[Any]
+    """Return an inspection instance representing the parent.
+
+    This will be either an instance of :class:`_orm.Mapper`
+    or :class:`.AliasedInsp`, depending upon the nature
+    of the parent entity which this attribute is associated
+    with.
+
+    """
+
+    expression: ColumnElement[_T_co]
+    """The SQL expression object represented by this
+    :class:`.QueryableAttribute`.
+
+    This will typically be an instance of a :class:`_sql.ColumnElement`
+    subclass representing a column expression.
+
+    """
+
+    def _memoized_attr_expression(self) -> ColumnElement[_T]:
+        annotations: _AnnotationDict
+
+        # applies only to Proxy() as used by hybrid.
+        # currently is an exception to typing rather than feeding through
+        # non-string keys.
+        # ideally Proxy() would have a separate set of methods to deal
+        # with this case.
+        entity_namespace = self._entity_namespace
+        assert isinstance(entity_namespace, HasCacheKey)
+
+        if self.key is _UNKNOWN_ATTR_KEY:
+            annotations = {"entity_namespace": entity_namespace}
+        else:
+            annotations = {
+                "proxy_key": self.key,
+                "proxy_owner": self._parententity,
+                "entity_namespace": entity_namespace,
+            }
+
+        ce = self.comparator.__clause_element__()
+        try:
+            if TYPE_CHECKING:
+                assert isinstance(ce, ColumnElement)
+            anno = ce._annotate
+        except AttributeError as ae:
+            raise exc.InvalidRequestError(
+                'When interpreting attribute "%s" as a SQL expression, '
+                "expected __clause_element__() to return "
+                "a ClauseElement object, got: %r" % (self, ce)
+            ) from ae
+        else:
+            return anno(annotations)
+
+    def _memoized_attr__propagate_attrs(self) -> _PropagateAttrsType:
+        # this suits the case in coercions where we don't actually
+        # call ``__clause_element__()`` but still need to get
+        # resolved._propagate_attrs.  See #6558.
+        return util.immutabledict(
+            {
+                "compile_state_plugin": "orm",
+                "plugin_subject": self._parentmapper,
+            }
+        )
+
+    @property
+    def _entity_namespace(self) -> _InternalEntityType[Any]:
+        return self._parententity
+
+    @property
+    def _annotations(self) -> _AnnotationDict:
+        return self.__clause_element__()._annotations
+
+    def __clause_element__(self) -> ColumnElement[_T_co]:
+        return self.expression
+
+    @property
+    def _from_objects(self) -> List[FromClause]:
+        return self.expression._from_objects
+
+    def _bulk_update_tuples(
+        self, value: Any
+    ) -> Sequence[Tuple[_DMLColumnArgument, Any]]:
+        """Return setter tuples for a bulk UPDATE."""
+
+        return self.comparator._bulk_update_tuples(value)
+
+    def adapt_to_entity(self, adapt_to_entity: AliasedInsp[Any]) -> Self:
+        assert not self._of_type
+        return self.__class__(
+            adapt_to_entity.entity,
+            self.key,
+            impl=self.impl,
+            comparator=self.comparator.adapt_to_entity(adapt_to_entity),
+            parententity=adapt_to_entity,
+        )
+
+    def of_type(self, entity: _EntityType[_T]) -> QueryableAttribute[_T]:
+        return QueryableAttribute(
+            self.class_,
+            self.key,
+            self._parententity,
+            impl=self.impl,
+            comparator=self.comparator.of_type(entity),
+            of_type=inspection.inspect(entity),
+            extra_criteria=self._extra_criteria,
+        )
+
+    def and_(
+        self, *clauses: _ColumnExpressionArgument[bool]
+    ) -> QueryableAttribute[bool]:
+        if TYPE_CHECKING:
+            assert isinstance(self.comparator, RelationshipProperty.Comparator)
+
+        exprs = tuple(
+            coercions.expect(roles.WhereHavingRole, clause)
+            for clause in util.coerce_generator_arg(clauses)
+        )
+
+        return QueryableAttribute(
+            self.class_,
+            self.key,
+            self._parententity,
+            impl=self.impl,
+            comparator=self.comparator.and_(*exprs),
+            of_type=self._of_type,
+            extra_criteria=self._extra_criteria + exprs,
+        )
+
+    def _clone(self, **kw: Any) -> QueryableAttribute[_T]:
+        return QueryableAttribute(
+            self.class_,
+            self.key,
+            self._parententity,
+            impl=self.impl,
+            comparator=self.comparator,
+            of_type=self._of_type,
+            extra_criteria=self._extra_criteria,
+        )
+
+    def label(self, name: Optional[str]) -> Label[_T_co]:
+        return self.__clause_element__().label(name)
+
+    def operate(
+        self, op: OperatorType, *other: Any, **kwargs: Any
+    ) -> ColumnElement[Any]:
+        return op(self.comparator, *other, **kwargs)  # type: ignore[no-any-return]  # noqa: E501
+
+    def reverse_operate(
+        self, op: OperatorType, other: Any, **kwargs: Any
+    ) -> ColumnElement[Any]:
+        return op(other, self.comparator, **kwargs)  # type: ignore[no-any-return]  # noqa: E501
+
+    def hasparent(
+        self, state: InstanceState[Any], optimistic: bool = False
+    ) -> bool:
+        return self.impl.hasparent(state, optimistic=optimistic) is not False
+
+    def __getattr__(self, key: str) -> Any:
+        try:
+            return util.MemoizedSlots.__getattr__(self, key)
+        except AttributeError:
+            pass
+
+        try:
+            return getattr(self.comparator, key)
+        except AttributeError as err:
+            raise AttributeError(
+                "Neither %r object nor %r object associated with %s "
+                "has an attribute %r"
+                % (
+                    type(self).__name__,
+                    type(self.comparator).__name__,
+                    self,
+                    key,
+                )
+            ) from err
+
+    def __str__(self) -> str:
+        return f"{self.class_.__name__}.{self.key}"
+
+    def _memoized_attr_property(self) -> Optional[MapperProperty[Any]]:
+        return self.comparator.property
+
+
+def _queryable_attribute_unreduce(
+    key: str,
+    mapped_class: Type[_O],
+    parententity: _InternalEntityType[_O],
+    entity: _ExternalEntityType[Any],
+) -> Any:
+    # this method is only used in terms of the
+    # sqlalchemy.ext.serializer extension
+    if insp_is_aliased_class(parententity):
+        return entity._get_from_serialized(key, mapped_class, parententity)
+    else:
+        return getattr(entity, key)
+
+
+class InstrumentedAttribute(QueryableAttribute[_T_co]):
+    """Class bound instrumented attribute which adds basic
+    :term:`descriptor` methods.
+
+    See :class:`.QueryableAttribute` for a description of most features.
+
+
+    """
+
+    __slots__ = ()
+
+    inherit_cache = True
+    """:meta private:"""
+
+    # hack to make __doc__ writeable on instances of
+    # InstrumentedAttribute, while still keeping classlevel
+    # __doc__ correct
+
+    @util.rw_hybridproperty
+    def __doc__(self) -> Optional[str]:
+        return self._doc
+
+    @__doc__.setter  # type: ignore
+    def __doc__(self, value: Optional[str]) -> None:
+        self._doc = value
+
+    @__doc__.classlevel  # type: ignore
+    def __doc__(cls) -> Optional[str]:
+        return super().__doc__
+
+    def __set__(self, instance: object, value: Any) -> None:
+        self.impl.set(
+            instance_state(instance), instance_dict(instance), value, None
+        )
+
+    def __delete__(self, instance: object) -> None:
+        self.impl.delete(instance_state(instance), instance_dict(instance))
+
+    @overload
+    def __get__(
+        self, instance: None, owner: Any
+    ) -> InstrumentedAttribute[_T_co]: ...
+
+    @overload
+    def __get__(self, instance: object, owner: Any) -> _T_co: ...
+
+    def __get__(
+        self, instance: Optional[object], owner: Any
+    ) -> Union[InstrumentedAttribute[_T_co], _T_co]:
+        if instance is None:
+            return self
+
+        dict_ = instance_dict(instance)
+        if self.impl.supports_population and self.key in dict_:
+            return dict_[self.key]  # type: ignore[no-any-return]
+        else:
+            try:
+                state = instance_state(instance)
+            except AttributeError as err:
+                raise orm_exc.UnmappedInstanceError(instance) from err
+            return self.impl.get(state, dict_)  # type: ignore[no-any-return]
+
+
+@dataclasses.dataclass(frozen=True)
+class AdHocHasEntityNamespace(HasCacheKey):
+    _traverse_internals: ClassVar[_TraverseInternalsType] = [
+        ("_entity_namespace", InternalTraversal.dp_has_cache_key),
+    ]
+
+    # py37 compat, no slots=True on dataclass
+    __slots__ = ("_entity_namespace",)
+    _entity_namespace: _InternalEntityType[Any]
+    is_mapper: ClassVar[bool] = False
+    is_aliased_class: ClassVar[bool] = False
+
+    @property
+    def entity_namespace(self):
+        return self._entity_namespace.entity_namespace
+
+
+def create_proxied_attribute(
+    descriptor: Any,
+) -> Callable[..., QueryableAttribute[Any]]:
+    """Create an QueryableAttribute / user descriptor hybrid.
+
+    Returns a new QueryableAttribute type that delegates descriptor
+    behavior and getattr() to the given descriptor.
+    """
+
+    # TODO: can move this to descriptor_props if the need for this
+    # function is removed from ext/hybrid.py
+
+    class Proxy(QueryableAttribute[Any]):
+        """Presents the :class:`.QueryableAttribute` interface as a
+        proxy on top of a Python descriptor / :class:`.PropComparator`
+        combination.
+
+        """
+
+        _extra_criteria = ()
+
+        # the attribute error catches inside of __getattr__ basically create a
+        # singularity if you try putting slots on this too
+        # __slots__ = ("descriptor", "original_property", "_comparator")
+
+        def __init__(
+            self,
+            class_,
+            key,
+            descriptor,
+            comparator,
+            adapt_to_entity=None,
+            doc=None,
+            original_property=None,
+        ):
+            self.class_ = class_
+            self.key = key
+            self.descriptor = descriptor
+            self.original_property = original_property
+            self._comparator = comparator
+            self._adapt_to_entity = adapt_to_entity
+            self._doc = self.__doc__ = doc
+
+        @property
+        def _parententity(self):
+            return inspection.inspect(self.class_, raiseerr=False)
+
+        @property
+        def parent(self):
+            return inspection.inspect(self.class_, raiseerr=False)
+
+        _is_internal_proxy = True
+
+        _cache_key_traversal = [
+            ("key", visitors.ExtendedInternalTraversal.dp_string),
+            ("_parententity", visitors.ExtendedInternalTraversal.dp_multi),
+        ]
+
+        @property
+        def _impl_uses_objects(self):
+            return (
+                self.original_property is not None
+                and getattr(self.class_, self.key).impl.uses_objects
+            )
+
+        @property
+        def _entity_namespace(self):
+            if hasattr(self._comparator, "_parententity"):
+                return self._comparator._parententity
+            else:
+                # used by hybrid attributes which try to remain
+                # agnostic of any ORM concepts like mappers
+                return AdHocHasEntityNamespace(self._parententity)
+
+        @property
+        def property(self):
+            return self.comparator.property
+
+        @util.memoized_property
+        def comparator(self):
+            if callable(self._comparator):
+                self._comparator = self._comparator()
+            if self._adapt_to_entity:
+                self._comparator = self._comparator.adapt_to_entity(
+                    self._adapt_to_entity
+                )
+            return self._comparator
+
+        def adapt_to_entity(self, adapt_to_entity):
+            return self.__class__(
+                adapt_to_entity.entity,
+                self.key,
+                self.descriptor,
+                self._comparator,
+                adapt_to_entity,
+            )
+
+        def _clone(self, **kw):
+            return self.__class__(
+                self.class_,
+                self.key,
+                self.descriptor,
+                self._comparator,
+                adapt_to_entity=self._adapt_to_entity,
+                original_property=self.original_property,
+            )
+
+        def __get__(self, instance, owner):
+            retval = self.descriptor.__get__(instance, owner)
+            # detect if this is a plain Python @property, which just returns
+            # itself for class level access.  If so, then return us.
+            # Otherwise, return the object returned by the descriptor.
+            if retval is self.descriptor and instance is None:
+                return self
+            else:
+                return retval
+
+        def __str__(self) -> str:
+            return f"{self.class_.__name__}.{self.key}"
+
+        def __getattr__(self, attribute):
+            """Delegate __getattr__ to the original descriptor and/or
+            comparator."""
+
+            # this is unfortunately very complicated, and is easily prone
+            # to recursion overflows when implementations of related
+            # __getattr__ schemes are changed
+
+            try:
+                return util.MemoizedSlots.__getattr__(self, attribute)
+            except AttributeError:
+                pass
+
+            try:
+                return getattr(descriptor, attribute)
+            except AttributeError as err:
+                if attribute == "comparator":
+                    raise AttributeError("comparator") from err
+                try:
+                    # comparator itself might be unreachable
+                    comparator = self.comparator
+                except AttributeError as err2:
+                    raise AttributeError(
+                        "Neither %r object nor unconfigured comparator "
+                        "object associated with %s has an attribute %r"
+                        % (type(descriptor).__name__, self, attribute)
+                    ) from err2
+                else:
+                    try:
+                        return getattr(comparator, attribute)
+                    except AttributeError as err3:
+                        raise AttributeError(
+                            "Neither %r object nor %r object "
+                            "associated with %s has an attribute %r"
+                            % (
+                                type(descriptor).__name__,
+                                type(comparator).__name__,
+                                self,
+                                attribute,
+                            )
+                        ) from err3
+
+    Proxy.__name__ = type(descriptor).__name__ + "Proxy"
+
+    util.monkeypatch_proxied_specials(
+        Proxy, type(descriptor), name="descriptor", from_instance=descriptor
+    )
+    return Proxy
+
+
+OP_REMOVE = util.symbol("REMOVE")
+OP_APPEND = util.symbol("APPEND")
+OP_REPLACE = util.symbol("REPLACE")
+OP_BULK_REPLACE = util.symbol("BULK_REPLACE")
+OP_MODIFIED = util.symbol("MODIFIED")
+
+
+class AttributeEventToken:
+    """A token propagated throughout the course of a chain of attribute
+    events.
+
+    Serves as an indicator of the source of the event and also provides
+    a means of controlling propagation across a chain of attribute
+    operations.
+
+    The :class:`.Event` object is sent as the ``initiator`` argument
+    when dealing with events such as :meth:`.AttributeEvents.append`,
+    :meth:`.AttributeEvents.set`,
+    and :meth:`.AttributeEvents.remove`.
+
+    The :class:`.Event` object is currently interpreted by the backref
+    event handlers, and is used to control the propagation of operations
+    across two mutually-dependent attributes.
+
+    .. versionchanged:: 2.0  Changed the name from ``AttributeEvent``
+       to ``AttributeEventToken``.
+
+    :attribute impl: The :class:`.AttributeImpl` which is the current event
+     initiator.
+
+    :attribute op: The symbol :attr:`.OP_APPEND`, :attr:`.OP_REMOVE`,
+     :attr:`.OP_REPLACE`, or :attr:`.OP_BULK_REPLACE`, indicating the
+     source operation.
+
+    """
+
+    __slots__ = "impl", "op", "parent_token"
+
+    def __init__(self, attribute_impl: AttributeImpl, op: util.symbol):
+        self.impl = attribute_impl
+        self.op = op
+        self.parent_token = self.impl.parent_token
+
+    def __eq__(self, other):
+        return (
+            isinstance(other, AttributeEventToken)
+            and other.impl is self.impl
+            and other.op == self.op
+        )
+
+    @property
+    def key(self):
+        return self.impl.key
+
+    def hasparent(self, state):
+        return self.impl.hasparent(state)
+
+
+AttributeEvent = AttributeEventToken  # legacy
+Event = AttributeEventToken  # legacy
+
+
+class AttributeImpl:
+    """internal implementation for instrumented attributes."""
+
+    collection: bool
+    default_accepts_scalar_loader: bool
+    uses_objects: bool
+    supports_population: bool
+    dynamic: bool
+
+    _is_has_collection_adapter = False
+
+    _replace_token: AttributeEventToken
+    _remove_token: AttributeEventToken
+    _append_token: AttributeEventToken
+
+    def __init__(
+        self,
+        class_: _ExternalEntityType[_O],
+        key: str,
+        callable_: Optional[_LoaderCallable],
+        dispatch: _Dispatch[QueryableAttribute[Any]],
+        trackparent: bool = False,
+        compare_function: Optional[Callable[..., bool]] = None,
+        active_history: bool = False,
+        parent_token: Optional[AttributeEventToken] = None,
+        load_on_unexpire: bool = True,
+        send_modified_events: bool = True,
+        accepts_scalar_loader: Optional[bool] = None,
+        **kwargs: Any,
+    ):
+        r"""Construct an AttributeImpl.
+
+        :param \class_: associated class
+
+        :param key: string name of the attribute
+
+        :param \callable_:
+          optional function which generates a callable based on a parent
+          instance, which produces the "default" values for a scalar or
+          collection attribute when it's first accessed, if not present
+          already.
+
+        :param trackparent:
+          if True, attempt to track if an instance has a parent attached
+          to it via this attribute.
+
+        :param compare_function:
+          a function that compares two values which are normally
+          assignable to this attribute.
+
+        :param active_history:
+          indicates that get_history() should always return the "old" value,
+          even if it means executing a lazy callable upon attribute change.
+
+        :param parent_token:
+          Usually references the MapperProperty, used as a key for
+          the hasparent() function to identify an "owning" attribute.
+          Allows multiple AttributeImpls to all match a single
+          owner attribute.
+
+        :param load_on_unexpire:
+          if False, don't include this attribute in a load-on-expired
+          operation, i.e. the "expired_attribute_loader" process.
+          The attribute can still be in the "expired" list and be
+          considered to be "expired".   Previously, this flag was called
+          "expire_missing" and is only used by a deferred column
+          attribute.
+
+        :param send_modified_events:
+          if False, the InstanceState._modified_event method will have no
+          effect; this means the attribute will never show up as changed in a
+          history entry.
+
+        """
+        self.class_ = class_
+        self.key = key
+        self.callable_ = callable_
+        self.dispatch = dispatch
+        self.trackparent = trackparent
+        self.parent_token = parent_token or self
+        self.send_modified_events = send_modified_events
+        if compare_function is None:
+            self.is_equal = operator.eq
+        else:
+            self.is_equal = compare_function
+
+        if accepts_scalar_loader is not None:
+            self.accepts_scalar_loader = accepts_scalar_loader
+        else:
+            self.accepts_scalar_loader = self.default_accepts_scalar_loader
+
+        _deferred_history = kwargs.pop("_deferred_history", False)
+        self._deferred_history = _deferred_history
+
+        if active_history:
+            self.dispatch._active_history = True
+
+        self.load_on_unexpire = load_on_unexpire
+        self._modified_token = AttributeEventToken(self, OP_MODIFIED)
+
+    __slots__ = (
+        "class_",
+        "key",
+        "callable_",
+        "dispatch",
+        "trackparent",
+        "parent_token",
+        "send_modified_events",
+        "is_equal",
+        "load_on_unexpire",
+        "_modified_token",
+        "accepts_scalar_loader",
+        "_deferred_history",
+    )
+
+    def __str__(self) -> str:
+        return f"{self.class_.__name__}.{self.key}"
+
+    def _get_active_history(self):
+        """Backwards compat for impl.active_history"""
+
+        return self.dispatch._active_history
+
+    def _set_active_history(self, value):
+        self.dispatch._active_history = value
+
+    active_history = property(_get_active_history, _set_active_history)
+
+    def hasparent(
+        self, state: InstanceState[Any], optimistic: bool = False
+    ) -> bool:
+        """Return the boolean value of a `hasparent` flag attached to
+        the given state.
+
+        The `optimistic` flag determines what the default return value
+        should be if no `hasparent` flag can be located.
+
+        As this function is used to determine if an instance is an
+        *orphan*, instances that were loaded from storage should be
+        assumed to not be orphans, until a True/False value for this
+        flag is set.
+
+        An instance attribute that is loaded by a callable function
+        will also not have a `hasparent` flag.
+
+        """
+        msg = "This AttributeImpl is not configured to track parents."
+        assert self.trackparent, msg
+
+        return (
+            state.parents.get(id(self.parent_token), optimistic) is not False
+        )
+
+    def sethasparent(
+        self,
+        state: InstanceState[Any],
+        parent_state: InstanceState[Any],
+        value: bool,
+    ) -> None:
+        """Set a boolean flag on the given item corresponding to
+        whether or not it is attached to a parent object via the
+        attribute represented by this ``InstrumentedAttribute``.
+
+        """
+        msg = "This AttributeImpl is not configured to track parents."
+        assert self.trackparent, msg
+
+        id_ = id(self.parent_token)
+        if value:
+            state.parents[id_] = parent_state
+        else:
+            if id_ in state.parents:
+                last_parent = state.parents[id_]
+
+                if (
+                    last_parent is not False
+                    and last_parent.key != parent_state.key
+                ):
+                    if last_parent.obj() is None:
+                        raise orm_exc.StaleDataError(
+                            "Removing state %s from parent "
+                            "state %s along attribute '%s', "
+                            "but the parent record "
+                            "has gone stale, can't be sure this "
+                            "is the most recent parent."
+                            % (
+                                state_str(state),
+                                state_str(parent_state),
+                                self.key,
+                            )
+                        )
+
+                    return
+
+            state.parents[id_] = False
+
+    def get_history(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> History:
+        raise NotImplementedError()
+
+    def get_all_pending(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
+    ) -> _AllPendingType:
+        """Return a list of tuples of (state, obj)
+        for all objects in this attribute's current state
+        + history.
+
+        Only applies to object-based attributes.
+
+        This is an inlining of existing functionality
+        which roughly corresponds to:
+
+            get_state_history(
+                        state,
+                        key,
+                        passive=PASSIVE_NO_INITIALIZE).sum()
+
+        """
+        raise NotImplementedError()
+
+    def _default_value(
+        self, state: InstanceState[Any], dict_: _InstanceDict
+    ) -> Any:
+        """Produce an empty value for an uninitialized scalar attribute."""
+
+        assert self.key not in dict_, (
+            "_default_value should only be invoked for an "
+            "uninitialized or expired attribute"
+        )
+
+        value = None
+        for fn in self.dispatch.init_scalar:
+            ret = fn(state, value, dict_)
+            if ret is not ATTR_EMPTY:
+                value = ret
+
+        return value
+
+    def get(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> Any:
+        """Retrieve a value from the given object.
+        If a callable is assembled on this object's attribute, and
+        passive is False, the callable will be executed and the
+        resulting value will be set as the new value for this attribute.
+        """
+        if self.key in dict_:
+            return dict_[self.key]
+        else:
+            # if history present, don't load
+            key = self.key
+            if (
+                key not in state.committed_state
+                or state.committed_state[key] is NO_VALUE
+            ):
+                if not passive & CALLABLES_OK:
+                    return PASSIVE_NO_RESULT
+
+                value = self._fire_loader_callables(state, key, passive)
+
+                if value is PASSIVE_NO_RESULT or value is NO_VALUE:
+                    return value
+                elif value is ATTR_WAS_SET:
+                    try:
+                        return dict_[key]
+                    except KeyError as err:
+                        # TODO: no test coverage here.
+                        raise KeyError(
+                            "Deferred loader for attribute "
+                            "%r failed to populate "
+                            "correctly" % key
+                        ) from err
+                elif value is not ATTR_EMPTY:
+                    return self.set_committed_value(state, dict_, value)
+
+            if not passive & INIT_OK:
+                return NO_VALUE
+            else:
+                return self._default_value(state, dict_)
+
+    def _fire_loader_callables(
+        self, state: InstanceState[Any], key: str, passive: PassiveFlag
+    ) -> Any:
+        if (
+            self.accepts_scalar_loader
+            and self.load_on_unexpire
+            and key in state.expired_attributes
+        ):
+            return state._load_expired(state, passive)
+        elif key in state.callables:
+            callable_ = state.callables[key]
+            return callable_(state, passive)
+        elif self.callable_:
+            return self.callable_(state, passive)
+        else:
+            return ATTR_EMPTY
+
+    def append(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        self.set(state, dict_, value, initiator, passive=passive)
+
+    def remove(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        self.set(
+            state, dict_, None, initiator, passive=passive, check_old=value
+        )
+
+    def pop(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        self.set(
+            state,
+            dict_,
+            None,
+            initiator,
+            passive=passive,
+            check_old=value,
+            pop=True,
+        )
+
+    def set(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken] = None,
+        passive: PassiveFlag = PASSIVE_OFF,
+        check_old: Any = None,
+        pop: bool = False,
+    ) -> None:
+        raise NotImplementedError()
+
+    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
+        raise NotImplementedError()
+
+    def get_committed_value(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> Any:
+        """return the unchanged value of this attribute"""
+
+        if self.key in state.committed_state:
+            value = state.committed_state[self.key]
+            if value is NO_VALUE:
+                return None
+            else:
+                return value
+        else:
+            return self.get(state, dict_, passive=passive)
+
+    def set_committed_value(self, state, dict_, value):
+        """set an attribute value on the given instance and 'commit' it."""
+
+        dict_[self.key] = value
+        state._commit(dict_, [self.key])
+        return value
+
+
+class ScalarAttributeImpl(AttributeImpl):
+    """represents a scalar value-holding InstrumentedAttribute."""
+
+    default_accepts_scalar_loader = True
+    uses_objects = False
+    supports_population = True
+    collection = False
+    dynamic = False
+
+    __slots__ = "_replace_token", "_append_token", "_remove_token"
+
+    def __init__(self, *arg, **kw):
+        super().__init__(*arg, **kw)
+        self._replace_token = self._append_token = AttributeEventToken(
+            self, OP_REPLACE
+        )
+        self._remove_token = AttributeEventToken(self, OP_REMOVE)
+
+    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
+        if self.dispatch._active_history:
+            old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
+        else:
+            old = dict_.get(self.key, NO_VALUE)
+
+        if self.dispatch.remove:
+            self.fire_remove_event(state, dict_, old, self._remove_token)
+        state._modified_event(dict_, self, old)
+
+        existing = dict_.pop(self.key, NO_VALUE)
+        if (
+            existing is NO_VALUE
+            and old is NO_VALUE
+            and not state.expired
+            and self.key not in state.expired_attributes
+        ):
+            raise AttributeError("%s object does not have a value" % self)
+
+    def get_history(
+        self,
+        state: InstanceState[Any],
+        dict_: Dict[str, Any],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> History:
+        if self.key in dict_:
+            return History.from_scalar_attribute(self, state, dict_[self.key])
+        elif self.key in state.committed_state:
+            return History.from_scalar_attribute(self, state, NO_VALUE)
+        else:
+            if passive & INIT_OK:
+                passive ^= INIT_OK
+            current = self.get(state, dict_, passive=passive)
+            if current is PASSIVE_NO_RESULT:
+                return HISTORY_BLANK
+            else:
+                return History.from_scalar_attribute(self, state, current)
+
+    def set(
+        self,
+        state: InstanceState[Any],
+        dict_: Dict[str, Any],
+        value: Any,
+        initiator: Optional[AttributeEventToken] = None,
+        passive: PassiveFlag = PASSIVE_OFF,
+        check_old: Optional[object] = None,
+        pop: bool = False,
+    ) -> None:
+        if self.dispatch._active_history:
+            old = self.get(state, dict_, PASSIVE_RETURN_NO_VALUE)
+        else:
+            old = dict_.get(self.key, NO_VALUE)
+
+        if self.dispatch.set:
+            value = self.fire_replace_event(
+                state, dict_, value, old, initiator
+            )
+        state._modified_event(dict_, self, old)
+        dict_[self.key] = value
+
+    def fire_replace_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: _T,
+        previous: Any,
+        initiator: Optional[AttributeEventToken],
+    ) -> _T:
+        for fn in self.dispatch.set:
+            value = fn(
+                state, value, previous, initiator or self._replace_token
+            )
+        return value
+
+    def fire_remove_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+    ) -> None:
+        for fn in self.dispatch.remove:
+            fn(state, value, initiator or self._remove_token)
+
+
+class ScalarObjectAttributeImpl(ScalarAttributeImpl):
+    """represents a scalar-holding InstrumentedAttribute,
+    where the target object is also instrumented.
+
+    Adds events to delete/set operations.
+
+    """
+
+    default_accepts_scalar_loader = False
+    uses_objects = True
+    supports_population = True
+    collection = False
+
+    __slots__ = ()
+
+    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
+        if self.dispatch._active_history:
+            old = self.get(
+                state,
+                dict_,
+                passive=PASSIVE_ONLY_PERSISTENT
+                | NO_AUTOFLUSH
+                | LOAD_AGAINST_COMMITTED,
+            )
+        else:
+            old = self.get(
+                state,
+                dict_,
+                passive=PASSIVE_NO_FETCH ^ INIT_OK
+                | LOAD_AGAINST_COMMITTED
+                | NO_RAISE,
+            )
+
+        self.fire_remove_event(state, dict_, old, self._remove_token)
+
+        existing = dict_.pop(self.key, NO_VALUE)
+
+        # if the attribute is expired, we currently have no way to tell
+        # that an object-attribute was expired vs. not loaded.   So
+        # for this test, we look to see if the object has a DB identity.
+        if (
+            existing is NO_VALUE
+            and old is not PASSIVE_NO_RESULT
+            and state.key is None
+        ):
+            raise AttributeError("%s object does not have a value" % self)
+
+    def get_history(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> History:
+        if self.key in dict_:
+            current = dict_[self.key]
+        else:
+            if passive & INIT_OK:
+                passive ^= INIT_OK
+            current = self.get(state, dict_, passive=passive)
+            if current is PASSIVE_NO_RESULT:
+                return HISTORY_BLANK
+
+        if not self._deferred_history:
+            return History.from_object_attribute(self, state, current)
+        else:
+            original = state.committed_state.get(self.key, _NO_HISTORY)
+            if original is PASSIVE_NO_RESULT:
+                loader_passive = passive | (
+                    PASSIVE_ONLY_PERSISTENT
+                    | NO_AUTOFLUSH
+                    | LOAD_AGAINST_COMMITTED
+                    | NO_RAISE
+                    | DEFERRED_HISTORY_LOAD
+                )
+                original = self._fire_loader_callables(
+                    state, self.key, loader_passive
+                )
+            return History.from_object_attribute(
+                self, state, current, original=original
+            )
+
+    def get_all_pending(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
+    ) -> _AllPendingType:
+        if self.key in dict_:
+            current = dict_[self.key]
+        elif passive & CALLABLES_OK:
+            current = self.get(state, dict_, passive=passive)
+        else:
+            return []
+
+        ret: _AllPendingType
+
+        # can't use __hash__(), can't use __eq__() here
+        if (
+            current is not None
+            and current is not PASSIVE_NO_RESULT
+            and current is not NO_VALUE
+        ):
+            ret = [(instance_state(current), current)]
+        else:
+            ret = [(None, None)]
+
+        if self.key in state.committed_state:
+            original = state.committed_state[self.key]
+            if (
+                original is not None
+                and original is not PASSIVE_NO_RESULT
+                and original is not NO_VALUE
+                and original is not current
+            ):
+                ret.append((instance_state(original), original))
+        return ret
+
+    def set(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken] = None,
+        passive: PassiveFlag = PASSIVE_OFF,
+        check_old: Any = None,
+        pop: bool = False,
+    ) -> None:
+        """Set a value on the given InstanceState."""
+
+        if self.dispatch._active_history:
+            old = self.get(
+                state,
+                dict_,
+                passive=PASSIVE_ONLY_PERSISTENT
+                | NO_AUTOFLUSH
+                | LOAD_AGAINST_COMMITTED,
+            )
+        else:
+            old = self.get(
+                state,
+                dict_,
+                passive=PASSIVE_NO_FETCH ^ INIT_OK
+                | LOAD_AGAINST_COMMITTED
+                | NO_RAISE,
+            )
+
+        if (
+            check_old is not None
+            and old is not PASSIVE_NO_RESULT
+            and check_old is not old
+        ):
+            if pop:
+                return
+            else:
+                raise ValueError(
+                    "Object %s not associated with %s on attribute '%s'"
+                    % (instance_str(check_old), state_str(state), self.key)
+                )
+
+        value = self.fire_replace_event(state, dict_, value, old, initiator)
+        dict_[self.key] = value
+
+    def fire_remove_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+    ) -> None:
+        if self.trackparent and value not in (
+            None,
+            PASSIVE_NO_RESULT,
+            NO_VALUE,
+        ):
+            self.sethasparent(instance_state(value), state, False)
+
+        for fn in self.dispatch.remove:
+            fn(state, value, initiator or self._remove_token)
+
+        state._modified_event(dict_, self, value)
+
+    def fire_replace_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: _T,
+        previous: Any,
+        initiator: Optional[AttributeEventToken],
+    ) -> _T:
+        if self.trackparent:
+            if previous is not value and previous not in (
+                None,
+                PASSIVE_NO_RESULT,
+                NO_VALUE,
+            ):
+                self.sethasparent(instance_state(previous), state, False)
+
+        for fn in self.dispatch.set:
+            value = fn(
+                state, value, previous, initiator or self._replace_token
+            )
+
+        state._modified_event(dict_, self, previous)
+
+        if self.trackparent:
+            if value is not None:
+                self.sethasparent(instance_state(value), state, True)
+
+        return value
+
+
+class HasCollectionAdapter:
+    __slots__ = ()
+
+    collection: bool
+    _is_has_collection_adapter = True
+
+    def _dispose_previous_collection(
+        self,
+        state: InstanceState[Any],
+        collection: _AdaptedCollectionProtocol,
+        adapter: CollectionAdapter,
+        fire_event: bool,
+    ) -> None:
+        raise NotImplementedError()
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Literal[None] = ...,
+        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
+    ) -> CollectionAdapter: ...
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: _AdaptedCollectionProtocol = ...,
+        passive: PassiveFlag = ...,
+    ) -> CollectionAdapter: ...
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Optional[_AdaptedCollectionProtocol] = ...,
+        passive: PassiveFlag = ...,
+    ) -> Union[
+        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+    ]: ...
+
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Optional[_AdaptedCollectionProtocol] = None,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+    ) -> Union[
+        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+    ]:
+        raise NotImplementedError()
+
+    def set(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken] = None,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+        check_old: Any = None,
+        pop: bool = False,
+        _adapt: bool = True,
+    ) -> None:
+        raise NotImplementedError()
+
+
+if TYPE_CHECKING:
+
+    def _is_collection_attribute_impl(
+        impl: AttributeImpl,
+    ) -> TypeGuard[CollectionAttributeImpl]: ...
+
+else:
+    _is_collection_attribute_impl = operator.attrgetter("collection")
+
+
+class CollectionAttributeImpl(HasCollectionAdapter, AttributeImpl):
+    """A collection-holding attribute that instruments changes in membership.
+
+    Only handles collections of instrumented objects.
+
+    InstrumentedCollectionAttribute holds an arbitrary, user-specified
+    container object (defaulting to a list) and brokers access to the
+    CollectionAdapter, a "view" onto that object that presents consistent bag
+    semantics to the orm layer independent of the user data implementation.
+
+    """
+
+    uses_objects = True
+    collection = True
+    default_accepts_scalar_loader = False
+    supports_population = True
+    dynamic = False
+
+    _bulk_replace_token: AttributeEventToken
+
+    __slots__ = (
+        "copy",
+        "collection_factory",
+        "_append_token",
+        "_remove_token",
+        "_bulk_replace_token",
+        "_duck_typed_as",
+    )
+
+    def __init__(
+        self,
+        class_,
+        key,
+        callable_,
+        dispatch,
+        typecallable=None,
+        trackparent=False,
+        copy_function=None,
+        compare_function=None,
+        **kwargs,
+    ):
+        super().__init__(
+            class_,
+            key,
+            callable_,
+            dispatch,
+            trackparent=trackparent,
+            compare_function=compare_function,
+            **kwargs,
+        )
+
+        if copy_function is None:
+            copy_function = self.__copy
+        self.copy = copy_function
+        self.collection_factory = typecallable
+        self._append_token = AttributeEventToken(self, OP_APPEND)
+        self._remove_token = AttributeEventToken(self, OP_REMOVE)
+        self._bulk_replace_token = AttributeEventToken(self, OP_BULK_REPLACE)
+        self._duck_typed_as = util.duck_type_collection(
+            self.collection_factory()
+        )
+
+        if getattr(self.collection_factory, "_sa_linker", None):
+
+            @event.listens_for(self, "init_collection")
+            def link(target, collection, collection_adapter):
+                collection._sa_linker(collection_adapter)
+
+            @event.listens_for(self, "dispose_collection")
+            def unlink(target, collection, collection_adapter):
+                collection._sa_linker(None)
+
+    def __copy(self, item):
+        return [y for y in collections.collection_adapter(item)]
+
+    def get_history(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> History:
+        current = self.get(state, dict_, passive=passive)
+
+        if current is PASSIVE_NO_RESULT:
+            if (
+                passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS
+                and self.key in state._pending_mutations
+            ):
+                pending = state._pending_mutations[self.key]
+                return pending.merge_with_history(HISTORY_BLANK)
+            else:
+                return HISTORY_BLANK
+        else:
+            if passive & PassiveFlag.INCLUDE_PENDING_MUTATIONS:
+                # this collection is loaded / present.  should not be any
+                # pending mutations
+                assert self.key not in state._pending_mutations
+
+            return History.from_collection(self, state, current)
+
+    def get_all_pending(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        passive: PassiveFlag = PASSIVE_NO_INITIALIZE,
+    ) -> _AllPendingType:
+        # NOTE: passive is ignored here at the moment
+
+        if self.key not in dict_:
+            return []
+
+        current = dict_[self.key]
+        current = getattr(current, "_sa_adapter")
+
+        if self.key in state.committed_state:
+            original = state.committed_state[self.key]
+            if original is not NO_VALUE:
+                current_states = [
+                    ((c is not None) and instance_state(c) or None, c)
+                    for c in current
+                ]
+                original_states = [
+                    ((c is not None) and instance_state(c) or None, c)
+                    for c in original
+                ]
+
+                current_set = dict(current_states)
+                original_set = dict(original_states)
+
+                return (
+                    [
+                        (s, o)
+                        for s, o in current_states
+                        if s not in original_set
+                    ]
+                    + [(s, o) for s, o in current_states if s in original_set]
+                    + [
+                        (s, o)
+                        for s, o in original_states
+                        if s not in current_set
+                    ]
+                )
+
+        return [(instance_state(o), o) for o in current]
+
+    def fire_append_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: _T,
+        initiator: Optional[AttributeEventToken],
+        key: Optional[Any],
+    ) -> _T:
+        for fn in self.dispatch.append:
+            value = fn(state, value, initiator or self._append_token, key=key)
+
+        state._modified_event(dict_, self, NO_VALUE, True)
+
+        if self.trackparent and value is not None:
+            self.sethasparent(instance_state(value), state, True)
+
+        return value
+
+    def fire_append_wo_mutation_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: _T,
+        initiator: Optional[AttributeEventToken],
+        key: Optional[Any],
+    ) -> _T:
+        for fn in self.dispatch.append_wo_mutation:
+            value = fn(state, value, initiator or self._append_token, key=key)
+
+        return value
+
+    def fire_pre_remove_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        initiator: Optional[AttributeEventToken],
+        key: Optional[Any],
+    ) -> None:
+        """A special event used for pop() operations.
+
+        The "remove" event needs to have the item to be removed passed to
+        it, which in the case of pop from a set, we don't have a way to access
+        the item before the operation.   the event is used for all pop()
+        operations (even though set.pop is the one where it is really needed).
+
+        """
+        state._modified_event(dict_, self, NO_VALUE, True)
+
+    def fire_remove_event(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        key: Optional[Any],
+    ) -> None:
+        if self.trackparent and value is not None:
+            self.sethasparent(instance_state(value), state, False)
+
+        for fn in self.dispatch.remove:
+            fn(state, value, initiator or self._remove_token, key=key)
+
+        state._modified_event(dict_, self, NO_VALUE, True)
+
+    def delete(self, state: InstanceState[Any], dict_: _InstanceDict) -> None:
+        if self.key not in dict_:
+            return
+
+        state._modified_event(dict_, self, NO_VALUE, True)
+
+        collection = self.get_collection(state, state.dict)
+        collection.clear_with_event()
+
+        # key is always present because we checked above.  e.g.
+        # del is a no-op if collection not present.
+        del dict_[self.key]
+
+    def _default_value(
+        self, state: InstanceState[Any], dict_: _InstanceDict
+    ) -> _AdaptedCollectionProtocol:
+        """Produce an empty collection for an un-initialized attribute"""
+
+        assert self.key not in dict_, (
+            "_default_value should only be invoked for an "
+            "uninitialized or expired attribute"
+        )
+
+        if self.key in state._empty_collections:
+            return state._empty_collections[self.key]
+
+        adapter, user_data = self._initialize_collection(state)
+        adapter._set_empty(user_data)
+        return user_data
+
+    def _initialize_collection(
+        self, state: InstanceState[Any]
+    ) -> Tuple[CollectionAdapter, _AdaptedCollectionProtocol]:
+        adapter, collection = state.manager.initialize_collection(
+            self.key, state, self.collection_factory
+        )
+
+        self.dispatch.init_collection(state, collection, adapter)
+
+        return adapter, collection
+
+    def append(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        collection = self.get_collection(
+            state, dict_, user_data=None, passive=passive
+        )
+        if collection is PASSIVE_NO_RESULT:
+            value = self.fire_append_event(
+                state, dict_, value, initiator, key=NO_KEY
+            )
+            assert (
+                self.key not in dict_
+            ), "Collection was loaded during event handling."
+            state._get_pending_mutation(self.key).append(value)
+        else:
+            if TYPE_CHECKING:
+                assert isinstance(collection, CollectionAdapter)
+            collection.append_with_event(value, initiator)
+
+    def remove(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        collection = self.get_collection(
+            state, state.dict, user_data=None, passive=passive
+        )
+        if collection is PASSIVE_NO_RESULT:
+            self.fire_remove_event(state, dict_, value, initiator, key=NO_KEY)
+            assert (
+                self.key not in dict_
+            ), "Collection was loaded during event handling."
+            state._get_pending_mutation(self.key).remove(value)
+        else:
+            if TYPE_CHECKING:
+                assert isinstance(collection, CollectionAdapter)
+            collection.remove_with_event(value, initiator)
+
+    def pop(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken],
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> None:
+        try:
+            # TODO: better solution here would be to add
+            # a "popper" role to collections.py to complement
+            # "remover".
+            self.remove(state, dict_, value, initiator, passive=passive)
+        except (ValueError, KeyError, IndexError):
+            pass
+
+    def set(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        value: Any,
+        initiator: Optional[AttributeEventToken] = None,
+        passive: PassiveFlag = PassiveFlag.PASSIVE_OFF,
+        check_old: Any = None,
+        pop: bool = False,
+        _adapt: bool = True,
+    ) -> None:
+        iterable = orig_iterable = value
+        new_keys = None
+
+        # pulling a new collection first so that an adaptation exception does
+        # not trigger a lazy load of the old collection.
+        new_collection, user_data = self._initialize_collection(state)
+        if _adapt:
+            if new_collection._converter is not None:
+                iterable = new_collection._converter(iterable)
+            else:
+                setting_type = util.duck_type_collection(iterable)
+                receiving_type = self._duck_typed_as
+
+                if setting_type is not receiving_type:
+                    given = (
+                        iterable is None
+                        and "None"
+                        or iterable.__class__.__name__
+                    )
+                    wanted = self._duck_typed_as.__name__
+                    raise TypeError(
+                        "Incompatible collection type: %s is not %s-like"
+                        % (given, wanted)
+                    )
+
+                # If the object is an adapted collection, return the (iterable)
+                # adapter.
+                if hasattr(iterable, "_sa_iterator"):
+                    iterable = iterable._sa_iterator()
+                elif setting_type is dict:
+                    new_keys = list(iterable)
+                    iterable = iterable.values()
+                else:
+                    iterable = iter(iterable)
+        elif util.duck_type_collection(iterable) is dict:
+            new_keys = list(value)
+
+        new_values = list(iterable)
+
+        evt = self._bulk_replace_token
+
+        self.dispatch.bulk_replace(state, new_values, evt, keys=new_keys)
+
+        # propagate NO_RAISE in passive through to the get() for the
+        # existing object (ticket #8862)
+        old = self.get(
+            state,
+            dict_,
+            passive=PASSIVE_ONLY_PERSISTENT ^ (passive & PassiveFlag.NO_RAISE),
+        )
+        if old is PASSIVE_NO_RESULT:
+            old = self._default_value(state, dict_)
+        elif old is orig_iterable:
+            # ignore re-assignment of the current collection, as happens
+            # implicitly with in-place operators (foo.collection |= other)
+            return
+
+        # place a copy of "old" in state.committed_state
+        state._modified_event(dict_, self, old, True)
+
+        old_collection = old._sa_adapter
+
+        dict_[self.key] = user_data
+
+        collections.bulk_replace(
+            new_values, old_collection, new_collection, initiator=evt
+        )
+
+        self._dispose_previous_collection(state, old, old_collection, True)
+
+    def _dispose_previous_collection(
+        self,
+        state: InstanceState[Any],
+        collection: _AdaptedCollectionProtocol,
+        adapter: CollectionAdapter,
+        fire_event: bool,
+    ) -> None:
+        del collection._sa_adapter
+
+        # discarding old collection make sure it is not referenced in empty
+        # collections.
+        state._empty_collections.pop(self.key, None)
+        if fire_event:
+            self.dispatch.dispose_collection(state, collection, adapter)
+
+    def _invalidate_collection(
+        self, collection: _AdaptedCollectionProtocol
+    ) -> None:
+        adapter = getattr(collection, "_sa_adapter")
+        adapter.invalidated = True
+
+    def set_committed_value(
+        self, state: InstanceState[Any], dict_: _InstanceDict, value: Any
+    ) -> _AdaptedCollectionProtocol:
+        """Set an attribute value on the given instance and 'commit' it."""
+
+        collection, user_data = self._initialize_collection(state)
+
+        if value:
+            collection.append_multiple_without_event(value)
+
+        state.dict[self.key] = user_data
+
+        state._commit(dict_, [self.key])
+
+        if self.key in state._pending_mutations:
+            # pending items exist.  issue a modified event,
+            # add/remove new items.
+            state._modified_event(dict_, self, user_data, True)
+
+            pending = state._pending_mutations.pop(self.key)
+            added = pending.added_items
+            removed = pending.deleted_items
+            for item in added:
+                collection.append_without_event(item)
+            for item in removed:
+                collection.remove_without_event(item)
+
+        return user_data
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Literal[None] = ...,
+        passive: Literal[PassiveFlag.PASSIVE_OFF] = ...,
+    ) -> CollectionAdapter: ...
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: _AdaptedCollectionProtocol = ...,
+        passive: PassiveFlag = ...,
+    ) -> CollectionAdapter: ...
+
+    @overload
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Optional[_AdaptedCollectionProtocol] = ...,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> Union[
+        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+    ]: ...
+
+    def get_collection(
+        self,
+        state: InstanceState[Any],
+        dict_: _InstanceDict,
+        user_data: Optional[_AdaptedCollectionProtocol] = None,
+        passive: PassiveFlag = PASSIVE_OFF,
+    ) -> Union[
+        Literal[LoaderCallableStatus.PASSIVE_NO_RESULT], CollectionAdapter
+    ]:
+        """Retrieve the CollectionAdapter associated with the given state.
+
+        if user_data is None, retrieves it from the state using normal
+        "get()" rules, which will fire lazy callables or return the "empty"
+        collection value.
+
+        """
+        if user_data is None:
+            fetch_user_data = self.get(state, dict_, passive=passive)
+            if fetch_user_data is LoaderCallableStatus.PASSIVE_NO_RESULT:
+                return fetch_user_data
+            else:
+                user_data = cast("_AdaptedCollectionProtocol", fetch_user_data)
+
+        return user_data._sa_adapter
+
+
+def backref_listeners(
+    attribute: QueryableAttribute[Any], key: str, uselist: bool
+) -> None:
+    """Apply listeners to synchronize a two-way relationship."""
+
+    # use easily recognizable names for stack traces.
+
+    # in the sections marked "tokens to test for a recursive loop",
+    # this is somewhat brittle and very performance-sensitive logic
+    # that is specific to how we might arrive at each event.  a marker
+    # that can target us directly to arguments being invoked against
+    # the impl might be simpler, but could interfere with other systems.
+
+    parent_token = attribute.impl.parent_token
+    parent_impl = attribute.impl
+
+    def _acceptable_key_err(child_state, initiator, child_impl):
+        raise ValueError(
+            "Bidirectional attribute conflict detected: "
+            'Passing object %s to attribute "%s" '
+            'triggers a modify event on attribute "%s" '
+            'via the backref "%s".'
+            % (
+                state_str(child_state),
+                initiator.parent_token,
+                child_impl.parent_token,
+                attribute.impl.parent_token,
+            )
+        )
+
+    def emit_backref_from_scalar_set_event(
+        state, child, oldchild, initiator, **kw
+    ):
+        if oldchild is child:
+            return child
+        if (
+            oldchild is not None
+            and oldchild is not PASSIVE_NO_RESULT
+            and oldchild is not NO_VALUE
+        ):
+            # With lazy=None, there's no guarantee that the full collection is
+            # present when updating via a backref.
+            old_state, old_dict = (
+                instance_state(oldchild),
+                instance_dict(oldchild),
+            )
+            impl = old_state.manager[key].impl
+
+            # tokens to test for a recursive loop.
+            if not impl.collection and not impl.dynamic:
+                check_recursive_token = impl._replace_token
+            else:
+                check_recursive_token = impl._remove_token
+
+            if initiator is not check_recursive_token:
+                impl.pop(
+                    old_state,
+                    old_dict,
+                    state.obj(),
+                    parent_impl._append_token,
+                    passive=PASSIVE_NO_FETCH,
+                )
+
+        if child is not None:
+            child_state, child_dict = (
+                instance_state(child),
+                instance_dict(child),
+            )
+            child_impl = child_state.manager[key].impl
+
+            if (
+                initiator.parent_token is not parent_token
+                and initiator.parent_token is not child_impl.parent_token
+            ):
+                _acceptable_key_err(state, initiator, child_impl)
+
+            # tokens to test for a recursive loop.
+            check_append_token = child_impl._append_token
+            check_bulk_replace_token = (
+                child_impl._bulk_replace_token
+                if _is_collection_attribute_impl(child_impl)
+                else None
+            )
+
+            if (
+                initiator is not check_append_token
+                and initiator is not check_bulk_replace_token
+            ):
+                child_impl.append(
+                    child_state,
+                    child_dict,
+                    state.obj(),
+                    initiator,
+                    passive=PASSIVE_NO_FETCH,
+                )
+        return child
+
+    def emit_backref_from_collection_append_event(
+        state, child, initiator, **kw
+    ):
+        if child is None:
+            return
+
+        child_state, child_dict = instance_state(child), instance_dict(child)
+        child_impl = child_state.manager[key].impl
+
+        if (
+            initiator.parent_token is not parent_token
+            and initiator.parent_token is not child_impl.parent_token
+        ):
+            _acceptable_key_err(state, initiator, child_impl)
+
+        # tokens to test for a recursive loop.
+        check_append_token = child_impl._append_token
+        check_bulk_replace_token = (
+            child_impl._bulk_replace_token
+            if _is_collection_attribute_impl(child_impl)
+            else None
+        )
+
+        if (
+            initiator is not check_append_token
+            and initiator is not check_bulk_replace_token
+        ):
+            child_impl.append(
+                child_state,
+                child_dict,
+                state.obj(),
+                initiator,
+                passive=PASSIVE_NO_FETCH,
+            )
+        return child
+
+    def emit_backref_from_collection_remove_event(
+        state, child, initiator, **kw
+    ):
+        if (
+            child is not None
+            and child is not PASSIVE_NO_RESULT
+            and child is not NO_VALUE
+        ):
+            child_state, child_dict = (
+                instance_state(child),
+                instance_dict(child),
+            )
+            child_impl = child_state.manager[key].impl
+
+            check_replace_token: Optional[AttributeEventToken]
+
+            # tokens to test for a recursive loop.
+            if not child_impl.collection and not child_impl.dynamic:
+                check_remove_token = child_impl._remove_token
+                check_replace_token = child_impl._replace_token
+                check_for_dupes_on_remove = uselist and not parent_impl.dynamic
+            else:
+                check_remove_token = child_impl._remove_token
+                check_replace_token = (
+                    child_impl._bulk_replace_token
+                    if _is_collection_attribute_impl(child_impl)
+                    else None
+                )
+                check_for_dupes_on_remove = False
+
+            if (
+                initiator is not check_remove_token
+                and initiator is not check_replace_token
+            ):
+                if not check_for_dupes_on_remove or not util.has_dupes(
+                    # when this event is called, the item is usually
+                    # present in the list, except for a pop() operation.
+                    state.dict[parent_impl.key],
+                    child,
+                ):
+                    child_impl.pop(
+                        child_state,
+                        child_dict,
+                        state.obj(),
+                        initiator,
+                        passive=PASSIVE_NO_FETCH,
+                    )
+
+    if uselist:
+        event.listen(
+            attribute,
+            "append",
+            emit_backref_from_collection_append_event,
+            retval=True,
+            raw=True,
+            include_key=True,
+        )
+    else:
+        event.listen(
+            attribute,
+            "set",
+            emit_backref_from_scalar_set_event,
+            retval=True,
+            raw=True,
+            include_key=True,
+        )
+    # TODO: need coverage in test/orm/ of remove event
+    event.listen(
+        attribute,
+        "remove",
+        emit_backref_from_collection_remove_event,
+        retval=True,
+        raw=True,
+        include_key=True,
+    )
+
+
+_NO_HISTORY = util.symbol("NO_HISTORY")
+_NO_STATE_SYMBOLS = frozenset([id(PASSIVE_NO_RESULT), id(NO_VALUE)])
+
+
+class History(NamedTuple):
+    """A 3-tuple of added, unchanged and deleted values,
+    representing the changes which have occurred on an instrumented
+    attribute.
+
+    The easiest way to get a :class:`.History` object for a particular
+    attribute on an object is to use the :func:`_sa.inspect` function::
+
+        from sqlalchemy import inspect
+
+        hist = inspect(myobject).attrs.myattribute.history
+
+    Each tuple member is an iterable sequence:
+
+    * ``added`` - the collection of items added to the attribute (the first
+      tuple element).
+
+    * ``unchanged`` - the collection of items that have not changed on the
+      attribute (the second tuple element).
+
+    * ``deleted`` - the collection of items that have been removed from the
+      attribute (the third tuple element).
+
+    """
+
+    added: Union[Tuple[()], List[Any]]
+    unchanged: Union[Tuple[()], List[Any]]
+    deleted: Union[Tuple[()], List[Any]]
+
+    def __bool__(self) -> bool:
+        return self != HISTORY_BLANK
+
+    def empty(self) -> bool:
+        """Return True if this :class:`.History` has no changes
+        and no existing, unchanged state.
+
+        """
+
+        return not bool((self.added or self.deleted) or self.unchanged)
+
+    def sum(self) -> Sequence[Any]:
+        """Return a collection of added + unchanged + deleted."""
+
+        return (
+            (self.added or []) + (self.unchanged or []) + (self.deleted or [])
+        )
+
+    def non_deleted(self) -> Sequence[Any]:
+        """Return a collection of added + unchanged."""
+
+        return (self.added or []) + (self.unchanged or [])
+
+    def non_added(self) -> Sequence[Any]:
+        """Return a collection of unchanged + deleted."""
+
+        return (self.unchanged or []) + (self.deleted or [])
+
+    def has_changes(self) -> bool:
+        """Return True if this :class:`.History` has changes."""
+
+        return bool(self.added or self.deleted)
+
+    def _merge(self, added: Iterable[Any], deleted: Iterable[Any]) -> History:
+        return History(
+            list(self.added) + list(added),
+            self.unchanged,
+            list(self.deleted) + list(deleted),
+        )
+
+    def as_state(self) -> History:
+        return History(
+            [
+                (c is not None) and instance_state(c) or None
+                for c in self.added
+            ],
+            [
+                (c is not None) and instance_state(c) or None
+                for c in self.unchanged
+            ],
+            [
+                (c is not None) and instance_state(c) or None
+                for c in self.deleted
+            ],
+        )
+
+    @classmethod
+    def from_scalar_attribute(
+        cls,
+        attribute: ScalarAttributeImpl,
+        state: InstanceState[Any],
+        current: Any,
+    ) -> History:
+        original = state.committed_state.get(attribute.key, _NO_HISTORY)
+
+        deleted: Union[Tuple[()], List[Any]]
+
+        if original is _NO_HISTORY:
+            if current is NO_VALUE:
+                return cls((), (), ())
+            else:
+                return cls((), [current], ())
+        # don't let ClauseElement expressions here trip things up
+        elif (
+            current is not NO_VALUE
+            and attribute.is_equal(current, original) is True
+        ):
+            return cls((), [current], ())
+        else:
+            # current convention on native scalars is to not
+            # include information
+            # about missing previous value in "deleted", but
+            # we do include None, which helps in some primary
+            # key situations
+            if id(original) in _NO_STATE_SYMBOLS:
+                deleted = ()
+                # indicate a "del" operation occurred when we don't have
+                # the previous value as: ([None], (), ())
+                if id(current) in _NO_STATE_SYMBOLS:
+                    current = None
+            else:
+                deleted = [original]
+            if current is NO_VALUE:
+                return cls((), (), deleted)
+            else:
+                return cls([current], (), deleted)
+
+    @classmethod
+    def from_object_attribute(
+        cls,
+        attribute: ScalarObjectAttributeImpl,
+        state: InstanceState[Any],
+        current: Any,
+        original: Any = _NO_HISTORY,
+    ) -> History:
+        deleted: Union[Tuple[()], List[Any]]
+
+        if original is _NO_HISTORY:
+            original = state.committed_state.get(attribute.key, _NO_HISTORY)
+
+        if original is _NO_HISTORY:
+            if current is NO_VALUE:
+                return cls((), (), ())
+            else:
+                return cls((), [current], ())
+        elif current is original and current is not NO_VALUE:
+            return cls((), [current], ())
+        else:
+            # current convention on related objects is to not
+            # include information
+            # about missing previous value in "deleted", and
+            # to also not include None - the dependency.py rules
+            # ignore the None in any case.
+            if id(original) in _NO_STATE_SYMBOLS or original is None:
+                deleted = ()
+                # indicate a "del" operation occurred when we don't have
+                # the previous value as: ([None], (), ())
+                if id(current) in _NO_STATE_SYMBOLS:
+                    current = None
+            else:
+                deleted = [original]
+            if current is NO_VALUE:
+                return cls((), (), deleted)
+            else:
+                return cls([current], (), deleted)
+
+    @classmethod
+    def from_collection(
+        cls,
+        attribute: CollectionAttributeImpl,
+        state: InstanceState[Any],
+        current: Any,
+    ) -> History:
+        original = state.committed_state.get(attribute.key, _NO_HISTORY)
+        if current is NO_VALUE:
+            return cls((), (), ())
+
+        current = getattr(current, "_sa_adapter")
+        if original is NO_VALUE:
+            return cls(list(current), (), ())
+        elif original is _NO_HISTORY:
+            return cls((), list(current), ())
+        else:
+            current_states = [
+                ((c is not None) and instance_state(c) or None, c)
+                for c in current
+            ]
+            original_states = [
+                ((c is not None) and instance_state(c) or None, c)
+                for c in original
+            ]
+
+            current_set = dict(current_states)
+            original_set = dict(original_states)
+
+            return cls(
+                [o for s, o in current_states if s not in original_set],
+                [o for s, o in current_states if s in original_set],
+                [o for s, o in original_states if s not in current_set],
+            )
+
+
+HISTORY_BLANK = History((), (), ())
+
+
+def get_history(
+    obj: object, key: str, passive: PassiveFlag = PASSIVE_OFF
+) -> History:
+    """Return a :class:`.History` record for the given object
+    and attribute key.
+
+    This is the **pre-flush** history for a given attribute, which is
+    reset each time the :class:`.Session` flushes changes to the
+    current database transaction.
+
+    .. note::
+
+        Prefer to use the :attr:`.AttributeState.history` and
+        :meth:`.AttributeState.load_history` accessors to retrieve the
+        :class:`.History` for instance attributes.
+
+
+    :param obj: an object whose class is instrumented by the
+      attributes package.
+
+    :param key: string attribute name.
+
+    :param passive: indicates loading behavior for the attribute
+       if the value is not already present.   This is a
+       bitflag attribute, which defaults to the symbol
+       :attr:`.PASSIVE_OFF` indicating all necessary SQL
+       should be emitted.
+
+    .. seealso::
+
+        :attr:`.AttributeState.history`
+
+        :meth:`.AttributeState.load_history` - retrieve history
+        using loader callables if the value is not locally present.
+
+    """
+
+    return get_state_history(instance_state(obj), key, passive)
+
+
+def get_state_history(
+    state: InstanceState[Any], key: str, passive: PassiveFlag = PASSIVE_OFF
+) -> History:
+    return state.get_history(key, passive)
+
+
+def has_parent(
+    cls: Type[_O], obj: _O, key: str, optimistic: bool = False
+) -> bool:
+    """TODO"""
+    manager = manager_of_class(cls)
+    state = instance_state(obj)
+    return manager.has_parent(state, key, optimistic)
+
+
+def register_attribute(
+    class_: Type[_O],
+    key: str,
+    *,
+    comparator: interfaces.PropComparator[_T],
+    parententity: _InternalEntityType[_O],
+    doc: Optional[str] = None,
+    **kw: Any,
+) -> InstrumentedAttribute[_T]:
+    desc = register_descriptor(
+        class_, key, comparator=comparator, parententity=parententity, doc=doc
+    )
+    register_attribute_impl(class_, key, **kw)
+    return desc
+
+
+def register_attribute_impl(
+    class_: Type[_O],
+    key: str,
+    uselist: bool = False,
+    callable_: Optional[_LoaderCallable] = None,
+    useobject: bool = False,
+    impl_class: Optional[Type[AttributeImpl]] = None,
+    backref: Optional[str] = None,
+    **kw: Any,
+) -> QueryableAttribute[Any]:
+    manager = manager_of_class(class_)
+    if uselist:
+        factory = kw.pop("typecallable", None)
+        typecallable = manager.instrument_collection_class(
+            key, factory or list
+        )
+    else:
+        typecallable = kw.pop("typecallable", None)
+
+    dispatch = cast(
+        "_Dispatch[QueryableAttribute[Any]]", manager[key].dispatch
+    )  # noqa: E501
+
+    impl: AttributeImpl
+
+    if impl_class:
+        # TODO: this appears to be the WriteOnlyAttributeImpl /
+        # DynamicAttributeImpl constructor which is hardcoded
+        impl = cast("Type[WriteOnlyAttributeImpl]", impl_class)(
+            class_, key, dispatch, **kw
+        )
+    elif uselist:
+        impl = CollectionAttributeImpl(
+            class_, key, callable_, dispatch, typecallable=typecallable, **kw
+        )
+    elif useobject:
+        impl = ScalarObjectAttributeImpl(
+            class_, key, callable_, dispatch, **kw
+        )
+    else:
+        impl = ScalarAttributeImpl(class_, key, callable_, dispatch, **kw)
+
+    manager[key].impl = impl
+
+    if backref:
+        backref_listeners(manager[key], backref, uselist)
+
+    manager.post_configure_attribute(key)
+    return manager[key]
+
+
+def register_descriptor(
+    class_: Type[Any],
+    key: str,
+    *,
+    comparator: interfaces.PropComparator[_T],
+    parententity: _InternalEntityType[Any],
+    doc: Optional[str] = None,
+) -> InstrumentedAttribute[_T]:
+    manager = manager_of_class(class_)
+
+    descriptor = InstrumentedAttribute(
+        class_, key, comparator=comparator, parententity=parententity
+    )
+
+    descriptor.__doc__ = doc  # type: ignore
+
+    manager.instrument_attribute(key, descriptor)
+    return descriptor
+
+
+def unregister_attribute(class_: Type[Any], key: str) -> None:
+    manager_of_class(class_).uninstrument_attribute(key)
+
+
+def init_collection(obj: object, key: str) -> CollectionAdapter:
+    """Initialize a collection attribute and return the collection adapter.
+
+    This function is used to provide direct access to collection internals
+    for a previously unloaded attribute.  e.g.::
+
+        collection_adapter = init_collection(someobject, "elements")
+        for elem in values:
+            collection_adapter.append_without_event(elem)
+
+    For an easier way to do the above, see
+    :func:`~sqlalchemy.orm.attributes.set_committed_value`.
+
+    :param obj: a mapped object
+
+    :param key: string attribute name where the collection is located.
+
+    """
+    state = instance_state(obj)
+    dict_ = state.dict
+    return init_state_collection(state, dict_, key)
+
+
+def init_state_collection(
+    state: InstanceState[Any], dict_: _InstanceDict, key: str
+) -> CollectionAdapter:
+    """Initialize a collection attribute and return the collection adapter.
+
+    Discards any existing collection which may be there.
+
+    """
+    attr = state.manager[key].impl
+
+    if TYPE_CHECKING:
+        assert isinstance(attr, HasCollectionAdapter)
+
+    old = dict_.pop(key, None)  # discard old collection
+    if old is not None:
+        old_collection = old._sa_adapter
+        attr._dispose_previous_collection(state, old, old_collection, False)
+
+    user_data = attr._default_value(state, dict_)
+    adapter: CollectionAdapter = attr.get_collection(
+        state, dict_, user_data, passive=PassiveFlag.PASSIVE_NO_FETCH
+    )
+    adapter._reset_empty()
+
+    return adapter
+
+
+def set_committed_value(instance, key, value):
+    """Set the value of an attribute with no history events.
+
+    Cancels any previous history present.  The value should be
+    a scalar value for scalar-holding attributes, or
+    an iterable for any collection-holding attribute.
+
+    This is the same underlying method used when a lazy loader
+    fires off and loads additional data from the database.
+    In particular, this method can be used by application code
+    which has loaded additional attributes or collections through
+    separate queries, which can then be attached to an instance
+    as though it were part of its original loaded state.
+
+    """
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    state.manager[key].impl.set_committed_value(state, dict_, value)
+
+
+def set_attribute(
+    instance: object,
+    key: str,
+    value: Any,
+    initiator: Optional[AttributeEventToken] = None,
+) -> None:
+    """Set the value of an attribute, firing history events.
+
+    This function may be used regardless of instrumentation
+    applied directly to the class, i.e. no descriptors are required.
+    Custom attribute management schemes will need to make usage
+    of this method to establish attribute state as understood
+    by SQLAlchemy.
+
+    :param instance: the object that will be modified
+
+    :param key: string name of the attribute
+
+    :param value: value to assign
+
+    :param initiator: an instance of :class:`.Event` that would have
+     been propagated from a previous event listener.  This argument
+     is used when the :func:`.set_attribute` function is being used within
+     an existing event listening function where an :class:`.Event` object
+     is being supplied; the object may be used to track the origin of the
+     chain of events.
+
+     .. versionadded:: 1.2.3
+
+    """
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    state.manager[key].impl.set(state, dict_, value, initiator)
+
+
+def get_attribute(instance: object, key: str) -> Any:
+    """Get the value of an attribute, firing any callables required.
+
+    This function may be used regardless of instrumentation
+    applied directly to the class, i.e. no descriptors are required.
+    Custom attribute management schemes will need to make usage
+    of this method to make usage of attribute state as understood
+    by SQLAlchemy.
+
+    """
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    return state.manager[key].impl.get(state, dict_)
+
+
+def del_attribute(instance: object, key: str) -> None:
+    """Delete the value of an attribute, firing history events.
+
+    This function may be used regardless of instrumentation
+    applied directly to the class, i.e. no descriptors are required.
+    Custom attribute management schemes will need to make usage
+    of this method to establish attribute state as understood
+    by SQLAlchemy.
+
+    """
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    state.manager[key].impl.delete(state, dict_)
+
+
+def flag_modified(instance: object, key: str) -> None:
+    """Mark an attribute on an instance as 'modified'.
+
+    This sets the 'modified' flag on the instance and
+    establishes an unconditional change event for the given attribute.
+    The attribute must have a value present, else an
+    :class:`.InvalidRequestError` is raised.
+
+    To mark an object "dirty" without referring to any specific attribute
+    so that it is considered within a flush, use the
+    :func:`.attributes.flag_dirty` call.
+
+    .. seealso::
+
+        :func:`.attributes.flag_dirty`
+
+    """
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    impl = state.manager[key].impl
+    impl.dispatch.modified(state, impl._modified_token)
+    state._modified_event(dict_, impl, NO_VALUE, is_userland=True)
+
+
+def flag_dirty(instance: object) -> None:
+    """Mark an instance as 'dirty' without any specific attribute mentioned.
+
+    This is a special operation that will allow the object to travel through
+    the flush process for interception by events such as
+    :meth:`.SessionEvents.before_flush`.   Note that no SQL will be emitted in
+    the flush process for an object that has no changes, even if marked dirty
+    via this method.  However, a :meth:`.SessionEvents.before_flush` handler
+    will be able to see the object in the :attr:`.Session.dirty` collection and
+    may establish changes on it, which will then be included in the SQL
+    emitted.
+
+    .. versionadded:: 1.2
+
+    .. seealso::
+
+        :func:`.attributes.flag_modified`
+
+    """
+
+    state, dict_ = instance_state(instance), instance_dict(instance)
+    state._modified_event(dict_, None, NO_VALUE, is_userland=True)