about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/runtime
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/runtime')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/environment.py1051
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/migration.py1391
3 files changed, 2442 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py b/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py b/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py
new file mode 100644
index 00000000..1ff71eef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py
@@ -0,0 +1,1051 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Callable
+from typing import Collection
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import TextIO
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy.sql.schema import Column
+from sqlalchemy.sql.schema import FetchedValue
+from typing_extensions import ContextManager
+from typing_extensions import Literal
+
+from .migration import _ProxyTransaction
+from .migration import MigrationContext
+from .. import util
+from ..operations import Operations
+from ..script.revision import _GetRevArg
+
+if TYPE_CHECKING:
+    from sqlalchemy.engine import URL
+    from sqlalchemy.engine.base import Connection
+    from sqlalchemy.sql import Executable
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import SchemaItem
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from .migration import MigrationInfo
+    from ..autogenerate.api import AutogenContext
+    from ..config import Config
+    from ..ddl import DefaultImpl
+    from ..operations.ops import MigrationScript
+    from ..script.base import ScriptDirectory
+
# A resolved revision "number": a single revision identifier, a tuple of
# identifiers (multiple heads), or None (the "base" of the revision tree).
_RevNumber = Optional[Union[str, Tuple[str, ...]]]

# Signature of the ``process_revision_directives`` hook passed to
# EnvironmentContext.configure(): receives the migration context, the
# revision argument(s), and the list of MigrationScript directives.
ProcessRevisionDirectiveFn = Callable[
    [MigrationContext, _GetRevArg, List["MigrationScript"]], None
]

# Signature of the ``render_item`` autogenerate hook: given an object-type
# string, the object itself, and the AutogenContext, returns rendered
# source text or ``False`` to fall back to the default rendering (see the
# ``render_item`` parameter documented on configure() below).
RenderItemFn = Callable[
    [str, Any, "AutogenContext"], Union[str, Literal[False]]
]

# The object-type strings handed to ``include_name`` hooks.
NameFilterType = Literal[
    "schema",
    "table",
    "column",
    "index",
    "unique_constraint",
    "foreign_key_constraint",
]
# The "parent names" mapping handed to ``include_name`` hooks; keys are
# names of objects enclosing the one being filtered.
NameFilterParentNames = MutableMapping[
    Literal["schema_name", "table_name", "schema_qualified_table_name"],
    Optional[str],
]
# Signature of the ``include_name`` hook:
# (name, type, parent_names) -> include this name in autogenerate?
IncludeNameFn = Callable[
    [Optional[str], NameFilterType, NameFilterParentNames], bool
]

# Signature of the ``include_object`` hook:
# (object, name, type, reflected, compare_to) -> include this object?
IncludeObjectFn = Callable[
    [
        "SchemaItem",
        Optional[str],
        NameFilterType,
        bool,
        Optional["SchemaItem"],
    ],
    bool,
]

# Signature of the ``on_version_apply`` callback, run once per migration
# step with (ctx, step, heads, run_args).
OnVersionApplyFn = Callable[
    [MigrationContext, "MigrationInfo", Collection[Any], Mapping[str, Any]],
    None,
]

# Signature of a custom ``compare_server_default`` callable; per the
# configure() docs below, returning ``None`` defers to the default
# server-default comparison.
CompareServerDefault = Callable[
    [
        MigrationContext,
        "Column[Any]",
        "Column[Any]",
        Optional[str],
        Optional[FetchedValue],
        Optional[str],
    ],
    Optional[bool],
]

# Signature of a custom ``compare_type`` callable (see the
# ``compare_type`` parameter documented on configure() below).
CompareType = Callable[
    [
        MigrationContext,
        "Column[Any]",
        "Column[Any]",
        "TypeEngine[Any]",
        "TypeEngine[Any]",
    ],
    Optional[bool],
]
+
+
+class EnvironmentContext(util.ModuleClsProxy):
+    """A configurational facade made available in an ``env.py`` script.
+
+    The :class:`.EnvironmentContext` acts as a *facade* to the more
+    nuts-and-bolts objects of :class:`.MigrationContext` as well as certain
+    aspects of :class:`.Config`,
+    within the context of the ``env.py`` script that is invoked by
+    most Alembic commands.
+
+    :class:`.EnvironmentContext` is normally instantiated
+    when a command in :mod:`alembic.command` is run.  It then makes
+    itself available in the ``alembic.context`` module for the scope
+    of the command.   From within an ``env.py`` script, the current
+    :class:`.EnvironmentContext` is available by importing this module.
+
+    :class:`.EnvironmentContext` also supports programmatic usage.
+    At this level, it acts as a Python context manager, that is, is
+    intended to be used using the
+    ``with:`` statement.  A typical use of :class:`.EnvironmentContext`::
+
+        from alembic.config import Config
+        from alembic.script import ScriptDirectory
+
+        config = Config()
+        config.set_main_option("script_location", "myapp:migrations")
+        script = ScriptDirectory.from_config(config)
+
+
+        def my_function(rev, context):
+            '''do something with revision "rev", which
+            will be the current database revision,
+            and "context", which is the MigrationContext
+            that the env.py will create'''
+
+
+        with EnvironmentContext(
+            config,
+            script,
+            fn=my_function,
+            as_sql=False,
+            starting_rev="base",
+            destination_rev="head",
+            tag="sometag",
+        ):
+            script.run_env()
+
+    The above script will invoke the ``env.py`` script
+    within the migration environment.  If and when ``env.py``
+    calls :meth:`.MigrationContext.run_migrations`, the
+    ``my_function()`` function above will be called
+    by the :class:`.MigrationContext`, given the context
+    itself as well as the current revision in the database.
+
+    .. note::
+
+        For most API usages other than full blown
+        invocation of migration scripts, the :class:`.MigrationContext`
+        and :class:`.ScriptDirectory` objects can be created and
+        used directly.  The :class:`.EnvironmentContext` object
+        is *only* needed when you need to actually invoke the
+        ``env.py`` module present in the migration environment.
+
+    """
+
+    _migration_context: Optional[MigrationContext] = None
+
+    config: Config = None  # type:ignore[assignment]
+    """An instance of :class:`.Config` representing the
+    configuration file contents as well as other variables
+    set programmatically within it."""
+
+    script: ScriptDirectory = None  # type:ignore[assignment]
+    """An instance of :class:`.ScriptDirectory` which provides
+    programmatic access to version files within the ``versions/``
+    directory.
+
+    """
+
+    def __init__(
+        self, config: Config, script: ScriptDirectory, **kw: Any
+    ) -> None:
+        r"""Construct a new :class:`.EnvironmentContext`.
+
+        :param config: a :class:`.Config` instance.
+        :param script: a :class:`.ScriptDirectory` instance.
+        :param \**kw: keyword options that will be ultimately
+         passed along to the :class:`.MigrationContext` when
+         :meth:`.EnvironmentContext.configure` is called.
+
+        """
+        self.config = config
+        self.script = script
+        self.context_opts = kw
+
+    def __enter__(self) -> EnvironmentContext:
+        """Establish a context which provides a
+        :class:`.EnvironmentContext` object to
+        env.py scripts.
+
+        The :class:`.EnvironmentContext` will
+        be made available as ``from alembic import context``.
+
+        """
+        self._install_proxy()
+        return self
+
    def __exit__(self, *arg: Any, **kw: Any) -> None:
        # Remove the module-level proxy installed by __enter__(); returning
        # None means any in-flight exception propagates unchanged.
        self._remove_proxy()
