about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py1682
1 files changed, 1682 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py
new file mode 100644
index 00000000..679286f5
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/orm/loading.py
@@ -0,0 +1,1682 @@
+# orm/loading.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+"""private module containing functions used to convert database
+rows into object instances and associated state.
+
+the functions here are called primarily by Query, Mapper,
+as well as some of the attribute loading strategies.
+
+"""
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Mapping
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from . import attributes
+from . import exc as orm_exc
+from . import path_registry
+from .base import _DEFER_FOR_STATE
+from .base import _RAISE_FOR_STATE
+from .base import _SET_DEFERRED_EXPIRED
+from .base import PassiveFlag
+from .context import FromStatement
+from .context import ORMCompileState
+from .context import QueryContext
+from .util import _none_set
+from .util import state_str
+from .. import exc as sa_exc
+from .. import util
+from ..engine import result_tuple
+from ..engine.result import ChunkedIteratorResult
+from ..engine.result import FrozenResult
+from ..engine.result import SimpleResultMetaData
+from ..sql import select
+from ..sql import util as sql_util
+from ..sql.selectable import ForUpdateArg
+from ..sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
+from ..sql.selectable import SelectState
+from ..util import EMPTY_DICT
+
+if TYPE_CHECKING:
+    from ._typing import _IdentityKeyType
+    from .base import LoaderCallableStatus
+    from .interfaces import ORMOption
+    from .mapper import Mapper
+    from .query import Query
+    from .session import Session
+    from .state import InstanceState
+    from ..engine.cursor import CursorResult
+    from ..engine.interfaces import _ExecuteOptions
+    from ..engine.result import Result
+    from ..sql import Select
+
+_T = TypeVar("_T", bound=Any)
+_O = TypeVar("_O", bound=object)
+_new_runid = util.counter()
+
+
+_PopulatorDict = Dict[str, List[Tuple[str, Any]]]
+
+
+def instances(cursor: CursorResult[Any], context: QueryContext) -> Result[Any]:
+    """Return a :class:`.Result` given an ORM query context.
+
+    :param cursor: a :class:`.CursorResult`, generated by a statement
+     which came from :class:`.ORMCompileState`
+
+    :param context: a :class:`.QueryContext` object
+
+    :return: a :class:`.Result` object representing ORM results
+
+    .. versionchanged:: 1.4 The instances() function now uses
+       :class:`.Result` objects and has an all new interface.
+
+    """
+
+    context.runid = _new_runid()
+
+    if context.top_level_context:
+        is_top_level = False
+        context.post_load_paths = context.top_level_context.post_load_paths
+    else:
+        is_top_level = True
+        context.post_load_paths = {}
+
+    compile_state = context.compile_state
+    filtered = compile_state._has_mapper_entities
+    single_entity = (
+        not context.load_options._only_return_tuples
+        and len(compile_state._entities) == 1
+        and compile_state._entities[0].supports_single_entity
+    )
+
+    try:
+        (process, labels, extra) = list(
+            zip(
+                *[
+                    query_entity.row_processor(context, cursor)
+                    for query_entity in context.compile_state._entities
+                ]
+            )
+        )
+
+        if context.yield_per and (
+            context.loaders_require_buffering
+            or context.loaders_require_uniquing
+        ):
+            raise sa_exc.InvalidRequestError(
+                "Can't use yield_per with eager loaders that require uniquing "
+                "or row buffering, e.g. joinedload() against collections "
+                "or subqueryload().  Consider the selectinload() strategy "
+                "for better flexibility in loading objects."
+            )
+
+    except Exception:
+        with util.safe_reraise():
+            cursor.close()
+
+    def _no_unique(entry):
+        raise sa_exc.InvalidRequestError(
+            "Can't use the ORM yield_per feature in conjunction with unique()"
+        )
+
+    def _not_hashable(datatype, *, legacy=False, uncertain=False):
+        if not legacy:
+
+            def go(obj):
+                if uncertain:
+                    try:
+                        return hash(obj)
+                    except:
+                        pass
+
+                raise sa_exc.InvalidRequestError(
+                    "Can't apply uniqueness to row tuple containing value of "
+                    f"""type {datatype!r}; {
+                        'the values returned appear to be'
+                        if uncertain
+                        else 'this datatype produces'
+                    } non-hashable values"""
+                )
+
+            return go
+        elif not uncertain:
+            return id
+        else:
+            _use_id = False
+
+            def go(obj):
+                nonlocal _use_id
+
+                if not _use_id:
+                    try:
+                        return hash(obj)
+                    except:
+                        pass
+
+                    # in #10459, we considered using a warning here, however
+                    # as legacy query uses result.unique() in all cases, this
+                    # would lead to too many warning cases.
+                    _use_id = True
+
+                return id(obj)
+
+            return go
+
+    unique_filters = [
+        (
+            _no_unique
+            if context.yield_per
+            else (
+                _not_hashable(
+                    ent.column.type,  # type: ignore
+                    legacy=context.load_options._legacy_uniquing,
+                    uncertain=ent._null_column_type,
+                )
+                if (
+                    not ent.use_id_for_hash
+                    and (ent._non_hashable_value or ent._null_column_type)
+                )
+                else id if ent.use_id_for_hash else None
+            )
+        )
+        for ent in context.compile_state._entities
+    ]
+
+    row_metadata = SimpleResultMetaData(
+        labels, extra, _unique_filters=unique_filters
+    )
+
+    def chunks(size):  # type: ignore
+        while True:
+            yield_per = size
+
+            context.partials = {}
+
+            if yield_per:
+                fetch = cursor.fetchmany(yield_per)
+
+                if not fetch:
+                    break
+            else:
+                fetch = cursor._raw_all_rows()
+
+            if single_entity:
+                proc = process[0]
+                rows = [proc(row) for row in fetch]
+            else:
+                rows = [
+                    tuple([proc(row) for proc in process]) for row in fetch
+                ]
+
+            # if we are the originating load from a query, meaning we
+            # aren't being called as a result of a nested "post load",
+            # iterate through all the collected post loaders and fire them
+            # off.  Previously this used to work recursively, however that
+            # prevented deeply nested structures from being loadable
+            if is_top_level:
+                if yield_per:
+                    # if using yield per, memoize the state of the
+                    # collection so that it can be restored
+                    top_level_post_loads = list(
+                        context.post_load_paths.items()
+                    )
+
+                while context.post_load_paths:
+                    post_loads = list(context.post_load_paths.items())
+                    context.post_load_paths.clear()
+                    for path, post_load in post_loads:
+                        post_load.invoke(context, path)
+
+                if yield_per:
+                    context.post_load_paths.clear()
+                    context.post_load_paths.update(top_level_post_loads)
+
+            yield rows
+
+            if not yield_per:
+                break
+
+    if context.execution_options.get("prebuffer_rows", False):
+        # this is a bit of a hack at the moment.
+        # I would rather have some option in the result to pre-buffer
+        # internally.
+        _prebuffered = list(chunks(None))
+
+        def chunks(size):
+            return iter(_prebuffered)
+
+    result = ChunkedIteratorResult(
+        row_metadata,
+        chunks,
+        source_supports_scalars=single_entity,
+        raw=cursor,
+        dynamic_yield_per=cursor.context._is_server_side,
+    )
+
+    # filtered and single_entity are used to indicate to legacy Query that the
+    # query has ORM entities, so legacy deduping and scalars should be called
+    # on the result.
+    result._attributes = result._attributes.union(
+        dict(filtered=filtered, is_single_entity=single_entity)
+    )
+
+    # multi_row_eager_loaders OTOH is specific to joinedload.
+    if context.compile_state.multi_row_eager_loaders:
+
+        def require_unique(obj):
+            raise sa_exc.InvalidRequestError(
+                "The unique() method must be invoked on this Result, "
+                "as it contains results that include joined eager loads "
+                "against collections"
+            )
+
+        result._unique_filter_state = (None, require_unique)
+
+    if context.yield_per:
+        result.yield_per(context.yield_per)
+
+    return result
+
+
+@util.preload_module("sqlalchemy.orm.context")
+def merge_frozen_result(session, statement, frozen_result, load=True):
+    """Merge a :class:`_engine.FrozenResult` back into a :class:`_orm.Session`,
+    returning a new :class:`_engine.Result` object with :term:`persistent`
+    objects.
+
+    See the section :ref:`do_orm_execute_re_executing` for an example.
+
+    .. seealso::
+
+        :ref:`do_orm_execute_re_executing`
+
+        :meth:`_engine.Result.freeze`
+
+        :class:`_engine.FrozenResult`
+
+    """
+    querycontext = util.preloaded.orm_context
+
+    if load:
+        # flush current contents if we expect to load data
+        session._autoflush()
+
+    ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+        statement, legacy=False
+    )
+
+    autoflush = session.autoflush
+    try:
+        session.autoflush = False
+        mapped_entities = [
+            i
+            for i, e in enumerate(ctx._entities)
+            if isinstance(e, querycontext._MapperEntity)
+        ]
+        keys = [ent._label_name for ent in ctx._entities]
+
+        keyed_tuple = result_tuple(
+            keys, [ent._extra_entities for ent in ctx._entities]
+        )
+
+        result = []
+        for newrow in frozen_result.rewrite_rows():
+            for i in mapped_entities:
+                if newrow[i] is not None:
+                    newrow[i] = session._merge(
+                        attributes.instance_state(newrow[i]),
+                        attributes.instance_dict(newrow[i]),
+                        load=load,
+                        _recursive={},
+                        _resolve_conflict_map={},
+                    )
+
+            result.append(keyed_tuple(newrow))
+
+        return frozen_result.with_new_rows(result)
+    finally:
+        session.autoflush = autoflush
+
+
+@util.became_legacy_20(
+    ":func:`_orm.merge_result`",
+    alternative="The function as well as the method on :class:`_orm.Query` "
+    "is superseded by the :func:`_orm.merge_frozen_result` function.",
+)
+@util.preload_module("sqlalchemy.orm.context")
+def merge_result(
+    query: Query[Any],
+    iterator: Union[FrozenResult, Iterable[Sequence[Any]], Iterable[object]],
+    load: bool = True,
+) -> Union[FrozenResult, Iterable[Any]]:
+    """Merge a result into the given :class:`.Query` object's Session.
+
+    See :meth:`_orm.Query.merge_result` for top-level documentation on this
+    function.
+
+    """
+
+    querycontext = util.preloaded.orm_context
+
+    session = query.session
+    if load:
+        # flush current contents if we expect to load data
+        session._autoflush()
+
+    # TODO: need test coverage and documentation for the FrozenResult
+    # use case.
+    if isinstance(iterator, FrozenResult):
+        frozen_result = iterator
+        iterator = iter(frozen_result.data)
+    else:
+        frozen_result = None
+
+    ctx = querycontext.ORMSelectCompileState._create_entities_collection(
+        query, legacy=True
+    )
+
+    autoflush = session.autoflush
+    try:
+        session.autoflush = False
+        single_entity = not frozen_result and len(ctx._entities) == 1
+
+        if single_entity:
+            if isinstance(ctx._entities[0], querycontext._MapperEntity):
+                result = [
+                    session._merge(
+                        attributes.instance_state(instance),
+                        attributes.instance_dict(instance),
+                        load=load,
+                        _recursive={},
+                        _resolve_conflict_map={},
+                    )
+                    for instance in iterator
+                ]
+            else:
+                result = list(iterator)
+        else:
+            mapped_entities = [
+                i
+                for i, e in enumerate(ctx._entities)
+                if isinstance(e, querycontext._MapperEntity)
+            ]
+            result = []
+            keys = [ent._label_name for ent in ctx._entities]
+
+            keyed_tuple = result_tuple(
+                keys, [ent._extra_entities for ent in ctx._entities]
+            )
+
+            for row in iterator:
+                newrow = list(row)
+                for i in mapped_entities:
+                    if newrow[i] is not None:
+                        newrow[i] = session._merge(
+                            attributes.instance_state(newrow[i]),
+                            attributes.instance_dict(newrow[i]),
+                            load=load,
+                            _recursive={},
+                            _resolve_conflict_map={},
+                        )
+                result.append(keyed_tuple(newrow))
+
+        if frozen_result:
+            return frozen_result.with_new_rows(result)
+        else:
+            return iter(result)
+    finally:
+        session.autoflush = autoflush
+
+
+def get_from_identity(
+    session: Session,
+    mapper: Mapper[_O],
+    key: _IdentityKeyType[_O],
+    passive: PassiveFlag,
+) -> Union[LoaderCallableStatus, Optional[_O]]:
+    """Look up the given key in the given session's identity map,
+    check the object for expired state if found.
+
+    """
+    instance = session.identity_map.get(key)
+    if instance is not None:
+        state = attributes.instance_state(instance)
+
+        if mapper.inherits and not state.mapper.isa(mapper):
+            return attributes.PASSIVE_CLASS_MISMATCH
+
+        # expired - ensure it still exists
+        if state.expired:
+            if not passive & attributes.SQL_OK:
+                # TODO: no coverage here
+                return attributes.PASSIVE_NO_RESULT
+            elif not passive & attributes.RELATED_OBJECT_OK:
+                # this mode is used within a flush and the instance's
+                # expired state will be checked soon enough, if necessary.
+                # also used by immediateloader for a mutually-dependent
+                # o2m->m2m load, :ticket:`6301`
+                return instance
+            try:
+                state._load_expired(state, passive)
+            except orm_exc.ObjectDeletedError:
+                session._remove_newly_deleted([state])
+                return None
+        return instance
+    else:
+        return None
+
+
+def load_on_ident(
+    session: Session,
+    statement: Union[Select, FromStatement],
+    key: Optional[_IdentityKeyType],
+    *,
+    load_options: Optional[Sequence[ORMOption]] = None,
+    refresh_state: Optional[InstanceState[Any]] = None,
+    with_for_update: Optional[ForUpdateArg] = None,
+    only_load_props: Optional[Iterable[str]] = None,
+    no_autoflush: bool = False,
+    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
+    execution_options: _ExecuteOptions = util.EMPTY_DICT,
+    require_pk_cols: bool = False,
+    is_user_refresh: bool = False,
+):
+    """Load the given identity key from the database."""
+    if key is not None:
+        ident = key[1]
+        identity_token = key[2]
+    else:
+        ident = identity_token = None
+
+    return load_on_pk_identity(
+        session,
+        statement,
+        ident,
+        load_options=load_options,
+        refresh_state=refresh_state,
+        with_for_update=with_for_update,
+        only_load_props=only_load_props,
+        identity_token=identity_token,
+        no_autoflush=no_autoflush,
+        bind_arguments=bind_arguments,
+        execution_options=execution_options,
+        require_pk_cols=require_pk_cols,
+        is_user_refresh=is_user_refresh,
+    )
+
+
+def load_on_pk_identity(
+    session: Session,
+    statement: Union[Select, FromStatement],
+    primary_key_identity: Optional[Tuple[Any, ...]],
+    *,
+    load_options: Optional[Sequence[ORMOption]] = None,
+    refresh_state: Optional[InstanceState[Any]] = None,
+    with_for_update: Optional[ForUpdateArg] = None,
+    only_load_props: Optional[Iterable[str]] = None,
+    identity_token: Optional[Any] = None,
+    no_autoflush: bool = False,
+    bind_arguments: Mapping[str, Any] = util.EMPTY_DICT,
+    execution_options: _ExecuteOptions = util.EMPTY_DICT,
+    require_pk_cols: bool = False,
+    is_user_refresh: bool = False,
+):
+    """Load the given primary key identity from the database."""
+
+    query = statement
+    q = query._clone()
+
+    assert not q._is_lambda_element
+
+    if load_options is None:
+        load_options = QueryContext.default_load_options
+
+    if (
+        statement._compile_options
+        is SelectState.default_select_compile_options
+    ):
+        compile_options = ORMCompileState.default_compile_options
+    else:
+        compile_options = statement._compile_options
+
+    if primary_key_identity is not None:
+        mapper = query._propagate_attrs["plugin_subject"]
+
+        (_get_clause, _get_params) = mapper._get_clause
+
+        # None present in ident - turn those comparisons
+        # into "IS NULL"
+        if None in primary_key_identity:
+            nones = {
+                _get_params[col].key
+                for col, value in zip(mapper.primary_key, primary_key_identity)
+                if value is None
+            }
+
+            _get_clause = sql_util.adapt_criterion_to_null(_get_clause, nones)
+
+            if len(nones) == len(primary_key_identity):
+                util.warn(
+                    "fully NULL primary key identity cannot load any "
+                    "object.  This condition may raise an error in a future "
+                    "release."
+                )
+
+        q._where_criteria = (
+            sql_util._deep_annotate(_get_clause, {"_orm_adapt": True}),
+        )
+
+        params = {
+            _get_params[primary_key].key: id_val
+            for id_val, primary_key in zip(
+                primary_key_identity, mapper.primary_key
+            )
+        }
+    else:
+        params = None
+
+    if with_for_update is not None:
+        version_check = True
+        q._for_update_arg = with_for_update
+    elif query._for_update_arg is not None:
+        version_check = True
+        q._for_update_arg = query._for_update_arg
+    else:
+        version_check = False
+
+    if require_pk_cols and only_load_props:
+        if not refresh_state:
+            raise sa_exc.ArgumentError(
+                "refresh_state is required when require_pk_cols is present"
+            )
+
+        refresh_state_prokeys = refresh_state.mapper._primary_key_propkeys
+        has_changes = {
+            key
+            for key in refresh_state_prokeys.difference(only_load_props)
+            if refresh_state.attrs[key].history.has_changes()
+        }
+        if has_changes:
+            # raise if pending pk changes are present.
+            # technically, this could be limited to the case where we have
+            # relationships in the only_load_props collection to be refreshed
+            # also (and only ones that have a secondary eager loader, at that).
+            # however, the error is in place across the board so that behavior
+            # here is easier to predict.   The use case it prevents is one
+            # of mutating PK attrs, leaving them unflushed,
+            # calling session.refresh(), and expecting those attrs to remain
+            # still unflushed.   It seems likely someone doing all those
+            # things would be better off having the PK attributes flushed
+            # to the database before tinkering like that (session.refresh() is
+            # tinkering).
+            raise sa_exc.InvalidRequestError(
+                f"Please flush pending primary key changes on "
+                "attributes "
+                f"{has_changes} for mapper {refresh_state.mapper} before "
+                "proceeding with a refresh"
+            )
+
+        # overall, the ORM has no internal flow right now for "dont load the
+        # primary row of an object at all, but fire off
+        # selectinload/subqueryload/immediateload for some relationships".
+        # It would probably be a pretty big effort to add such a flow.  So
+        # here, the case for #8703 is introduced; user asks to refresh some
+        # relationship attributes only which are
+        # selectinload/subqueryload/immediateload/ etc. (not joinedload).
+        # ORM complains there's no columns in the primary row to load.
+        # So here, we just add the PK cols if that
+        # case is detected, so that there is a SELECT emitted for the primary
+        # row.
+        #
+        # Let's just state right up front, for this one little case,
+        # the ORM here is adding a whole extra SELECT just to satisfy
+        # limitations in the internal flow.  This is really not a thing
+        # SQLAlchemy finds itself doing like, ever, obviously, we are
+        # constantly working to *remove* SELECTs we don't need.   We
+        # rationalize this for now based on 1. session.refresh() is not
+        # commonly used 2. session.refresh() with only relationship attrs is
+        # even less commonly used 3. the SELECT in question is very low
+        # latency.
+        #
+        # to add the flow to not include the SELECT, the quickest way
+        # might be to just manufacture a single-row result set to send off to
+        # instances(), but we'd have to weave that into context.py and all
+        # that.  For 2.0.0, we have enough big changes to navigate for now.
+        #
+        mp = refresh_state.mapper._props
+        for p in only_load_props:
+            if mp[p]._is_relationship:
+                only_load_props = refresh_state_prokeys.union(only_load_props)
+                break
+
+    if refresh_state and refresh_state.load_options:
+        compile_options += {"_current_path": refresh_state.load_path.parent}
+        q = q.options(*refresh_state.load_options)
+
+    new_compile_options, load_options = _set_get_options(
+        compile_options,
+        load_options,
+        version_check=version_check,
+        only_load_props=only_load_props,
+        refresh_state=refresh_state,
+        identity_token=identity_token,
+        is_user_refresh=is_user_refresh,
+    )
+
+    q._compile_options = new_compile_options
+    q._order_by = None
+
+    if no_autoflush:
+        load_options += {"_autoflush": False}
+
+    execution_options = util.EMPTY_DICT.merge_with(
+        execution_options, {"_sa_orm_load_options": load_options}
+    )
+    result = (
+        session.execute(
+            q,
+            params=params,
+            execution_options=execution_options,
+            bind_arguments=bind_arguments,
+        )
+        .unique()
+        .scalars()
+    )
+
+    try:
+        return result.one()
+    except orm_exc.NoResultFound:
+        return None
+
+
+def _set_get_options(
+    compile_opt,
+    load_opt,
+    populate_existing=None,
+    version_check=None,
+    only_load_props=None,
+    refresh_state=None,
+    identity_token=None,
+    is_user_refresh=None,
+):
+    compile_options = {}
+    load_options = {}
+    if version_check:
+        load_options["_version_check"] = version_check
+    if populate_existing:
+        load_options["_populate_existing"] = populate_existing
+    if refresh_state:
+        load_options["_refresh_state"] = refresh_state
+        compile_options["_for_refresh_state"] = True
+    if only_load_props:
+        compile_options["_only_load_props"] = frozenset(only_load_props)
+    if identity_token:
+        load_options["_identity_token"] = identity_token
+
+    if is_user_refresh:
+        load_options["_is_user_refresh"] = is_user_refresh
+    if load_options:
+        load_opt += load_options
+    if compile_options:
+        compile_opt += compile_options
+
+    return compile_opt, load_opt
+
+
+def _setup_entity_query(
+    compile_state,
+    mapper,
+    query_entity,
+    path,
+    adapter,
+    column_collection,
+    with_polymorphic=None,
+    only_load_props=None,
+    polymorphic_discriminator=None,
+    **kw,
+):
+    if with_polymorphic:
+        poly_properties = mapper._iterate_polymorphic_properties(
+            with_polymorphic
+        )
+    else:
+        poly_properties = mapper._polymorphic_properties
+
+    quick_populators = {}
+
+    path.set(compile_state.attributes, "memoized_setups", quick_populators)
+
+    # for the lead entities in the path, e.g. not eager loads, and
+    # assuming a user-passed aliased class, e.g. not a from_self() or any
+    # implicit aliasing, don't add columns to the SELECT that aren't
+    # in the thing that's aliased.
+    check_for_adapt = adapter and len(path) == 1 and path[-1].is_aliased_class
+
+    for value in poly_properties:
+        if only_load_props and value.key not in only_load_props:
+            continue
+        value.setup(
+            compile_state,
+            query_entity,
+            path,
+            adapter,
+            only_load_props=only_load_props,
+            column_collection=column_collection,
+            memoized_populators=quick_populators,
+            check_for_adapt=check_for_adapt,
+            **kw,
+        )
+
+    if (
+        polymorphic_discriminator is not None
+        and polymorphic_discriminator is not mapper.polymorphic_on
+    ):
+        if adapter:
+            pd = adapter.columns[polymorphic_discriminator]
+        else:
+            pd = polymorphic_discriminator
+        column_collection.append(pd)
+
+
+def _warn_for_runid_changed(state):
+    util.warn(
+        "Loading context for %s has changed within a load/refresh "
+        "handler, suggesting a row refresh operation took place. If this "
+        "event handler is expected to be "
+        "emitting row refresh operations within an existing load or refresh "
+        "operation, set restore_load_context=True when establishing the "
+        "listener to ensure the context remains unchanged when the event "
+        "handler completes." % (state_str(state),)
+    )
+
+
+def _instance_processor(
+    query_entity,
+    mapper,
+    context,
+    result,
+    path,
+    adapter,
+    only_load_props=None,
+    refresh_state=None,
+    polymorphic_discriminator=None,
+    _polymorphic_from=None,
+):
+    """Produce a mapper level row processor callable
+    which processes rows into mapped instances."""
+
+    # note that this method, most of which exists in a closure
+    # called _instance(), resists being broken out, as
+    # attempts to do so tend to add significant function
+    # call overhead.  _instance() is the most
+    # performance-critical section in the whole ORM.
+
+    identity_class = mapper._identity_class
+    compile_state = context.compile_state
+
+    # look for "row getter" functions that have been assigned along
+    # with the compile state that were cached from a previous load.
+    # these are operator.itemgetter() objects that each will extract a
+    # particular column from each row.
+
+    getter_key = ("getters", mapper)
+    getters = path.get(compile_state.attributes, getter_key, None)
+
+    if getters is None:
+        # no getters, so go through a list of attributes we are loading for,
+        # and the ones that are column based will have already put information
+        # for us in another collection "memoized_setups", which represents the
+        # output of the LoaderStrategy.setup_query() method.  We can just as
+        # easily call LoaderStrategy.create_row_processor for each, but by
+        # getting it all at once from setup_query we save another method call
+        # per attribute.
+        props = mapper._prop_set
+        if only_load_props is not None:
+            props = props.intersection(
+                mapper._props[k] for k in only_load_props
+            )
+
+        quick_populators = path.get(
+            context.attributes, "memoized_setups", EMPTY_DICT
+        )
+
+        todo = []
+        cached_populators = {
+            "new": [],
+            "quick": [],
+            "deferred": [],
+            "expire": [],
+            "existing": [],
+            "eager": [],
+        }
+
+        if refresh_state is None:
+            # we can also get the "primary key" tuple getter function
+            pk_cols = mapper.primary_key
+
+            if adapter:
+                pk_cols = [adapter.columns[c] for c in pk_cols]
+            primary_key_getter = result._tuple_getter(pk_cols)
+        else:
+            primary_key_getter = None
+
+        getters = {
+            "cached_populators": cached_populators,
+            "todo": todo,
+            "primary_key_getter": primary_key_getter,
+        }
+        for prop in props:
+            if prop in quick_populators:
+                # this is an inlined path just for column-based attributes.
+                col = quick_populators[prop]
+                if col is _DEFER_FOR_STATE:
+                    cached_populators["new"].append(
+                        (prop.key, prop._deferred_column_loader)
+                    )
+                elif col is _SET_DEFERRED_EXPIRED:
+                    # note that in this path, we are no longer
+                    # searching in the result to see if the column might
+                    # be present in some unexpected way.
+                    cached_populators["expire"].append((prop.key, False))
+                elif col is _RAISE_FOR_STATE:
+                    cached_populators["new"].append(
+                        (prop.key, prop._raise_column_loader)
+                    )
+                else:
+                    getter = None
+                    if adapter:
+                        # this logic had been removed for all 1.4 releases
+                        # up until 1.4.18; the adapter here is particularly
+                        # the compound eager adapter which isn't accommodated
+                        # in the quick_populators right now.  The "fallback"
+                        # logic below instead took over in many more cases
+                        # until issue #6596 was identified.
+
+                        # note there is still an issue where this codepath
+                        # produces no "getter" for cases where a joined-inh
+                        # mapping includes a labeled column property, meaning
+                        # KeyError is caught internally and we fall back to
+                        # _getter(col), which works anyway.   The adapter
+                        # here for joined inh without any aliasing might not
+                        # be useful.  Tests which see this include
+                        # test.orm.inheritance.test_basic ->
+                        # EagerTargetingTest.test_adapt_stringency
+                        # OptimizedLoadTest.test_column_expression_joined
+                        # PolymorphicOnNotLocalTest.test_polymorphic_on_column_prop  # noqa: E501
+                        #
+
+                        adapted_col = adapter.columns[col]
+                        if adapted_col is not None:
+                            getter = result._getter(adapted_col, False)
+                    if not getter:
+                        getter = result._getter(col, False)
+                    if getter:
+                        cached_populators["quick"].append((prop.key, getter))
+                    else:
+                        # fall back to the ColumnProperty itself, which
+                        # will iterate through all of its columns
+                        # to see if one fits
+                        prop.create_row_processor(
+                            context,
+                            query_entity,
+                            path,
+                            mapper,
+                            result,
+                            adapter,
+                            cached_populators,
+                        )
+            else:
+                # loader strategies like subqueryload, selectinload,
+                # joinedload, basically relationships, these need to interact
+                # with the context each time to work correctly.
+                todo.append(prop)
+
+        path.set(compile_state.attributes, getter_key, getters)
+
+    cached_populators = getters["cached_populators"]
+
+    populators = {key: list(value) for key, value in cached_populators.items()}
+    for prop in getters["todo"]:
+        prop.create_row_processor(
+            context, query_entity, path, mapper, result, adapter, populators
+        )
+
+    propagated_loader_options = context.propagated_loader_options
+    load_path = (
+        context.compile_state.current_path + path
+        if context.compile_state.current_path.path
+        else path
+    )
+
+    session_identity_map = context.session.identity_map
+
+    populate_existing = context.populate_existing or mapper.always_refresh
+    load_evt = bool(mapper.class_manager.dispatch.load)
+    refresh_evt = bool(mapper.class_manager.dispatch.refresh)
+    persistent_evt = bool(context.session.dispatch.loaded_as_persistent)
+    if persistent_evt:
+        loaded_as_persistent = context.session.dispatch.loaded_as_persistent
+    instance_state = attributes.instance_state
+    instance_dict = attributes.instance_dict
+    session_id = context.session.hash_key
+    runid = context.runid
+    identity_token = context.identity_token
+
+    version_check = context.version_check
+    if version_check:
+        version_id_col = mapper.version_id_col
+        if version_id_col is not None:
+            if adapter:
+                version_id_col = adapter.columns[version_id_col]
+            version_id_getter = result._getter(version_id_col)
+        else:
+            version_id_getter = None
+
+    if not refresh_state and _polymorphic_from is not None:
+        key = ("loader", path.path)
+
+        if key in context.attributes and context.attributes[key].strategy == (
+            ("selectinload_polymorphic", True),
+        ):
+            option_entities = context.attributes[key].local_opts["entities"]
+        else:
+            option_entities = None
+        selectin_load_via = mapper._should_selectin_load(
+            option_entities,
+            _polymorphic_from,
+        )
+
+        if selectin_load_via and selectin_load_via is not _polymorphic_from:
+            # only_load_props goes w/ refresh_state only, and in a refresh
+            # we are a single row query for the exact entity; polymorphic
+            # loading does not apply
+            assert only_load_props is None
+
+            if selectin_load_via.is_mapper:
+                _load_supers = []
+                _endmost_mapper = selectin_load_via
+                while (
+                    _endmost_mapper
+                    and _endmost_mapper is not _polymorphic_from
+                ):
+                    _load_supers.append(_endmost_mapper)
+                    _endmost_mapper = _endmost_mapper.inherits
+            else:
+                _load_supers = [selectin_load_via]
+
+            for _selectinload_entity in _load_supers:
+                if PostLoad.path_exists(
+                    context, load_path, _selectinload_entity
+                ):
+                    continue
+                callable_ = _load_subclass_via_in(
+                    context,
+                    path,
+                    _selectinload_entity,
+                    _polymorphic_from,
+                    option_entities,
+                )
+                PostLoad.callable_for_path(
+                    context,
+                    load_path,
+                    _selectinload_entity.mapper,
+                    _selectinload_entity,
+                    callable_,
+                    _selectinload_entity,
+                )
+
+    post_load = PostLoad.for_context(context, load_path, only_load_props)
+
+    if refresh_state:
+        refresh_identity_key = refresh_state.key
+        if refresh_identity_key is None:
+            # super-rare condition; a refresh is being called
+            # on a non-instance-key instance; this is meant to only
+            # occur within a flush()
+            refresh_identity_key = mapper._identity_key_from_state(
+                refresh_state
+            )
+    else:
+        refresh_identity_key = None
+
+        primary_key_getter = getters["primary_key_getter"]
+
+    if mapper.allow_partial_pks:
+        is_not_primary_key = _none_set.issuperset
+    else:
+        is_not_primary_key = _none_set.intersection
+
+    def _instance(row):
+        # determine the state that we'll be populating
+        if refresh_identity_key:
+            # fixed state that we're refreshing
+            state = refresh_state
+            instance = state.obj()
+            dict_ = instance_dict(instance)
+            isnew = state.runid != runid
+            currentload = True
+            loaded_instance = False
+        else:
+            # look at the row, see if that identity is in the
+            # session, or we have to create a new one
+            identitykey = (
+                identity_class,
+                primary_key_getter(row),
+                identity_token,
+            )
+
+            instance = session_identity_map.get(identitykey)
+
+            if instance is not None:
+                # existing instance
+                state = instance_state(instance)
+                dict_ = instance_dict(instance)
+
+                isnew = state.runid != runid
+                currentload = not isnew
+                loaded_instance = False
+
+                if version_check and version_id_getter and not currentload:
+                    _validate_version_id(
+                        mapper, state, dict_, row, version_id_getter
+                    )
+
+            else:
+                # create a new instance
+
+                # check for non-NULL values in the primary key columns,
+                # else no entity is returned for the row
+                if is_not_primary_key(identitykey[1]):
+                    return None
+
+                isnew = True
+                currentload = True
+                loaded_instance = True
+
+                instance = mapper.class_manager.new_instance()
+
+                dict_ = instance_dict(instance)
+                state = instance_state(instance)
+                state.key = identitykey
+                state.identity_token = identity_token
+
+                # attach instance to session.
+                state.session_id = session_id
+                session_identity_map._add_unpresent(state, identitykey)
+
+        effective_populate_existing = populate_existing
+        if refresh_state is state:
+            effective_populate_existing = True
+
+        # populate.  this looks at whether this state is new
+        # for this load or was existing, and whether or not this
+        # row is the first row with this identity.
+        if currentload or effective_populate_existing:
+            # full population routines.  Objects here are either
+            # just created, or we are doing a populate_existing
+
+            # be conservative about setting load_path when populate_existing
+            # is in effect; want to maintain options from the original
+            # load.  see test_expire->test_refresh_maintains_deferred_options
+            if isnew and (
+                propagated_loader_options or not effective_populate_existing
+            ):
+                state.load_options = propagated_loader_options
+                state.load_path = load_path
+
+            _populate_full(
+                context,
+                row,
+                state,
+                dict_,
+                isnew,
+                load_path,
+                loaded_instance,
+                effective_populate_existing,
+                populators,
+            )
+
+            if isnew:
+                # state.runid should be equal to context.runid / runid
+                # here, however for event checks we are being more conservative
+                # and checking against existing run id
+                # assert state.runid == runid
+
+                existing_runid = state.runid
+
+                if loaded_instance:
+                    if load_evt:
+                        state.manager.dispatch.load(state, context)
+                        if state.runid != existing_runid:
+                            _warn_for_runid_changed(state)
+                    if persistent_evt:
+                        loaded_as_persistent(context.session, state)
+                        if state.runid != existing_runid:
+                            _warn_for_runid_changed(state)
+                elif refresh_evt:
+                    state.manager.dispatch.refresh(
+                        state, context, only_load_props
+                    )
+                    if state.runid != runid:
+                        _warn_for_runid_changed(state)
+
+                if effective_populate_existing or state.modified:
+                    if refresh_state and only_load_props:
+                        state._commit(dict_, only_load_props)
+                    else:
+                        state._commit_all(dict_, session_identity_map)
+
+            if post_load:
+                post_load.add_state(state, True)
+
+        else:
+            # partial population routines, for objects that were already
+            # in the Session, but a row matches them; apply eager loaders
+            # on existing objects, etc.
+            unloaded = state.unloaded
+            isnew = state not in context.partials
+
+            if not isnew or unloaded or populators["eager"]:
+                # state is having a partial set of its attributes
+                # refreshed.  Populate those attributes,
+                # and add to the "context.partials" collection.
+
+                to_load = _populate_partial(
+                    context,
+                    row,
+                    state,
+                    dict_,
+                    isnew,
+                    load_path,
+                    unloaded,
+                    populators,
+                )
+
+                if isnew:
+                    if refresh_evt:
+                        existing_runid = state.runid
+                        state.manager.dispatch.refresh(state, context, to_load)
+                        if state.runid != existing_runid:
+                            _warn_for_runid_changed(state)
+
+                    state._commit(dict_, to_load)
+
+            if post_load and context.invoke_all_eagers:
+                post_load.add_state(state, False)
+
+        return instance
+
+    if mapper.polymorphic_map and not _polymorphic_from and not refresh_state:
+        # if we are doing polymorphic, dispatch to a different _instance()
+        # method specific to the subclass mapper
+        def ensure_no_pk(row):
+            identitykey = (
+                identity_class,
+                primary_key_getter(row),
+                identity_token,
+            )
+            if not is_not_primary_key(identitykey[1]):
+                return identitykey
+            else:
+                return None
+
+        _instance = _decorate_polymorphic_switch(
+            _instance,
+            context,
+            query_entity,
+            mapper,
+            result,
+            path,
+            polymorphic_discriminator,
+            adapter,
+            ensure_no_pk,
+        )
+
+    return _instance
+
+
+def _load_subclass_via_in(
+    context, path, entity, polymorphic_from, option_entities
+):
+    mapper = entity.mapper
+
+    # TODO: polymorphic_from seems to be a Mapper in all cases.
+    # this is likely not needed, but as we dont have typing in loading.py
+    # yet, err on the safe side
+    polymorphic_from_mapper = polymorphic_from.mapper
+    not_against_basemost = polymorphic_from_mapper.inherits is not None
+
+    zero_idx = len(mapper.base_mapper.primary_key) == 1
+
+    if entity.is_aliased_class or not_against_basemost:
+        q, enable_opt, disable_opt = mapper._subclass_load_via_in(
+            entity, polymorphic_from
+        )
+    else:
+        q, enable_opt, disable_opt = mapper._subclass_load_via_in_mapper
+
+    def do_load(context, path, states, load_only, effective_entity):
+        if not option_entities:
+            # filter out states for those that would have selectinloaded
+            # from another loader
+            # TODO: we are currently ignoring the case where the
+            # "selectin_polymorphic" option is used, as this is much more
+            # complex / specific / very uncommon API use
+            states = [
+                (s, v)
+                for s, v in states
+                if s.mapper._would_selectin_load_only_from_given_mapper(mapper)
+            ]
+
+            if not states:
+                return
+
+        orig_query = context.query
+
+        if path.parent:
+            enable_opt_lcl = enable_opt._prepend_path(path)
+            disable_opt_lcl = disable_opt._prepend_path(path)
+        else:
+            enable_opt_lcl = enable_opt
+            disable_opt_lcl = disable_opt
+        options = (
+            (enable_opt_lcl,) + orig_query._with_options + (disable_opt_lcl,)
+        )
+
+        q2 = q.options(*options)
+
+        q2._compile_options = context.compile_state.default_compile_options
+        q2._compile_options += {"_current_path": path.parent}
+
+        if context.populate_existing:
+            q2 = q2.execution_options(populate_existing=True)
+
+        context.session.execute(
+            q2,
+            dict(
+                primary_keys=[
+                    state.key[1][0] if zero_idx else state.key[1]
+                    for state, load_attrs in states
+                ]
+            ),
+        ).unique().scalars().all()
+
+    return do_load
+
+
+def _populate_full(
+    context,
+    row,
+    state,
+    dict_,
+    isnew,
+    load_path,
+    loaded_instance,
+    populate_existing,
+    populators,
+):
+    if isnew:
+        # first time we are seeing a row with this identity.
+        state.runid = context.runid
+
+        for key, getter in populators["quick"]:
+            dict_[key] = getter(row)
+        if populate_existing:
+            for key, set_callable in populators["expire"]:
+                dict_.pop(key, None)
+                if set_callable:
+                    state.expired_attributes.add(key)
+        else:
+            for key, set_callable in populators["expire"]:
+                if set_callable:
+                    state.expired_attributes.add(key)
+
+        for key, populator in populators["new"]:
+            populator(state, dict_, row)
+
+    elif load_path != state.load_path:
+        # new load path, e.g. object is present in more than one
+        # column position in a series of rows
+        state.load_path = load_path
+
+        # if we have data, and the data isn't in the dict, OK, let's put
+        # it in.
+        for key, getter in populators["quick"]:
+            if key not in dict_:
+                dict_[key] = getter(row)
+
+        # otherwise treat like an "already seen" row
+        for key, populator in populators["existing"]:
+            populator(state, dict_, row)
+            # TODO:  allow "existing" populator to know this is
+            # a new path for the state:
+            # populator(state, dict_, row, new_path=True)
+
+    else:
+        # have already seen rows with this identity in this same path.
+        for key, populator in populators["existing"]:
+            populator(state, dict_, row)
+
+            # TODO: same path
+            # populator(state, dict_, row, new_path=False)
+
+
+def _populate_partial(
+    context, row, state, dict_, isnew, load_path, unloaded, populators
+):
+    if not isnew:
+        if unloaded:
+            # extra pass, see #8166
+            for key, getter in populators["quick"]:
+                if key in unloaded:
+                    dict_[key] = getter(row)
+
+        to_load = context.partials[state]
+        for key, populator in populators["existing"]:
+            if key in to_load:
+                populator(state, dict_, row)
+    else:
+        to_load = unloaded
+        context.partials[state] = to_load
+
+        for key, getter in populators["quick"]:
+            if key in to_load:
+                dict_[key] = getter(row)
+        for key, set_callable in populators["expire"]:
+            if key in to_load:
+                dict_.pop(key, None)
+                if set_callable:
+                    state.expired_attributes.add(key)
+        for key, populator in populators["new"]:
+            if key in to_load:
+                populator(state, dict_, row)
+
+    for key, populator in populators["eager"]:
+        if key not in unloaded:
+            populator(state, dict_, row)
+
+    return to_load
+
+
+def _validate_version_id(mapper, state, dict_, row, getter):
+    if mapper._get_state_attr_by_column(
+        state, dict_, mapper.version_id_col
+    ) != getter(row):
+        raise orm_exc.StaleDataError(
+            "Instance '%s' has version id '%s' which "
+            "does not match database-loaded version id '%s'."
+            % (
+                state_str(state),
+                mapper._get_state_attr_by_column(
+                    state, dict_, mapper.version_id_col
+                ),
+                getter(row),
+            )
+        )
+
+
+def _decorate_polymorphic_switch(
+    instance_fn,
+    context,
+    query_entity,
+    mapper,
+    result,
+    path,
+    polymorphic_discriminator,
+    adapter,
+    ensure_no_pk,
+):
+    if polymorphic_discriminator is not None:
+        polymorphic_on = polymorphic_discriminator
+    else:
+        polymorphic_on = mapper.polymorphic_on
+    if polymorphic_on is None:
+        return instance_fn
+
+    if adapter:
+        polymorphic_on = adapter.columns[polymorphic_on]
+
+    def configure_subclass_mapper(discriminator):
+        try:
+            sub_mapper = mapper.polymorphic_map[discriminator]
+        except KeyError:
+            raise AssertionError(
+                "No such polymorphic_identity %r is defined" % discriminator
+            )
+        else:
+            if sub_mapper is mapper:
+                return None
+            elif not sub_mapper.isa(mapper):
+                return False
+
+            return _instance_processor(
+                query_entity,
+                sub_mapper,
+                context,
+                result,
+                path,
+                adapter,
+                _polymorphic_from=mapper,
+            )
+
+    polymorphic_instances = util.PopulateDict(configure_subclass_mapper)
+
+    getter = result._getter(polymorphic_on)
+
+    def polymorphic_instance(row):
+        discriminator = getter(row)
+        if discriminator is not None:
+            _instance = polymorphic_instances[discriminator]
+            if _instance:
+                return _instance(row)
+            elif _instance is False:
+                identitykey = ensure_no_pk(row)
+
+                if identitykey:
+                    raise sa_exc.InvalidRequestError(
+                        "Row with identity key %s can't be loaded into an "
+                        "object; the polymorphic discriminator column '%s' "
+                        "refers to %s, which is not a sub-mapper of "
+                        "the requested %s"
+                        % (
+                            identitykey,
+                            polymorphic_on,
+                            mapper.polymorphic_map[discriminator],
+                            mapper,
+                        )
+                    )
+                else:
+                    return None
+            else:
+                return instance_fn(row)
+        else:
+            identitykey = ensure_no_pk(row)
+
+            if identitykey:
+                raise sa_exc.InvalidRequestError(
+                    "Row with identity key %s can't be loaded into an "
+                    "object; the polymorphic discriminator column '%s' is "
+                    "NULL" % (identitykey, polymorphic_on)
+                )
+            else:
+                return None
+
+    return polymorphic_instance
+
+
+class PostLoad:
+    """Track loaders and states for "post load" operations."""
+
+    __slots__ = "loaders", "states", "load_keys"
+
+    def __init__(self):
+        self.loaders = {}
+        self.states = util.OrderedDict()
+        self.load_keys = None
+
+    def add_state(self, state, overwrite):
+        # the states for a polymorphic load here are all shared
+        # within a single PostLoad object among multiple subtypes.
+        # Filtering of callables on a per-subclass basis needs to be done at
+        # the invocation level
+        self.states[state] = overwrite
+
+    def invoke(self, context, path):
+        if not self.states:
+            return
+        path = path_registry.PathRegistry.coerce(path)
+        for (
+            effective_context,
+            token,
+            limit_to_mapper,
+            loader,
+            arg,
+            kw,
+        ) in self.loaders.values():
+            states = [
+                (state, overwrite)
+                for state, overwrite in self.states.items()
+                if state.manager.mapper.isa(limit_to_mapper)
+            ]
+            if states:
+                loader(
+                    effective_context, path, states, self.load_keys, *arg, **kw
+                )
+        self.states.clear()
+
+    @classmethod
+    def for_context(cls, context, path, only_load_props):
+        pl = context.post_load_paths.get(path.path)
+        if pl is not None and only_load_props:
+            pl.load_keys = only_load_props
+        return pl
+
+    @classmethod
+    def path_exists(self, context, path, key):
+        return (
+            path.path in context.post_load_paths
+            and key in context.post_load_paths[path.path].loaders
+        )
+
+    @classmethod
+    def callable_for_path(
+        cls, context, path, limit_to_mapper, token, loader_callable, *arg, **kw
+    ):
+        if path.path in context.post_load_paths:
+            pl = context.post_load_paths[path.path]
+        else:
+            pl = context.post_load_paths[path.path] = PostLoad()
+        pl.loaders[token] = (
+            context,
+            token,
+            limit_to_mapper,
+            loader_callable,
+            arg,
+            kw,
+        )
+
+
+def load_scalar_attributes(mapper, state, attribute_names, passive):
+    """initiate a column-based attribute refresh operation."""
+
+    # assert mapper is _state_mapper(state)
+    session = state.session
+    if not session:
+        raise orm_exc.DetachedInstanceError(
+            "Instance %s is not bound to a Session; "
+            "attribute refresh operation cannot proceed" % (state_str(state))
+        )
+
+    no_autoflush = bool(passive & attributes.NO_AUTOFLUSH)
+
+    # in the case of inheritance, particularly concrete and abstract
+    # concrete inheritance, the class manager might have some keys
+    # of attributes on the superclass that we didn't actually map.
+    # These could be mapped as "concrete, don't load" or could be completely
+    # excluded from the mapping and we know nothing about them.  Filter them
+    # here to prevent them from coming through.
+    if attribute_names:
+        attribute_names = attribute_names.intersection(mapper.attrs.keys())
+
+    if mapper.inherits and not mapper.concrete:
+        # load based on committed attributes in the object, formed into
+        # a truncated SELECT that only includes relevant tables.  does not
+        # currently use state.key
+        statement = mapper._optimized_get_statement(state, attribute_names)
+        if statement is not None:
+            # undefer() isn't needed here because statement has the
+            # columns needed already, this implicitly undefers that column
+            stmt = FromStatement(mapper, statement)
+
+            return load_on_ident(
+                session,
+                stmt,
+                None,
+                only_load_props=attribute_names,
+                refresh_state=state,
+                no_autoflush=no_autoflush,
+            )
+
+    # normal load, use state.key as the identity to SELECT
+    has_key = bool(state.key)
+
+    if has_key:
+        identity_key = state.key
+    else:
+        # this codepath is rare - only valid when inside a flush, and the
+        # object is becoming persistent but hasn't yet been assigned
+        # an identity_key.
+        # check here to ensure we have the attrs we need.
+        pk_attrs = [
+            mapper._columntoproperty[col].key for col in mapper.primary_key
+        ]
+        if state.expired_attributes.intersection(pk_attrs):
+            raise sa_exc.InvalidRequestError(
+                "Instance %s cannot be refreshed - it's not "
+                " persistent and does not "
+                "contain a full primary key." % state_str(state)
+            )
+        identity_key = mapper._identity_key_from_state(state)
+
+    if (
+        _none_set.issubset(identity_key) and not mapper.allow_partial_pks
+    ) or _none_set.issuperset(identity_key):
+        util.warn_limited(
+            "Instance %s to be refreshed doesn't "
+            "contain a full primary key - can't be refreshed "
+            "(and shouldn't be expired, either).",
+            state_str(state),
+        )
+        return
+
+    result = load_on_ident(
+        session,
+        select(mapper).set_label_style(LABEL_STYLE_TABLENAME_PLUS_COL),
+        identity_key,
+        refresh_state=state,
+        only_load_props=attribute_names,
+        no_autoflush=no_autoflush,
+    )
+
+    # if instance is pending, a refresh operation
+    # may not complete (even if PK attributes are assigned)
+    if has_key and result is None:
+        raise orm_exc.ObjectDeletedError(state)