+
+    def is_offline_mode(self) -> bool:
+        """Return True if the current migrations environment
+        is running in "offline mode".
+
+        This is ``True`` or ``False`` depending
+        on the ``--sql`` flag passed.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        """
+        return self.context_opts.get("as_sql", False)  # type: ignore[no-any-return]  # noqa: E501
+
+    def is_transactional_ddl(self) -> bool:
+        """Return True if the context is configured to expect a
+        transactional DDL capable backend.
+
+        This defaults to the type of database in use, and
+        can be overridden by the ``transactional_ddl`` argument
+        to :meth:`.configure`
+
+        This function requires that a :class:`.MigrationContext`
+        has first been made available via :meth:`.configure`.
+
+        """
+        return self.get_context().impl.transactional_ddl
+
+    def requires_connection(self) -> bool:
+        return not self.is_offline_mode()
+
+    def get_head_revision(self) -> _RevNumber:
+        """Return the hex identifier of the 'head' script revision.
+
+        If the script directory has multiple heads, this
+        method raises a :class:`.CommandError`;
+        :meth:`.EnvironmentContext.get_head_revisions` should be preferred.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        .. seealso:: :meth:`.EnvironmentContext.get_head_revisions`
+
+        """
+        return self.script.as_revision_number("head")
+
+    def get_head_revisions(self) -> _RevNumber:
+        """Return the hex identifier of the 'heads' script revision(s).
+
+        This returns a tuple containing the version number of all
+        heads in the script directory.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        """
+        return self.script.as_revision_number("heads")
+
+    def get_starting_revision_argument(self) -> _RevNumber:
+        """Return the 'starting revision' argument,
+        if the revision was passed using ``start:end``.
+
+        This is only meaningful in "offline" mode.
+        Returns ``None`` if no value is available
+        or was configured.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        """
+        if self._migration_context is not None:
+            return self.script.as_revision_number(
+                self.get_context()._start_from_rev
+            )
+        elif "starting_rev" in self.context_opts:
+            return self.script.as_revision_number(
+                self.context_opts["starting_rev"]
+            )
+        else:
+            # this should raise only in the case that a command
+            # is being run where the "starting rev" is never applicable;
+            # this is to catch scripts which rely upon this in
+            # non-sql mode or similar
+            raise util.CommandError(
+                "No starting revision argument is available."
+            )
+
+    def get_revision_argument(self) -> _RevNumber:
+        """Get the 'destination' revision argument.
+
+        This is typically the argument passed to the
+        ``upgrade`` or ``downgrade`` command.
+
+        If it was specified as ``head``, the actual
+        version number is returned; if specified
+        as ``base``, ``None`` is returned.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        """
+        return self.script.as_revision_number(
+            self.context_opts["destination_rev"]
+        )
+
+    def get_tag_argument(self) -> Optional[str]:
+        """Return the value passed for the ``--tag`` argument, if any.
+
+        The ``--tag`` argument is not used directly by Alembic,
+        but is available for custom ``env.py`` configurations that
+        wish to use it; particularly for offline generation scripts
+        that wish to generate tagged filenames.
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        .. seealso::
+
+            :meth:`.EnvironmentContext.get_x_argument` - a newer and more
+            open ended system of extending ``env.py`` scripts via the command
+            line.
+
+        """
+        return self.context_opts.get("tag", None)  # type: ignore[no-any-return]  # noqa: E501
+
+    @overload
+    def get_x_argument(self, as_dictionary: Literal[False]) -> List[str]: ...
+
+    @overload
+    def get_x_argument(
+        self, as_dictionary: Literal[True]
+    ) -> Dict[str, str]: ...
+
+    @overload
+    def get_x_argument(
+        self, as_dictionary: bool = ...
+    ) -> Union[List[str], Dict[str, str]]: ...
+
+    def get_x_argument(
+        self, as_dictionary: bool = False
+    ) -> Union[List[str], Dict[str, str]]:
+        """Return the value(s) passed for the ``-x`` argument, if any.
+
+        The ``-x`` argument is an open ended flag that allows any user-defined
+        value or values to be passed on the command line, then available
+        here for consumption by a custom ``env.py`` script.
+
+        The return value is a list, returned directly from the ``argparse``
+        structure.  If ``as_dictionary=True`` is passed, the ``x`` arguments
+        are parsed using ``key=value`` format into a dictionary that is
+        then returned. If there is no ``=`` in the argument, value is an empty
+        string.
+
+        .. versionchanged:: 1.13.1 Support ``as_dictionary=True`` when
+           arguments are passed without the ``=`` symbol.
+
+        For example, to support passing a database URL on the command line,
+        the standard ``env.py`` script can be modified like this::
+
+            cmd_line_url = context.get_x_argument(
+                as_dictionary=True).get('dbname')
+            if cmd_line_url:
+                engine = create_engine(cmd_line_url)
+            else:
+                engine = engine_from_config(
+                        config.get_section(config.config_ini_section),
+                        prefix='sqlalchemy.',
+                        poolclass=pool.NullPool)
+
+        This then takes effect by running the ``alembic`` script as::
+
+            alembic -x dbname=postgresql://user:pass@host/dbname upgrade head
+
+        This function does not require that the :class:`.MigrationContext`
+        has been configured.
+
+        .. seealso::
+
+            :meth:`.EnvironmentContext.get_tag_argument`
+
+            :attr:`.Config.cmd_opts`
+
+        """
+        if self.config.cmd_opts is not None:
+            value = self.config.cmd_opts.x or []
+        else:
+            value = []
+        if as_dictionary:
+            dict_value = {}
+            for arg in value:
+                x_key, _, x_value = arg.partition("=")
+                dict_value[x_key] = x_value
+            value = dict_value
+
+        return value
+
+    def configure(
+        self,
+        connection: Optional[Connection] = None,
+        url: Optional[Union[str, URL]] = None,
+        dialect_name: Optional[str] = None,
+        dialect_opts: Optional[Dict[str, Any]] = None,
+        transactional_ddl: Optional[bool] = None,
+        transaction_per_migration: bool = False,
+        output_buffer: Optional[TextIO] = None,
+        starting_rev: Optional[str] = None,
+        tag: Optional[str] = None,
+        template_args: Optional[Dict[str, Any]] = None,
+        render_as_batch: bool = False,
+        target_metadata: Union[MetaData, Sequence[MetaData], None] = None,
+        include_name: Optional[IncludeNameFn] = None,
+        include_object: Optional[IncludeObjectFn] = None,
+        include_schemas: bool = False,
+        process_revision_directives: Optional[
+            ProcessRevisionDirectiveFn
+        ] = None,
+        compare_type: Union[bool, CompareType] = True,
+        compare_server_default: Union[bool, CompareServerDefault] = False,
+        render_item: Optional[RenderItemFn] = None,
+        literal_binds: bool = False,
+        upgrade_token: str = "upgrades",
+        downgrade_token: str = "downgrades",
+        alembic_module_prefix: str = "op.",
+        sqlalchemy_module_prefix: str = "sa.",
+        user_module_prefix: Optional[str] = None,
+        on_version_apply: Optional[OnVersionApplyFn] = None,
+        **kw: Any,
+    ) -> None:
+        """Configure a :class:`.MigrationContext` within this
+        :class:`.EnvironmentContext` which will provide database
+        connectivity and other configuration to a series of
+        migration scripts.
+
+        Many methods on :class:`.EnvironmentContext` require that
+        this method has been called in order to function, as they
+        ultimately need to have database access or at least access
+        to the dialect in use.  Those which do are documented as such.
+
+        The important thing needed by :meth:`.configure` is a
+        means to determine what kind of database dialect is in use.
+        An actual connection to that database is needed only if
+        the :class:`.MigrationContext` is to be used in
+        "online" mode.
+
+        If the :meth:`.is_offline_mode` function returns ``True``,
+        then no connection is needed here.  Otherwise, the
+        ``connection`` parameter should be present as an
+        instance of :class:`sqlalchemy.engine.Connection`.
+
+        This function is typically called from the ``env.py``
+        script within a migration environment.  It can be called
+        multiple times for an invocation.  The most recent
+        :class:`~sqlalchemy.engine.Connection`
+        for which it was called is the one that will be operated upon
+        by the next call to :meth:`.run_migrations`.
+
+        General parameters:
+
+        :param connection: a :class:`~sqlalchemy.engine.Connection`
+         to use
+         for SQL execution in "online" mode.  When present, is also
+         used to determine the type of dialect in use.
+        :param url: a string database url, or a
+         :class:`sqlalchemy.engine.url.URL` object.
+         The type of dialect to be used will be derived from this if
+         ``connection`` is not passed.
+        :param dialect_name: string name of a dialect, such as
+         "postgresql", "mssql", etc.
+         The type of dialect to be used will be derived from this if
+         ``connection`` and ``url`` are not passed.
+        :param dialect_opts: dictionary of options to be passed to dialect
+         constructor.
+        :param transactional_ddl: Force the usage of "transactional"
+         DDL on or off;
+         this otherwise defaults to whether or not the dialect in
+         use supports it.
+        :param transaction_per_migration: if True, nest each migration script
+         in a transaction rather than the full series of migrations to
+         run.
+        :param output_buffer: a file-like object that will be used
+         for textual output
+         when the ``--sql`` option is used to generate SQL scripts.
+         Defaults to
+         ``sys.stdout`` if not passed here and also not present on
+         the :class:`.Config`
+         object.  The value here overrides that of the :class:`.Config`
+         object.
+        :param output_encoding: when using ``--sql`` to generate SQL
+         scripts, apply this encoding to the string output.
+        :param literal_binds: when using ``--sql`` to generate SQL
+         scripts, pass through the ``literal_binds`` flag to the compiler
+         so that any literal values that would ordinarily be bound
+         parameters are converted to plain strings.
+
+         .. warning:: Dialects can typically only handle simple datatypes
+            like strings and numbers for auto-literal generation.  Datatypes
+            like dates, intervals, and others may still require manual
+            formatting, typically using :meth:`.Operations.inline_literal`.
+
+         .. note:: the ``literal_binds`` flag is ignored on SQLAlchemy
+            versions prior to 0.8 where this feature is not supported.
+
+         .. seealso::
+
+            :meth:`.Operations.inline_literal`
+
+        :param starting_rev: Override the "starting revision" argument
+         when using ``--sql`` mode.
+        :param tag: a string tag for usage by custom ``env.py`` scripts.
+         Set via the ``--tag`` option, can be overridden here.
+        :param template_args: dictionary of template arguments which
+         will be added to the template argument environment when
+         running the "revision" command.   Note that the script environment
+         is only run within the "revision" command if the --autogenerate
+         option is used, or if the option "revision_environment=true"
+         is present in the alembic.ini file.
+
+        :param version_table: The name of the Alembic version table.
+         The default is ``'alembic_version'``.
+        :param version_table_schema: Optional schema to place version
+         table within.
+        :param version_table_pk: boolean, whether the Alembic version table
+         should use a primary key constraint for the "value" column; this
+         only takes effect when the table is first created.
+         Defaults to True; setting to False should not be necessary and is
+         here for backwards compatibility reasons.
+        :param on_version_apply: a callable or collection of callables to be
+            run for each migration step.
+            The callables will be run in the order they are given, once for
+            each migration step, after the respective operation has been
+            applied but before its transaction is finalized.
+            Each callable accepts no positional arguments and the following
+            keyword arguments:
+
+            * ``ctx``: the :class:`.MigrationContext` running the migration,
+            * ``step``: a :class:`.MigrationInfo` representing the
+              step currently being applied,
+            * ``heads``: a collection of version strings representing the
+              current heads,
+            * ``run_args``: the ``**kwargs`` passed to :meth:`.run_migrations`.
+
+        Parameters specific to the autogenerate feature, when
+        ``alembic revision`` is run with the ``--autogenerate`` feature:
+
+        :param target_metadata: a :class:`sqlalchemy.schema.MetaData`
+         object, or a sequence of :class:`~sqlalchemy.schema.MetaData`
+         objects, that will be consulted during autogeneration.
+         The tables present in each :class:`~sqlalchemy.schema.MetaData`
+         will be compared against
+         what is locally available on the target
+         :class:`~sqlalchemy.engine.Connection`
+         to produce candidate upgrade/downgrade operations.
+        :param compare_type: Indicates type comparison behavior during
+         an autogenerate
+         operation.  Defaults to ``True`` turning on type comparison, which
+         has good accuracy on most backends.   See :ref:`compare_types`
+         for an example as well as information on other type
+         comparison options. Set to ``False`` which disables type
+         comparison. A callable can also be passed to provide custom type
+         comparison, see :ref:`compare_types` for additional details.
+
+         .. versionchanged:: 1.12.0 The default value of
+            :paramref:`.EnvironmentContext.configure.compare_type` has been
+            changed to ``True``.
+
+         .. seealso::
+
+            :ref:`compare_types`
+
+            :paramref:`.EnvironmentContext.configure.compare_server_default`
+
+        :param compare_server_default: Indicates server default comparison
+         behavior during
+         an autogenerate operation.  Defaults to ``False`` which disables
+         server default
+         comparison.  Set to  ``True`` to turn on server default comparison,
+         which has
+         varied accuracy depending on backend.
+
+         To customize server default comparison behavior, a callable may
+         be specified
+         which can filter server default comparisons during an
+         autogenerate operation.
+         defaults during an autogenerate operation.   The format of this
+         callable is::
+
+            def my_compare_server_default(context, inspected_column,
+                        metadata_column, inspected_default, metadata_default,
+                        rendered_metadata_default):
+                # return True if the defaults are different,
+                # False if not, or None to allow the default implementation
+                # to compare these defaults
+                return None
+
+            context.configure(
+                # ...
+                compare_server_default = my_compare_server_default
+            )
+
+         ``inspected_column`` is a dictionary structure as returned by
+         :meth:`sqlalchemy.engine.reflection.Inspector.get_columns`, whereas
+         ``metadata_column`` is a :class:`sqlalchemy.schema.Column` from
+         the local model environment.
+
+         A return value of ``None`` indicates to allow default server default
+         comparison
+         to proceed.  Note that some backends such as Postgresql actually
+         execute
+         the two defaults on the database side to compare for equivalence.
+
+         .. seealso::
+
+            :paramref:`.EnvironmentContext.configure.compare_type`
+
+        :param include_name: A callable function which is given
+         the chance to return ``True`` or ``False`` for any database reflected
+         object based on its name, including database schema names when
+         the :paramref:`.EnvironmentContext.configure.include_schemas` flag
+         is set to ``True``.
+
+         The function accepts the following positional arguments:
+
+         * ``name``: the name of the object, such as schema name or table name.
+           Will be ``None`` when indicating the default schema name of the
+           database connection.
+         * ``type``: a string describing the type of object; currently
+           ``"schema"``, ``"table"``, ``"column"``, ``"index"``,
+           ``"unique_constraint"``, or ``"foreign_key_constraint"``
+         * ``parent_names``: a dictionary of "parent" object names, that are
+           relative to the name being given.  Keys in this dictionary may
+           include:  ``"schema_name"``, ``"table_name"`` or
+           ``"schema_qualified_table_name"``.
+
+         E.g.::
+
+            def include_name(name, type_, parent_names):
+                if type_ == "schema":
+                    return name in ["schema_one", "schema_two"]
+                else:
+                    return True
+
+            context.configure(
+                # ...
+                include_schemas = True,
+                include_name = include_name
+            )
+
+         .. seealso::
+
+            :ref:`autogenerate_include_hooks`
+
+            :paramref:`.EnvironmentContext.configure.include_object`
+
+            :paramref:`.EnvironmentContext.configure.include_schemas`
+
+
+        :param include_object: A callable function which is given
+         the chance to return ``True`` or ``False`` for any object,
+         indicating if the given object should be considered in the
+         autogenerate sweep.
+
+         The function accepts the following positional arguments:
+
+         * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such
+           as a :class:`~sqlalchemy.schema.Table`,
+           :class:`~sqlalchemy.schema.Column`,
+           :class:`~sqlalchemy.schema.Index`
+           :class:`~sqlalchemy.schema.UniqueConstraint`,
+           or :class:`~sqlalchemy.schema.ForeignKeyConstraint` object
+         * ``name``: the name of the object. This is typically available
+           via ``object.name``.
+         * ``type``: a string describing the type of object; currently
+           ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``,
+           or ``"foreign_key_constraint"``
+         * ``reflected``: ``True`` if the given object was produced based on
+           table reflection, ``False`` if it's from a local :class:`.MetaData`
+           object.
+         * ``compare_to``: the object being compared against, if available,
+           else ``None``.
+
+         E.g.::
+
+            def include_object(object, name, type_, reflected, compare_to):
+                if (type_ == "column" and
+                    not reflected and
+                    object.info.get("skip_autogenerate", False)):
+                    return False
+                else:
+                    return True
+
+            context.configure(
+                # ...
+                include_object = include_object
+            )
+
+         For the use case of omitting specific schemas from a target database
+         when :paramref:`.EnvironmentContext.configure.include_schemas` is
+         set to ``True``, the :attr:`~sqlalchemy.schema.Table.schema`
+         attribute can be checked for each :class:`~sqlalchemy.schema.Table`
+         object passed to the hook, however it is much more efficient
+         to filter on schemas before reflection of objects takes place
+         using the :paramref:`.EnvironmentContext.configure.include_name`
+         hook.
+
+         .. seealso::
+
+            :ref:`autogenerate_include_hooks`
+
+            :paramref:`.EnvironmentContext.configure.include_name`
+
+            :paramref:`.EnvironmentContext.configure.include_schemas`
+
+        :param render_as_batch: if True, commands which alter elements
+         within a table will be placed under a ``with batch_alter_table():``
+         directive, so that batch migrations will take place.
+
+         .. seealso::
+
+            :ref:`batch_migrations`
+
+        :param include_schemas: If True, autogenerate will scan across
+         all schemas located by the SQLAlchemy
+         :meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names`
+         method, and include all differences in tables found across all
+         those schemas.  When using this option, you may want to also
+         use the :paramref:`.EnvironmentContext.configure.include_name`
+         parameter to specify a callable which
+         can filter the tables/schemas that get included.
+
+         .. seealso::
+
+            :ref:`autogenerate_include_hooks`
+
+            :paramref:`.EnvironmentContext.configure.include_name`
+
+            :paramref:`.EnvironmentContext.configure.include_object`
+
+        :param render_item: Callable that can be used to override how
+         any schema item, i.e. column, constraint, type,
+         etc., is rendered for autogenerate.  The callable receives a
+         string describing the type of object, the object, and
+         the autogen context.  If it returns False, the
+         default rendering method will be used.  If it returns None,
+         the item will not be rendered in the context of a Table
+         construct, that is, can be used to skip columns or constraints
+         within op.create_table()::
+
+            def my_render_column(type_, col, autogen_context):
+                if type_ == "column" and isinstance(col, MySpecialCol):
+                    return repr(col)
+                else:
+                    return False
+
+            context.configure(
+                # ...
+                render_item = my_render_column
+            )
+
+         Available values for the type string include: ``"column"``,
+         ``"primary_key"``, ``"foreign_key"``, ``"unique"``, ``"check"``,
+         ``"type"``, ``"server_default"``.
+
+         .. seealso::
+
+            :ref:`autogen_render_types`
+
+        :param upgrade_token: When autogenerate completes, the text of the
+         candidate upgrade operations will be present in this template
+         variable when ``script.py.mako`` is rendered.  Defaults to
+         ``upgrades``.
+        :param downgrade_token: When autogenerate completes, the text of the
+         candidate downgrade operations will be present in this
+         template variable when ``script.py.mako`` is rendered.  Defaults to
+         ``downgrades``.
+
+        :param alembic_module_prefix: When autogenerate refers to Alembic
+         :mod:`alembic.operations` constructs, this prefix will be used
+         (i.e. ``op.create_table``)  Defaults to "``op.``".
+         Can be ``None`` to indicate no prefix.
+
+        :param sqlalchemy_module_prefix: When autogenerate refers to
+         SQLAlchemy
+         :class:`~sqlalchemy.schema.Column` or type classes, this prefix
+         will be used
+         (i.e. ``sa.Column("somename", sa.Integer)``)  Defaults to "``sa.``".
+         Can be ``None`` to indicate no prefix.
+         Note that when dialect-specific types are rendered, autogenerate
+         will render them using the dialect module name, i.e. ``mssql.BIT()``,
+         ``postgresql.UUID()``.
+
+        :param user_module_prefix: When autogenerate refers to a SQLAlchemy
+         type (e.g. :class:`.TypeEngine`) where the module name is not
+         under the ``sqlalchemy`` namespace, this prefix will be used
+         within autogenerate.  If left at its default of
+         ``None``, the ``__module__`` attribute of the type is used to
+         render the import module.   It's a good practice to set this
+         and to have all custom types be available from a fixed module space,
+         in order to future-proof migration files against reorganizations
+         in modules.
+
+         .. seealso::
+
+            :ref:`autogen_module_prefix`
+
+        :param process_revision_directives: a callable function that will
+         be passed a structure representing the end result of an autogenerate
+         or plain "revision" operation, which can be manipulated to affect
+         how the ``alembic revision`` command ultimately outputs new
+         revision scripts.   The structure of the callable is::
+
+            def process_revision_directives(context, revision, directives):
+                pass
+
+         The ``directives`` parameter is a Python list containing
+         a single :class:`.MigrationScript` directive, which represents
+         the revision file to be generated.    This list as well as its
+         contents may be freely modified to produce any set of commands.
+         The section :ref:`customizing_revision` shows an example of
+         doing this.  The ``context`` parameter is the
+         :class:`.MigrationContext` in use,
+         and ``revision`` is a tuple of revision identifiers representing the
+         current revision of the database.
+
+         The callable is invoked at all times when the ``--autogenerate``
+         option is passed to ``alembic revision``.  If ``--autogenerate``
+         is not passed, the callable is invoked only if the
+         ``revision_environment`` variable is set to True in the Alembic
+         configuration, in which case the given ``directives`` collection
+         will contain empty :class:`.UpgradeOps` and :class:`.DowngradeOps`
+         collections for ``.upgrade_ops`` and ``.downgrade_ops``.  The
+         ``--autogenerate`` option itself can be inferred by inspecting
+         ``context.config.cmd_opts.autogenerate``.
+
+         The callable function may optionally be an instance of
+         a :class:`.Rewriter` object.  This is a helper object that
+         assists in the production of autogenerate-stream rewriter functions.
+
+         .. seealso::
+
+             :ref:`customizing_revision`
+
+             :ref:`autogen_rewriter`
+
+             :paramref:`.command.revision.process_revision_directives`
+
+        Parameters specific to individual backends:
+
+        :param mssql_batch_separator: The "batch separator" which will
+         be placed between each statement when generating offline SQL Server
+         migrations.  Defaults to ``GO``.  Note this is in addition to the
+         customary semicolon ``;`` at the end of each statement; SQL Server
+         considers the "batch separator" to denote the end of an
+         individual statement execution, and cannot group certain
+         dependent operations in one step.
+        :param oracle_batch_separator: The "batch separator" which will
+         be placed between each statement when generating offline
+         Oracle migrations.  Defaults to ``/``.  Oracle doesn't add a
+         semicolon between statements like most other backends.
+
+        """
+        opts = self.context_opts
+        if transactional_ddl is not None:
+            opts["transactional_ddl"] = transactional_ddl
+        if output_buffer is not None:
+            opts["output_buffer"] = output_buffer
+        elif self.config.output_buffer is not None:
+            opts["output_buffer"] = self.config.output_buffer
+        if starting_rev:
+            opts["starting_rev"] = starting_rev
+        if tag:
+            opts["tag"] = tag
+        if template_args and "template_args" in opts:
+            opts["template_args"].update(template_args)
+        opts["transaction_per_migration"] = transaction_per_migration
+        opts["target_metadata"] = target_metadata
+        opts["include_name"] = include_name
+        opts["include_object"] = include_object
+        opts["include_schemas"] = include_schemas
+        opts["render_as_batch"] = render_as_batch
+        opts["upgrade_token"] = upgrade_token
+        opts["downgrade_token"] = downgrade_token
+        opts["sqlalchemy_module_prefix"] = sqlalchemy_module_prefix
+        opts["alembic_module_prefix"] = alembic_module_prefix
+        opts["user_module_prefix"] = user_module_prefix
+        opts["literal_binds"] = literal_binds
+        opts["process_revision_directives"] = process_revision_directives
+        opts["on_version_apply"] = util.to_tuple(on_version_apply, default=())
+
+        if render_item is not None:
+            opts["render_item"] = render_item
+        opts["compare_type"] = compare_type
+        if compare_server_default is not None:
+            opts["compare_server_default"] = compare_server_default
+        opts["script"] = self.script
+
+        opts.update(kw)
+
+        self._migration_context = MigrationContext.configure(
+            connection=connection,
+            url=url,
+            dialect_name=dialect_name,
+            environment_context=self,
+            dialect_opts=dialect_opts,
+            opts=opts,
+        )
+
+    def run_migrations(self, **kw: Any) -> None:
+        """Run migrations as determined by the current command line
+        configuration
+        as well as versioning information present (or not) in the current
+        database connection (if one is present).
+
+        The function accepts optional ``**kw`` arguments.   If these are
+        passed, they are sent directly to the ``upgrade()`` and
+        ``downgrade()``
+        functions within each target revision file.   By modifying the
+        ``script.py.mako`` file so that the ``upgrade()`` and ``downgrade()``
+        functions accept arguments, parameters can be passed here so that
+        contextual information, usually information to identify a particular
+        database in use, can be passed from a custom ``env.py`` script
+        to the migration functions.
+
+        This function requires that a :class:`.MigrationContext` has
+        first been made available via :meth:`.configure`.
+
+        """
+        assert self._migration_context is not None
+        with Operations.context(self._migration_context):
+            self.get_context().run_migrations(**kw)
+
+    def execute(
+        self,
+        sql: Union[Executable, str],
+        execution_options: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Execute the given SQL using the current change context.
+
+        The behavior of :meth:`.execute` is the same
+        as that of :meth:`.Operations.execute`.  Please see that
+        function's documentation for full detail including
+        caveats and limitations.
+
+        This function requires that a :class:`.MigrationContext` has
+        first been made available via :meth:`.configure`.
+
+        """
+        self.get_context().execute(sql, execution_options=execution_options)
+
+    def static_output(self, text: str) -> None:
+        """Emit text directly to the "offline" SQL stream.
+
+        Typically this is for emitting comments that
+        start with --.  The statement is not treated
+        as a SQL execution, no ; or batch separator
+        is added, etc.
+
+        """
+        self.get_context().impl.static_output(text)
+
+    def begin_transaction(
+        self,
+    ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]:
+        """Return a context manager that will
+        enclose an operation within a "transaction",
+        as defined by the environment's offline
+        and transactional DDL settings.
+
+        e.g.::
+
+            with context.begin_transaction():
+                context.run_migrations()
+
+        :meth:`.begin_transaction` is intended to
+        "do the right thing" regardless of
+        calling context:
+
+        * If :meth:`.is_transactional_ddl` is ``False``,
+          returns a "do nothing" context manager
+          which otherwise produces no transactional
+          state or directives.
+        * If :meth:`.is_offline_mode` is ``True``,
+          returns a context manager that will
+          invoke the :meth:`.DefaultImpl.emit_begin`
+          and :meth:`.DefaultImpl.emit_commit`
+          methods, which will produce the string
+          directives ``BEGIN`` and ``COMMIT`` on
+          the output stream, as rendered by the
+          target backend (e.g. SQL Server would
+          emit ``BEGIN TRANSACTION``).
+        * Otherwise, calls :meth:`sqlalchemy.engine.Connection.begin`
+          on the current online connection, which
+          returns a :class:`sqlalchemy.engine.Transaction`
+          object.  This object demarcates a real
+          transaction and is itself a context manager,
+          which will roll back if an exception
+          is raised.
+
+        Note that a custom ``env.py`` script which
+        has more specific transactional needs can of course
+        manipulate the :class:`~sqlalchemy.engine.Connection`
+        directly to produce transactional state in "online"
+        mode.
+
+        """
+
+        return self.get_context().begin_transaction()
+
+    def get_context(self) -> MigrationContext:
+        """Return the current :class:`.MigrationContext` object.
+
+        If :meth:`.EnvironmentContext.configure` has not been
+        called yet, raises an exception.
+
+        """
+
+        if self._migration_context is None:
+            raise Exception("No context has been configured yet.")
+        return self._migration_context
+
+    def get_bind(self) -> Connection:
+        """Return the current 'bind'.
+
+        In "online" mode, this is the
+        :class:`sqlalchemy.engine.Connection` currently being used
+        to emit SQL to the database.
+
+        This function requires that a :class:`.MigrationContext`
+        has first been made available via :meth:`.configure`.
+
+        """
+        return self.get_context().bind  # type: ignore[return-value]
+
    def get_impl(self) -> DefaultImpl:
        """Return the dialect-specific :class:`.DefaultImpl` in use.

        Requires that :meth:`.configure` has been called first, since
        the impl lives on the current :class:`.MigrationContext` (see
        :meth:`.get_context`).
        """
        return self.get_context().impl
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py b/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py
new file mode 100644
index 00000000..ac431a62
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py
@@ -0,0 +1,1391 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from contextlib import nullcontext
+import logging
+import sys
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Collection
+from typing import Dict
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import Column
+from sqlalchemy import literal_column
+from sqlalchemy import select
+from sqlalchemy.engine import Engine
+from sqlalchemy.engine import url as sqla_url
+from sqlalchemy.engine.strategies import MockEngineStrategy
+from typing_extensions import ContextManager
+
+from .. import ddl
+from .. import util
+from ..util import sqla_compat
+from ..util.compat import EncodedIO
+
+if TYPE_CHECKING:
+    from sqlalchemy.engine import Dialect
+    from sqlalchemy.engine import URL
+    from sqlalchemy.engine.base import Connection
+    from sqlalchemy.engine.base import Transaction
+    from sqlalchemy.engine.mock import MockConnection
+    from sqlalchemy.sql import Executable
+
+    from .environment import EnvironmentContext
+    from ..config import Config
+    from ..script.base import Script
+    from ..script.base import ScriptDirectory
+    from ..script.revision import _RevisionOrBase
+    from ..script.revision import Revision
+    from ..script.revision import RevisionMap
+
+log = logging.getLogger(__name__)
+
+
+class _ProxyTransaction:
+    def __init__(self, migration_context: MigrationContext) -> None:
+        self.migration_context = migration_context
+
+    @property
+    def _proxied_transaction(self) -> Optional[Transaction]:
+        return self.migration_context._transaction
+
+    def rollback(self) -> None:
+        t = self._proxied_transaction
+        assert t is not None
+        t.rollback()
+        self.migration_context._transaction = None
+
+    def commit(self) -> None:
+        t = self._proxied_transaction
+        assert t is not None
+        t.commit()
+        self.migration_context._transaction = None
+
+    def __enter__(self) -> _ProxyTransaction:
+        return self
+
+    def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
+        if self._proxied_transaction is not None:
+            self._proxied_transaction.__exit__(type_, value, traceback)
+            self.migration_context._transaction = None
+
+
+class MigrationContext:
+    """Represent the database state made available to a migration
+    script.
+
+    :class:`.MigrationContext` is the front end to an actual
+    database connection, or alternatively a string output
+    stream given a particular database dialect,
+    from an Alembic perspective.
+
+    When inside the ``env.py`` script, the :class:`.MigrationContext`
+    is available via the
+    :meth:`.EnvironmentContext.get_context` method,
+    which is available at ``alembic.context``::
+
+        # from within env.py script
+        from alembic import context
+
+        migration_context = context.get_context()
+
+    For usage outside of an ``env.py`` script, such as for
+    utility routines that want to check the current version
+    in the database, the :meth:`.MigrationContext.configure`
+    method may be used to create new :class:`.MigrationContext` objects.
+    For example, to get at the current revision in the
+    database using :meth:`.MigrationContext.get_current_revision`::
+
+        # in any application, outside of an env.py script
+        from alembic.migration import MigrationContext
+        from sqlalchemy import create_engine
+
+        engine = create_engine("postgresql://mydatabase")
+        conn = engine.connect()
+
+        context = MigrationContext.configure(conn)
+        current_rev = context.get_current_revision()
+
+    The above context can also be used to produce
+    Alembic migration operations with an :class:`.Operations`
+    instance::
+
+        # in any application, outside of the normal Alembic environment
+        from alembic.operations import Operations
+
+        op = Operations(context)
+        op.alter_column("mytable", "somecolumn", nullable=True)
+
+    """
+
    def __init__(
        self,
        dialect: Dialect,
        connection: Optional[Connection],
        opts: Dict[str, Any],
        environment_context: Optional[EnvironmentContext] = None,
    ) -> None:
        """Construct a new :class:`.MigrationContext`.

        Not normally called directly; use
        :meth:`.MigrationContext.configure` instead.

        :param dialect: the SQLAlchemy :class:`.Dialect` in use.
        :param connection: a live :class:`~sqlalchemy.engine.Connection`
         for "online" mode, or ``None`` when generating SQL offline.
        :param opts: dictionary of options, normally assembled by
         :meth:`.EnvironmentContext.configure`.
        :param environment_context: the owning
         :class:`.EnvironmentContext`, if any.
        """
        self.environment_context = environment_context
        self.opts = opts
        self.dialect = dialect
        self.script: Optional[ScriptDirectory] = opts.get("script")
        as_sql: bool = opts.get("as_sql", False)
        transactional_ddl = opts.get("transactional_ddl")
        self._transaction_per_migration = opts.get(
            "transaction_per_migration", False
        )
        self.on_version_apply_callbacks = opts.get("on_version_apply", ())
        self._transaction: Optional[Transaction] = None

        if as_sql:
            # "offline" mode: statements are rendered to an output stream
            # through a mock connection rather than executed.
            self.connection = cast(
                Optional["Connection"], self._stdout_connection(connection)
            )
            assert self.connection is not None
            self._in_external_transaction = False
        else:
            self.connection = connection
            # remember whether the caller handed us a connection that is
            # already inside a transaction; begin_transaction() then
            # avoids interfering with that externally-managed scope.
            self._in_external_transaction = (
                sqla_compat._get_connection_in_transaction(connection)
            )

        # the function that yields the migration steps to run (set up by
        # the command layer), if any.
        self._migrations_fn: Optional[
            Callable[..., Iterable[RevisionStep]]
        ] = opts.get("fn")
        self.as_sql = as_sql

        self.purge = opts.get("purge", False)

        if "output_encoding" in opts:
            # wrap the output stream so rendered SQL is encoded as
            # requested; defaults to stdout if no buffer was supplied.
            self.output_buffer = EncodedIO(
                opts.get("output_buffer")
                or sys.stdout,  # type:ignore[arg-type]
                opts["output_encoding"],
            )
        else:
            self.output_buffer = opts.get("output_buffer", sys.stdout)

        self._user_compare_type = opts.get("compare_type", True)
        self._user_compare_server_default = opts.get(
            "compare_server_default", False
        )
        self.version_table = version_table = opts.get(
            "version_table", "alembic_version"
        )
        self.version_table_schema = version_table_schema = opts.get(
            "version_table_schema", None
        )

        self._start_from_rev: Optional[str] = opts.get("starting_rev")
        # dialect-specific implementation object which performs the
        # actual DDL emission for this backend.
        self.impl = ddl.DefaultImpl.get_by_dialect(dialect)(
            dialect,
            self.connection,
            self.as_sql,
            transactional_ddl,
            self.output_buffer,
            opts,
        )

        # Table object representing the alembic version table.
        self._version = self.impl.version_table_impl(
            version_table=version_table,
            version_table_schema=version_table_schema,
            version_table_pk=opts.get("version_table_pk", True),
        )

        log.info("Context impl %s.", self.impl.__class__.__name__)
        if self.as_sql:
            log.info("Generating static SQL")
        log.info(
            "Will assume %s DDL.",
            (
                "transactional"
                if self.impl.transactional_ddl
                else "non-transactional"
            ),
        )
+
+    @classmethod
+    def configure(
+        cls,
+        connection: Optional[Connection] = None,
+        url: Optional[Union[str, URL]] = None,
+        dialect_name: Optional[str] = None,
+        dialect: Optional[Dialect] = None,
+        environment_context: Optional[EnvironmentContext] = None,
+        dialect_opts: Optional[Dict[str, str]] = None,
+        opts: Optional[Any] = None,
+    ) -> MigrationContext:
+        """Create a new :class:`.MigrationContext`.
+
+        This is a factory method usually called
+        by :meth:`.EnvironmentContext.configure`.
+
+        :param connection: a :class:`~sqlalchemy.engine.Connection`
+         to use for SQL execution in "online" mode.  When present,
+         is also used to determine the type of dialect in use.
+        :param url: a string database url, or a
+         :class:`sqlalchemy.engine.url.URL` object.
+         The type of dialect to be used will be derived from this if
+         ``connection`` is not passed.
+        :param dialect_name: string name of a dialect, such as
+         "postgresql", "mssql", etc.  The type of dialect to be used will be
+         derived from this if ``connection`` and ``url`` are not passed.
+        :param opts: dictionary of options.  Most other options
+         accepted by :meth:`.EnvironmentContext.configure` are passed via
+         this dictionary.
+
+        """
+        if opts is None:
+            opts = {}
+        if dialect_opts is None:
+            dialect_opts = {}
+
+        if connection:
+            if isinstance(connection, Engine):
+                raise util.CommandError(
+                    "'connection' argument to configure() is expected "
+                    "to be a sqlalchemy.engine.Connection instance, "
+                    "got %r" % connection,
+                )
+
+            dialect = connection.dialect
+        elif url:
+            url_obj = sqla_url.make_url(url)
+            dialect = url_obj.get_dialect()(**dialect_opts)
+        elif dialect_name:
+            url_obj = sqla_url.make_url("%s://" % dialect_name)
+            dialect = url_obj.get_dialect()(**dialect_opts)
+        elif not dialect:
+            raise Exception("Connection, url, or dialect_name is required.")
+        assert dialect is not None
+        return MigrationContext(dialect, connection, opts, environment_context)
+
    @contextmanager
    def autocommit_block(self) -> Iterator[None]:
        """Enter an "autocommit" block, for databases that support AUTOCOMMIT
        isolation levels.

        This special directive is intended to support the occasional database
        DDL or system operation that specifically has to be run outside of
        any kind of transaction block.   The PostgreSQL database platform
        is the most common target for this style of operation, as many
        of its DDL operations must be run outside of transaction blocks, even
        though the database overall supports transactional DDL.

        The method is used as a context manager within a migration script, by
        calling on :meth:`.Operations.get_context` to retrieve the
        :class:`.MigrationContext`, then invoking
        :meth:`.MigrationContext.autocommit_block` using the ``with:``
        statement::

            def upgrade():
                with op.get_context().autocommit_block():
                    op.execute("ALTER TYPE mood ADD VALUE 'soso'")

        Above, a PostgreSQL "ALTER TYPE..ADD VALUE" directive is emitted,
        which must be run outside of a transaction block at the database level.
        The :meth:`.MigrationContext.autocommit_block` method makes use of the
        SQLAlchemy ``AUTOCOMMIT`` isolation level setting, which against the
        psycopg2 DBAPI corresponds to the ``connection.autocommit`` setting,
        to ensure that the database driver is not inside of a DBAPI level
        transaction block.

        .. warning::

            As is necessary, **the database transaction preceding the block is
            unconditionally committed**.  This means that the run of migrations
            preceding the operation will be committed, before the overall
            migration operation is complete.

            It is recommended that when an application includes migrations with
            "autocommit" blocks, that
            :paramref:`.EnvironmentContext.transaction_per_migration` be used
            so that the calling environment is tuned to expect short per-file
            migrations whether or not one of them has an autocommit block.


        """
        _in_connection_transaction = self._in_connection_transaction()

        # first, terminate any transaction currently in progress: in
        # offline transactional-DDL mode emit a COMMIT directive; in
        # online mode commit our own transaction if one is open.
        if self.impl.transactional_ddl and self.as_sql:
            self.impl.emit_commit()

        elif _in_connection_transaction:
            assert self._transaction is not None

            self._transaction.commit()
            self._transaction = None

        if not self.as_sql:
            # switch the connection to AUTOCOMMIT isolation for the
            # duration of the block, remembering the previous level so it
            # can be restored afterwards.
            assert self.connection is not None
            current_level = self.connection.get_isolation_level()
            base_connection = self.connection

            # in 1.3 and 1.4 non-future mode, the connection gets switched
            # out.  we can use the base connection with the new mode
            # except that it will not know it's in "autocommit" and will
            # emit deprecation warnings when an autocommit action takes
            # place.
            self.connection = self.impl.connection = (
                base_connection.execution_options(isolation_level="AUTOCOMMIT")
            )

            # sqlalchemy future mode will "autobegin" in any case, so take
            # control of that "transaction" here
            fake_trans: Optional[Transaction] = self.connection.begin()
        else:
            fake_trans = None
        try:
            yield
        finally:
            if not self.as_sql:
                # restore the previous isolation level and re-install the
                # original connection on ourselves and the impl.
                assert self.connection is not None
                if fake_trans is not None:
                    fake_trans.commit()
                self.connection.execution_options(
                    isolation_level=current_level
                )
                self.connection = self.impl.connection = base_connection

            # finally, re-establish the transactional state that was in
            # effect before the block was entered.
            if self.impl.transactional_ddl and self.as_sql:
                self.impl.emit_begin()

            elif _in_connection_transaction:
                assert self.connection is not None
                self._transaction = self.connection.begin()
+
    def begin_transaction(
        self, _per_migration: bool = False
    ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]:
        """Begin a logical transaction for migration operations.

        This method is used within an ``env.py`` script to demarcate where
        the outer "transaction" for a series of migrations begins.  Example::

            def run_migrations_online():
                connectable = create_engine(...)

                with connectable.connect() as connection:
                    context.configure(
                        connection=connection, target_metadata=target_metadata
                    )

                    with context.begin_transaction():
                        context.run_migrations()

        Above, :meth:`.MigrationContext.begin_transaction` is used to demarcate
        where the outer logical transaction occurs around the
        :meth:`.MigrationContext.run_migrations` operation.

        A "Logical" transaction means that the operation may or may not
        correspond to a real database transaction.   If the target database
        supports transactional DDL (or
        :paramref:`.EnvironmentContext.configure.transactional_ddl` is true),
        the :paramref:`.EnvironmentContext.configure.transaction_per_migration`
        flag is not set, and the migration is against a real database
        connection (as opposed to using "offline" ``--sql`` mode), a real
        transaction will be started.   If ``--sql`` mode is in effect, the
        operation would instead correspond to a string such as "BEGIN" being
        emitted to the string output.

        The returned object is a Python context manager that should only be
        used in the context of a ``with:`` statement as indicated above.
        The object has no other guaranteed API features present.

        :param _per_migration: internal flag; True when this is the
         per-migration-file call rather than the outer env.py call.

        .. seealso::

            :meth:`.MigrationContext.autocommit_block`

        """

        # a connection that was already inside a transaction when handed
        # to us is managed entirely by the caller; emit nothing.
        if self._in_external_transaction:
            return nullcontext()

        if self.impl.transactional_ddl:
            # with transactional DDL, exactly one of the "outer" call
            # (_per_migration=False) or the per-migration call should
            # start a real transaction, as selected by the
            # transaction_per_migration option.
            transaction_now = _per_migration == self._transaction_per_migration
        else:
            # without transactional DDL, only per-migration transactions
            # are meaningful.
            transaction_now = _per_migration is True

        if not transaction_now:
            return nullcontext()

        elif not self.impl.transactional_ddl:
            assert _per_migration

            if self.as_sql:
                return nullcontext()
            else:
                # track our own notion of a "transaction block", which must be
                # committed when complete.   Don't rely upon whether or not the
                # SQLAlchemy connection reports as "in transaction"; this
                # because SQLAlchemy future connection features autobegin
                # behavior, so it may already be in a transaction from our
                # emitting of queries like "has_version_table", etc. While we
                # could track these operations as well, that leaves open the
                # possibility of new operations or other things happening in
                # the user environment that still may be triggering
                # "autobegin".

                in_transaction = self._transaction is not None

                if in_transaction:
                    return nullcontext()
                else:
                    assert self.connection is not None
                    self._transaction = (
                        sqla_compat._safe_begin_connection_transaction(
                            self.connection
                        )
                    )
                    return _ProxyTransaction(self)
        elif self.as_sql:
            # offline mode with transactional DDL: render BEGIN/COMMIT
            # directives to the SQL output stream.
            @contextmanager
            def begin_commit():
                self.impl.emit_begin()
                yield
                self.impl.emit_commit()

            return begin_commit()
        else:
            # online mode with transactional DDL: open a real database
            # transaction, proxied so that completion clears our state.
            assert self.connection is not None
            self._transaction = sqla_compat._safe_begin_connection_transaction(
                self.connection
            )
            return _ProxyTransaction(self)
+
+    def get_current_revision(self) -> Optional[str]:
+        """Return the current revision, usually that which is present
+        in the ``alembic_version`` table in the database.
+
+        This method intends to be used only for a migration stream that
+        does not contain unmerged branches in the target database;
+        if there are multiple branches present, an exception is raised.
+        The :meth:`.MigrationContext.get_current_heads` should be preferred
+        over this method going forward in order to be compatible with
+        branch migration support.
+
+        If this :class:`.MigrationContext` was configured in "offline"
+        mode, that is with ``as_sql=True``, the ``starting_rev``
+        parameter is returned instead, if any.
+
+        """
+        heads = self.get_current_heads()
+        if len(heads) == 0:
+            return None
+        elif len(heads) > 1:
+            raise util.CommandError(
+                "Version table '%s' has more than one head present; "
+                "please use get_current_heads()" % self.version_table
+            )
+        else:
+            return heads[0]
+
    def get_current_heads(self) -> Tuple[str, ...]:
        """Return a tuple of the current 'head versions' that are represented
        in the target database.

        For a migration stream without branches, this will be a single
        value, synonymous with that of
        :meth:`.MigrationContext.get_current_revision`.   However when multiple
        unmerged branches exist within the target database, the returned tuple
        will contain a value for each head.

        If this :class:`.MigrationContext` was configured in "offline"
        mode, that is with ``as_sql=True``, the ``starting_rev``
        parameter is returned in a one-length tuple.

        If no version table is present, or if there are no revisions
        present, an empty tuple is returned.

        """
        if self.as_sql:
            # offline mode: heads come from the user-supplied starting
            # revision(s); the database is never consulted.
            start_from_rev: Any = self._start_from_rev
            if start_from_rev == "base":
                start_from_rev = None
            elif start_from_rev is not None and self.script:
                # resolve symbolic / partial identifiers through the script
                # directory into full revision ids, dropping "base" markers
                start_from_rev = [
                    self.script.get_revision(sfr).revision
                    for sfr in util.to_list(start_from_rev)
                    if sfr not in (None, "base")
                ]
            return util.to_tuple(start_from_rev, default=())
        else:
            # online mode: a starting revision makes no sense, since the
            # actual state is read from the version table
            if self._start_from_rev:
                raise util.CommandError(
                    "Can't specify current_rev to context "
                    "when using a database connection"
                )
            if not self._has_version_table():
                # no version table at all -> no heads
                return ()
        assert self.connection is not None
        # one row per head in the version table
        return tuple(
            row[0]
            for row in self.connection.execute(
                select(self._version.c.version_num)
            )
        )
+
+    def _ensure_version_table(self, purge: bool = False) -> None:
+        with sqla_compat._ensure_scope_for_ddl(self.connection):
+            assert self.connection is not None
+            self._version.create(self.connection, checkfirst=True)
+            if purge:
+                assert self.connection is not None
+                self.connection.execute(self._version.delete())
+
+    def _has_version_table(self) -> bool:
+        assert self.connection is not None
+        return sqla_compat._connectable_has_table(
+            self.connection, self.version_table, self.version_table_schema
+        )
+
+    def stamp(self, script_directory: ScriptDirectory, revision: str) -> None:
+        """Stamp the version table with a specific revision.
+
+        This method calculates those branches to which the given revision
+        can apply, and updates those branches as though they were migrated
+        towards that revision (either up or down).  If no current branches
+        include the revision, it is added as a new branch head.
+
+        """
+        heads = self.get_current_heads()
+        if not self.as_sql and not heads:
+            self._ensure_version_table()
+        head_maintainer = HeadMaintainer(self, heads)
+        for step in script_directory._stamp_revs(revision, heads):
+            head_maintainer.update_to_step(step)
+
    def run_migrations(self, **kw: Any) -> None:
        r"""Run the migration scripts established for this
        :class:`.MigrationContext`, if any.

        The commands in :mod:`alembic.command` will set up a function
        that is ultimately passed to the :class:`.MigrationContext`
        as the ``fn`` argument.  This function represents the "work"
        that will be done when :meth:`.MigrationContext.run_migrations`
        is called, typically from within the ``env.py`` script of the
        migration environment.  The "work function" then provides an iterable
        of version callables and other version information which
        in the case of the ``upgrade`` or ``downgrade`` commands are the
        list of version scripts to invoke.  Other commands yield nothing,
        in the case that a command wants to run some other operation
        against the database such as the ``current`` or ``stamp`` commands.

        :param \**kw: keyword arguments here will be passed to each
         migration callable, that is the ``upgrade()`` or ``downgrade()``
         method within revision scripts.

        """
        self.impl.start_migrations()

        heads: Tuple[str, ...]
        if self.purge:
            # --purge: wipe the version table and proceed as though the
            # database were at base.  incompatible with offline mode.
            if self.as_sql:
                raise util.CommandError("Can't use --purge with --sql mode")
            self._ensure_version_table(purge=True)
            heads = ()
        else:
            heads = self.get_current_heads()

            dont_mutate = self.opts.get("dont_mutate", False)

            # in online mode with no heads yet, create the version table
            # up front (unless the caller asked for no mutations)
            if not self.as_sql and not heads and not dont_mutate:
                self._ensure_version_table()

        head_maintainer = HeadMaintainer(self, heads)

        assert self._migrations_fn is not None
        for step in self._migrations_fn(heads, self):
            # each step runs in its own transaction scope (or the
            # enclosing one, depending on transaction configuration)
            with self.begin_transaction(_per_migration=True):
                if self.as_sql and not head_maintainer.heads:
                    # for offline mode, include a CREATE TABLE from
                    # the base
                    assert self.connection is not None
                    self._version.create(self.connection)
                log.info("Running %s", step)
                if self.as_sql:
                    self.impl.static_output(
                        "-- Running %s" % (step.short_log,)
                    )
                # invoke the actual upgrade()/downgrade() function
                step.migration_fn(**kw)

                # previously, we wouldn't stamp per migration
                # if we were in a transaction, however given the more
                # complex model that involves any number of inserts
                # and row-targeted updates and deletes, it's simpler for now
                # just to run the operations on every version
                head_maintainer.update_to_step(step)
                for callback in self.on_version_apply_callbacks:
                    callback(
                        ctx=self,
                        step=step.info,
                        heads=set(head_maintainer.heads),
                        run_args=kw,
                    )

        # offline downgrade all the way to base: emit a DROP TABLE for
        # the version table as well
        if self.as_sql and not head_maintainer.heads:
            assert self.connection is not None
            self._version.drop(self.connection)
+
+    def _in_connection_transaction(self) -> bool:
+        try:
+            meth = self.connection.in_transaction  # type:ignore[union-attr]
+        except AttributeError:
+            return False
+        else:
+            return meth()
+
+    def execute(
+        self,
+        sql: Union[Executable, str],
+        execution_options: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """Execute a SQL construct or string statement.
+
+        The underlying execution mechanics are used, that is
+        if this is "offline mode" the SQL is written to the
+        output buffer, otherwise the SQL is emitted on
+        the current SQLAlchemy connection.
+
+        """
+        self.impl._exec(sql, execution_options)
+
+    def _stdout_connection(
+        self, connection: Optional[Connection]
+    ) -> MockConnection:
+        def dump(construct, *multiparams, **params):
+            self.impl._exec(construct)
+
+        return MockEngineStrategy.MockConnection(self.dialect, dump)
+
+    @property
+    def bind(self) -> Optional[Connection]:
+        """Return the current "bind".
+
+        In online mode, this is an instance of
+        :class:`sqlalchemy.engine.Connection`, and is suitable
+        for ad-hoc execution of any kind of usage described
+        in SQLAlchemy Core documentation as well as
+        for usage with the :meth:`sqlalchemy.schema.Table.create`
+        and :meth:`sqlalchemy.schema.MetaData.create_all` methods
+        of :class:`~sqlalchemy.schema.Table`,
+        :class:`~sqlalchemy.schema.MetaData`.
+
+        Note that when "standard output" mode is enabled,
+        this bind will be a "mock" connection handler that cannot
+        return results and is only appropriate for a very limited
+        subset of commands.
+
+        """
+        return self.connection
+
+    @property
+    def config(self) -> Optional[Config]:
+        """Return the :class:`.Config` used by the current environment,
+        if any."""
+
+        if self.environment_context:
+            return self.environment_context.config
+        else:
+            return None
+
+    def _compare_type(
+        self, inspector_column: Column[Any], metadata_column: Column
+    ) -> bool:
+        if self._user_compare_type is False:
+            return False
+
+        if callable(self._user_compare_type):
+            user_value = self._user_compare_type(
+                self,
+                inspector_column,
+                metadata_column,
+                inspector_column.type,
+                metadata_column.type,
+            )
+            if user_value is not None:
+                return user_value
+
+        return self.impl.compare_type(inspector_column, metadata_column)
+
+    def _compare_server_default(
+        self,
+        inspector_column: Column[Any],
+        metadata_column: Column[Any],
+        rendered_metadata_default: Optional[str],
+        rendered_column_default: Optional[str],
+    ) -> bool:
+        if self._user_compare_server_default is False:
+            return False
+
+        if callable(self._user_compare_server_default):
+            user_value = self._user_compare_server_default(
+                self,
+                inspector_column,
+                metadata_column,
+                rendered_column_default,
+                metadata_column.server_default,
+                rendered_metadata_default,
+            )
+            if user_value is not None:
+                return user_value
+
+        return self.impl.compare_server_default(
+            inspector_column,
+            metadata_column,
+            rendered_metadata_default,
+            rendered_column_default,
+        )
+
+
class HeadMaintainer:
    """Tracks the set of head revision identifiers as migration steps
    are applied, mirroring every change into the version table via
    INSERT / UPDATE / DELETE statements."""

    def __init__(self, context: MigrationContext, heads: Any) -> None:
        self.context = context
        self.heads = set(heads)

    def _insert_version(self, version: str) -> None:
        # add a brand-new head row to the version table
        assert version not in self.heads
        self.heads.add(version)

        insert_stmt = self.context._version.insert().values(
            version_num=literal_column("'%s'" % version)
        )
        self.context.impl._exec(insert_stmt)

    def _delete_version(self, version: str) -> None:
        # remove a head row from the version table entirely
        self.heads.remove(version)

        delete_stmt = self.context._version.delete().where(
            self.context._version.c.version_num
            == literal_column("'%s'" % version)
        )
        result = self.context.impl._exec(delete_stmt)

        # in online mode, when the dialect reports accurate rowcounts,
        # verify that exactly one row was removed
        if (
            result is not None
            and not self.context.as_sql
            and self.context.dialect.supports_sane_rowcount
            and result.rowcount != 1
        ):
            raise util.CommandError(
                "Online migration expected to match one "
                "row when deleting '%s' in '%s'; "
                "%d found"
                % (version, self.context.version_table, result.rowcount)
            )

    def _update_version(self, from_: str, to_: str) -> None:
        # move a single head row from one identifier to another
        assert to_ not in self.heads
        self.heads.remove(from_)
        self.heads.add(to_)

        update_stmt = (
            self.context._version.update()
            .values(version_num=literal_column("'%s'" % to_))
            .where(
                self.context._version.c.version_num
                == literal_column("'%s'" % from_)
            )
        )
        result = self.context.impl._exec(update_stmt)

        # in online mode, when the dialect reports accurate rowcounts,
        # verify that exactly one row was updated
        if (
            result is not None
            and not self.context.as_sql
            and self.context.dialect.supports_sane_rowcount
            and result.rowcount != 1
        ):
            raise util.CommandError(
                "Online migration expected to match one "
                "row when updating '%s' to '%s' in '%s'; "
                "%d found"
                % (from_, to_, self.context.version_table, result.rowcount)
            )

    def update_to_step(self, step: Union[RevisionStep, StampStep]) -> None:
        """Apply the head-set effect of ``step``, dispatching to the
        insert / delete / update primitives as appropriate."""
        if step.should_delete_branch(self.heads):
            vers = step.delete_version_num
            log.debug("branch delete %s", vers)
            self._delete_version(vers)
            return

        if step.should_create_branch(self.heads):
            vers = step.insert_version_num
            log.debug("new branch insert %s", vers)
            self._insert_version(vers)
            return

        if step.should_merge_branches(self.heads):
            # several branches collapse into one revision
            (
                delete_revs,
                update_from_rev,
                update_to_rev,
            ) = step.merge_branch_idents(self.heads)
            log.debug(
                "merge, delete %s, update %s to %s",
                delete_revs,
                update_from_rev,
                update_to_rev,
            )
            for rev in delete_revs:
                self._delete_version(rev)
            self._update_version(update_from_rev, update_to_rev)
            return

        if step.should_unmerge_branches(self.heads):
            # one revision fans back out into several branches
            (
                update_from_rev,
                update_to_rev,
                insert_revs,
            ) = step.unmerge_branch_idents(self.heads)
            log.debug(
                "unmerge, insert %s, update %s to %s",
                insert_revs,
                update_from_rev,
                update_to_rev,
            )
            for rev in insert_revs:
                self._insert_version(rev)
            self._update_version(update_from_rev, update_to_rev)
            return

        # plain linear movement: a single head moves between revisions
        from_, to_ = step.update_version_num(self.heads)
        log.debug("update %s to %s", from_, to_)
        self._update_version(from_, to_)
+
+
class MigrationInfo:
    """Exposes information about a migration step to a callback listener.

    The :class:`.MigrationInfo` object is available exclusively for the
    benefit of the :paramref:`.EnvironmentContext.on_version_apply`
    callback hook.

    """

    is_upgrade: bool
    """True/False: whether this operation ascends or descends the
    version tree."""

    is_stamp: bool
    """True/False: whether this operation is a stamp, i.e. performs no
    actual database operations."""

    up_revision_id: Optional[str]
    """Version string corresponding to :attr:`.Revision.revision`.

    For a stamp operation, prefer the
    :attr:`.MigrationInfo.up_revision_ids` tuple: a stamp can make a
    single movement from one or more branches down to a single
    branchpoint, in which case there are multiple "up" revisions.

    .. seealso::

        :attr:`.MigrationInfo.up_revision_ids`

    """

    up_revision_ids: Tuple[str, ...]
    """Tuple of version strings corresponding to :attr:`.Revision.revision`.

    Usually a single value, synonymous with the scalar
    :attr:`.MigrationInfo.up_revision_id`.  Multiple identifiers appear
    only for an ``alembic stamp`` moving downwards from multiple
    branches to their common branch point.

    """

    down_revision_ids: Tuple[str, ...]
    """Tuple of strings representing the base revisions of this migration step.

    If empty, this represents a root revision; otherwise, the first item
    corresponds to :attr:`.Revision.down_revision`, and the rest are inferred
    from dependencies.
    """

    revision_map: RevisionMap
    """The revision map inside of which this operation occurs."""

    def __init__(
        self,
        revision_map: RevisionMap,
        is_upgrade: bool,
        is_stamp: bool,
        up_revisions: Union[str, Tuple[str, ...]],
        down_revisions: Union[str, Tuple[str, ...]],
    ) -> None:
        self.revision_map = revision_map
        self.is_upgrade = is_upgrade
        self.is_stamp = is_stamp
        self.up_revision_ids = util.to_tuple(up_revisions, default=())
        self.down_revision_ids = util.to_tuple(down_revisions, default=())
        # scalar convenience form; upgrade/downgrade/stamp always move
        # through at least one "up" version, so this should never be
        # None in practice
        self.up_revision_id = (
            self.up_revision_ids[0] if self.up_revision_ids else None
        )

    @property
    def is_migration(self) -> bool:
        """True/False: indicates whether this operation is a migration.

        At present this is true if and only the migration is not a stamp.
        If other operation types are added in the future, both this attribute
        and :attr:`~.MigrationInfo.is_stamp` will be false.
        """
        return not self.is_stamp

    @property
    def source_revision_ids(self) -> Tuple[str, ...]:
        """Active revisions before this migration step is applied."""
        if self.is_upgrade:
            return self.down_revision_ids
        return self.up_revision_ids

    @property
    def destination_revision_ids(self) -> Tuple[str, ...]:
        """Active revisions after this migration step is applied."""
        if self.is_upgrade:
            return self.up_revision_ids
        return self.down_revision_ids

    @property
    def up_revision(self) -> Optional[Revision]:
        """Get :attr:`~.MigrationInfo.up_revision_id` as
        a :class:`.Revision`.

        """
        return self.revision_map.get_revision(self.up_revision_id)

    @property
    def up_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
        """Get :attr:`~.MigrationInfo.up_revision_ids` as a
        :class:`.Revision`."""
        return self.revision_map.get_revisions(self.up_revision_ids)

    @property
    def down_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
        """Get :attr:`~.MigrationInfo.down_revision_ids` as a tuple of
        :class:`Revisions <.Revision>`."""
        return self.revision_map.get_revisions(self.down_revision_ids)

    @property
    def source_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
        """Get :attr:`~MigrationInfo.source_revision_ids` as a tuple of
        :class:`Revisions <.Revision>`."""
        return self.revision_map.get_revisions(self.source_revision_ids)

    @property
    def destination_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
        """Get :attr:`~MigrationInfo.destination_revision_ids` as a tuple of
        :class:`Revisions <.Revision>`."""
        return self.revision_map.get_revisions(self.destination_revision_ids)
+
+
class MigrationStep:
    """Base class for a unit of work within a migration run: either the
    invocation of a revision script's function (:class:`.RevisionStep`)
    or a version-table-only stamp (:class:`.StampStep`)."""

    from_revisions_no_deps: Tuple[str, ...]
    to_revisions_no_deps: Tuple[str, ...]
    is_upgrade: bool
    migration_fn: Any

    if TYPE_CHECKING:

        @property
        def doc(self) -> Optional[str]: ...

    @property
    def name(self) -> str:
        """Name of the callable this step will invoke, e.g. ``upgrade``."""
        return self.migration_fn.__name__

    @classmethod
    def upgrade_from_script(
        cls, revision_map: RevisionMap, script: Script
    ) -> RevisionStep:
        """Construct an upgrade step for the given revision script."""
        return RevisionStep(revision_map, script, True)

    @classmethod
    def downgrade_from_script(
        cls, revision_map: RevisionMap, script: Script
    ) -> RevisionStep:
        """Construct a downgrade step for the given revision script."""
        return RevisionStep(revision_map, script, False)

    @property
    def is_downgrade(self) -> bool:
        return not self.is_upgrade

    @property
    def short_log(self) -> str:
        """One-line summary of this step, without the revision doc."""
        return "%s %s -> %s" % (
            self.name,
            util.format_as_comma(self.from_revisions_no_deps),
            util.format_as_comma(self.to_revisions_no_deps),
        )

    def __str__(self):
        if not self.doc:
            return self.short_log
        return "%s %s -> %s, %s" % (
            self.name,
            util.format_as_comma(self.from_revisions_no_deps),
            util.format_as_comma(self.to_revisions_no_deps),
            self.doc,
        )
+
+
class RevisionStep(MigrationStep):
    """A :class:`.MigrationStep` that invokes the ``upgrade()`` or
    ``downgrade()`` function of a specific revision script."""

    def __init__(
        self, revision_map: RevisionMap, revision: Script, is_upgrade: bool
    ) -> None:
        self.revision_map = revision_map
        self.revision = revision
        self.is_upgrade = is_upgrade
        # bind the callable from the revision script's module up front
        if is_upgrade:
            self.migration_fn = revision.module.upgrade
        else:
            self.migration_fn = revision.module.downgrade

    def __repr__(self):
        return "RevisionStep(%r, is_upgrade=%r)" % (
            self.revision.revision,
            self.is_upgrade,
        )

    def __eq__(self, other: object) -> bool:
        # steps are equal when they run the same revision in the
        # same direction
        return (
            isinstance(other, RevisionStep)
            and other.revision == self.revision
            and self.is_upgrade == other.is_upgrade
        )

    @property
    def doc(self) -> Optional[str]:
        # the message / docstring of the revision script
        return self.revision.doc

    @property
    def from_revisions(self) -> Tuple[str, ...]:
        """Revisions in place before this step runs, dependencies
        included."""
        if self.is_upgrade:
            return self.revision._normalized_down_revisions
        else:
            return (self.revision.revision,)

    @property
    def from_revisions_no_deps(  # type:ignore[override]
        self,
    ) -> Tuple[str, ...]:
        """Revisions in place before this step runs, dependencies
        excluded."""
        if self.is_upgrade:
            return self.revision._versioned_down_revisions
        else:
            return (self.revision.revision,)

    @property
    def to_revisions(self) -> Tuple[str, ...]:
        """Revisions in place after this step runs, dependencies
        included."""
        if self.is_upgrade:
            return (self.revision.revision,)
        else:
            return self.revision._normalized_down_revisions

    @property
    def to_revisions_no_deps(  # type:ignore[override]
        self,
    ) -> Tuple[str, ...]:
        """Revisions in place after this step runs, dependencies
        excluded."""
        if self.is_upgrade:
            return (self.revision.revision,)
        else:
            return self.revision._versioned_down_revisions

    @property
    def _has_scalar_down_revision(self) -> bool:
        # True when the revision has exactly one effective down revision
        return len(self.revision._normalized_down_revisions) == 1

    def should_delete_branch(self, heads: Set[str]) -> bool:
        """A delete is when we are a. in a downgrade and b.
        we are going to the "base" or we are going to a version that
        is implied as a dependency on another version that is remaining.

        """
        if not self.is_downgrade:
            return False

        if self.revision.revision not in heads:
            return False

        downrevs = self.revision._normalized_down_revisions

        if not downrevs:
            # is a base
            return True
        else:
            # determine what the ultimate "to_revisions" for an
            # unmerge would be.  If there are none, then we're a delete.
            to_revisions = self._unmerge_to_revisions(heads)
            return not to_revisions

    def merge_branch_idents(
        self, heads: Set[str]
    ) -> Tuple[List[str], str, str]:
        """Compute the version-table changes for a merge: the revisions
        to delete, plus the single (from, to) pair to update."""
        other_heads = set(heads).difference(self.from_revisions)

        if other_heads:
            # exclude revisions that are already implied as ancestors of
            # the remaining heads; their rows stay untouched
            ancestors = {
                r.revision
                for r in self.revision_map._get_ancestor_nodes(
                    self.revision_map.get_revisions(other_heads), check=False
                )
            }
            from_revisions = list(
                set(self.from_revisions).difference(ancestors)
            )
        else:
            from_revisions = list(self.from_revisions)

        return (
            # delete revs, update from rev, update to rev
            list(from_revisions[0:-1]),
            from_revisions[-1],
            self.to_revisions[0],
        )

    def _unmerge_to_revisions(self, heads: Set[str]) -> Tuple[str, ...]:
        # compute the set of revisions that would become heads if this
        # (downgrade of a merge point) step were applied
        other_heads = set(heads).difference([self.revision.revision])
        if other_heads:
            ancestors = {
                r.revision
                for r in self.revision_map._get_ancestor_nodes(
                    self.revision_map.get_revisions(other_heads), check=False
                )
            }
            return tuple(set(self.to_revisions).difference(ancestors))
        else:
            # for each revision we plan to return, compute its ancestors
            # (excluding self), and remove those from the final output since
            # they are already accounted for.
            ancestors = {
                r.revision
                for to_revision in self.to_revisions
                for r in self.revision_map._get_ancestor_nodes(
                    self.revision_map.get_revisions(to_revision), check=False
                )
                if r.revision != to_revision
            }
            return tuple(set(self.to_revisions).difference(ancestors))

    def unmerge_branch_idents(
        self, heads: Set[str]
    ) -> Tuple[str, str, Tuple[str, ...]]:
        """Compute the version-table changes for an unmerge: the single
        (from, to) pair to update, plus the revisions to insert."""
        to_revisions = self._unmerge_to_revisions(heads)

        return (
            # update from rev, update to rev, insert revs
            self.from_revisions[0],
            to_revisions[-1],
            to_revisions[0:-1],
        )

    def should_create_branch(self, heads: Set[str]) -> bool:
        """Return True if applying this step requires inserting a new
        head row rather than updating an existing one."""
        if not self.is_upgrade:
            return False

        downrevs = self.revision._normalized_down_revisions

        if not downrevs:
            # is a base
            return True
        else:
            # none of our downrevs are present, so...
            # we have to insert our version.   This is true whether
            # or not there is only one downrev, or multiple (in the latter
            # case, we're a merge point.)
            if not heads.intersection(downrevs):
                return True
            else:
                return False

    def should_merge_branches(self, heads: Set[str]) -> bool:
        """Return True if this upgrade step joins two or more current
        heads at a merge point."""
        if not self.is_upgrade:
            return False

        downrevs = self.revision._normalized_down_revisions

        if len(downrevs) > 1 and len(heads.intersection(downrevs)) > 1:
            return True

        return False

    def should_unmerge_branches(self, heads: Set[str]) -> bool:
        """Return True if this downgrade step splits a merge point back
        into multiple heads."""
        if not self.is_downgrade:
            return False

        downrevs = self.revision._normalized_down_revisions

        if self.revision.revision in heads and len(downrevs) > 1:
            return True

        return False

    def update_version_num(self, heads: Set[str]) -> Tuple[str, str]:
        """Return the (from, to) pair for a plain single-row UPDATE of
        the version table."""
        if not self._has_scalar_down_revision:
            # multiple down revisions: pick the one that is actually a
            # current head; exactly one must match for an UPDATE to work
            downrev = heads.intersection(
                self.revision._normalized_down_revisions
            )
            assert (
                len(downrev) == 1
            ), "Can't do an UPDATE because downrevision is ambiguous"
            down_revision = list(downrev)[0]
        else:
            down_revision = self.revision._normalized_down_revisions[0]

        if self.is_upgrade:
            return down_revision, self.revision.revision
        else:
            return self.revision.revision, down_revision

    @property
    def delete_version_num(self) -> str:
        return self.revision.revision

    @property
    def insert_version_num(self) -> str:
        return self.revision.revision

    @property
    def info(self) -> MigrationInfo:
        # describe this step for on_version_apply listeners
        return MigrationInfo(
            revision_map=self.revision_map,
            up_revisions=self.revision.revision,
            down_revisions=self.revision._normalized_down_revisions,
            is_upgrade=self.is_upgrade,
            is_stamp=False,
        )
+
+
class StampStep(MigrationStep):
    """A :class:`.MigrationStep` that adjusts the version table only,
    running no migration operations ("alembic stamp")."""

    def __init__(
        self,
        from_: Optional[Union[str, Collection[str]]],
        to_: Optional[Union[str, Collection[str]]],
        is_upgrade: bool,
        branch_move: bool,
        revision_map: Optional[RevisionMap] = None,
    ) -> None:
        self.from_: Tuple[str, ...] = util.to_tuple(from_, default=())
        self.to_: Tuple[str, ...] = util.to_tuple(to_, default=())
        self.is_upgrade = is_upgrade
        self.branch_move = branch_move
        self.migration_fn = self.stamp_revision
        self.revision_map = revision_map

    doc: Optional[str] = None

    def stamp_revision(self, **kw: Any) -> None:
        # a stamp performs no actual migration work
        return None

    def __eq__(self, other):
        if not isinstance(other, StampStep):
            return False
        return (
            other.from_revisions == self.from_revisions
            and other.to_revisions == self.to_revisions
            and other.branch_move == self.branch_move
            and self.is_upgrade == other.is_upgrade
        )

    @property
    def from_revisions(self):
        return self.from_

    @property
    def to_revisions(self) -> Tuple[str, ...]:
        return self.to_

    @property
    def from_revisions_no_deps(  # type:ignore[override]
        self,
    ) -> Tuple[str, ...]:
        # stamps carry no dependency information to filter out
        return self.from_

    @property
    def to_revisions_no_deps(  # type:ignore[override]
        self,
    ) -> Tuple[str, ...]:
        return self.to_

    @property
    def delete_version_num(self) -> str:
        assert len(self.from_) == 1
        return self.from_[0]

    @property
    def insert_version_num(self) -> str:
        assert len(self.to_) == 1
        return self.to_[0]

    def update_version_num(self, heads: Set[str]) -> Tuple[str, str]:
        assert len(self.from_) == 1
        assert len(self.to_) == 1
        return self.from_[0], self.to_[0]

    def merge_branch_idents(
        self, heads: Union[Set[str], List[str]]
    ) -> Union[Tuple[List[Any], str, str], Tuple[List[str], str, str]]:
        # (delete revs, update from rev, update to rev)
        return list(self.from_[:-1]), self.from_[-1], self.to_[0]

    def unmerge_branch_idents(
        self, heads: Set[str]
    ) -> Tuple[str, str, List[str]]:
        # (update from rev, update to rev, insert revs)
        return self.from_[0], self.to_[-1], list(self.to_[:-1])

    def should_delete_branch(self, heads: Set[str]) -> bool:
        # TODO: we probably need to look for self.to_ inside of heads,
        # in a similar manner as should_create_branch, however we have
        # no tests for this yet (stamp downgrades w/ branches)
        return self.is_downgrade and self.branch_move

    def should_create_branch(self, heads: Set[str]) -> Union[Set[str], bool]:
        if not self.is_upgrade:
            return False
        moved_or_new = self.branch_move or set(self.from_).difference(heads)
        return moved_or_new and set(self.to_).difference(heads)

    def should_merge_branches(self, heads: Set[str]) -> bool:
        # stamping from more than one revision is a merge
        return len(self.from_) > 1

    def should_unmerge_branches(self, heads: Set[str]) -> bool:
        # stamping to more than one revision is an unmerge
        return len(self.to_) > 1

    @property
    def info(self) -> MigrationInfo:
        # describe this stamp for on_version_apply listeners
        assert self.revision_map is not None
        if self.is_upgrade:
            up, down = self.to_, self.from_
        else:
            up, down = self.from_, self.to_
        return MigrationInfo(
            revision_map=self.revision_map,
            up_revisions=up,
            down_revisions=down,
            is_upgrade=self.is_upgrade,
            is_stamp=True,
        )