aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/alembic/runtime
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/runtime')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py0
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/environment.py1051
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/runtime/migration.py1391
3 files changed, 2442 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py b/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/__init__.py
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py b/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py
new file mode 100644
index 00000000..1ff71eef
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/environment.py
@@ -0,0 +1,1051 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Callable
+from typing import Collection
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import TextIO
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy.sql.schema import Column
+from sqlalchemy.sql.schema import FetchedValue
+from typing_extensions import ContextManager
+from typing_extensions import Literal
+
+from .migration import _ProxyTransaction
+from .migration import MigrationContext
+from .. import util
+from ..operations import Operations
+from ..script.revision import _GetRevArg
+
+if TYPE_CHECKING:
+ from sqlalchemy.engine import URL
+ from sqlalchemy.engine.base import Connection
+ from sqlalchemy.sql import Executable
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import SchemaItem
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from .migration import MigrationInfo
+ from ..autogenerate.api import AutogenContext
+ from ..config import Config
+ from ..ddl import DefaultImpl
+ from ..operations.ops import MigrationScript
+ from ..script.base import ScriptDirectory
+
+_RevNumber = Optional[Union[str, Tuple[str, ...]]]
+
+ProcessRevisionDirectiveFn = Callable[
+ [MigrationContext, _GetRevArg, List["MigrationScript"]], None
+]
+
+RenderItemFn = Callable[
+ [str, Any, "AutogenContext"], Union[str, Literal[False]]
+]
+
+NameFilterType = Literal[
+ "schema",
+ "table",
+ "column",
+ "index",
+ "unique_constraint",
+ "foreign_key_constraint",
+]
+NameFilterParentNames = MutableMapping[
+ Literal["schema_name", "table_name", "schema_qualified_table_name"],
+ Optional[str],
+]
+IncludeNameFn = Callable[
+ [Optional[str], NameFilterType, NameFilterParentNames], bool
+]
+
+IncludeObjectFn = Callable[
+ [
+ "SchemaItem",
+ Optional[str],
+ NameFilterType,
+ bool,
+ Optional["SchemaItem"],
+ ],
+ bool,
+]
+
+OnVersionApplyFn = Callable[
+ [MigrationContext, "MigrationInfo", Collection[Any], Mapping[str, Any]],
+ None,
+]
+
+CompareServerDefault = Callable[
+ [
+ MigrationContext,
+ "Column[Any]",
+ "Column[Any]",
+ Optional[str],
+ Optional[FetchedValue],
+ Optional[str],
+ ],
+ Optional[bool],
+]
+
+CompareType = Callable[
+ [
+ MigrationContext,
+ "Column[Any]",
+ "Column[Any]",
+ "TypeEngine[Any]",
+ "TypeEngine[Any]",
+ ],
+ Optional[bool],
+]
+
+
+class EnvironmentContext(util.ModuleClsProxy):
+ """A configurational facade made available in an ``env.py`` script.
+
+ The :class:`.EnvironmentContext` acts as a *facade* to the more
+ nuts-and-bolts objects of :class:`.MigrationContext` as well as certain
+ aspects of :class:`.Config`,
+ within the context of the ``env.py`` script that is invoked by
+ most Alembic commands.
+
+ :class:`.EnvironmentContext` is normally instantiated
+ when a command in :mod:`alembic.command` is run. It then makes
+ itself available in the ``alembic.context`` module for the scope
+ of the command. From within an ``env.py`` script, the current
+ :class:`.EnvironmentContext` is available by importing this module.
+
+ :class:`.EnvironmentContext` also supports programmatic usage.
+ At this level, it acts as a Python context manager, that is, is
+ intended to be used using the
+ ``with:`` statement. A typical use of :class:`.EnvironmentContext`::
+
+ from alembic.config import Config
+ from alembic.script import ScriptDirectory
+
+ config = Config()
+ config.set_main_option("script_location", "myapp:migrations")
+ script = ScriptDirectory.from_config(config)
+
+
+ def my_function(rev, context):
+ '''do something with revision "rev", which
+ will be the current database revision,
+ and "context", which is the MigrationContext
+ that the env.py will create'''
+
+
+ with EnvironmentContext(
+ config,
+ script,
+ fn=my_function,
+ as_sql=False,
+ starting_rev="base",
+ destination_rev="head",
+ tag="sometag",
+ ):
+ script.run_env()
+
+ The above script will invoke the ``env.py`` script
+ within the migration environment. If and when ``env.py``
+ calls :meth:`.MigrationContext.run_migrations`, the
+ ``my_function()`` function above will be called
+ by the :class:`.MigrationContext`, given the context
+ itself as well as the current revision in the database.
+
+ .. note::
+
+ For most API usages other than full blown
+ invocation of migration scripts, the :class:`.MigrationContext`
+ and :class:`.ScriptDirectory` objects can be created and
+ used directly. The :class:`.EnvironmentContext` object
+ is *only* needed when you need to actually invoke the
+ ``env.py`` module present in the migration environment.
+
+ """
+
+ _migration_context: Optional[MigrationContext] = None
+
+ config: Config = None # type:ignore[assignment]
+ """An instance of :class:`.Config` representing the
+ configuration file contents as well as other variables
+ set programmatically within it."""
+
+ script: ScriptDirectory = None # type:ignore[assignment]
+ """An instance of :class:`.ScriptDirectory` which provides
+ programmatic access to version files within the ``versions/``
+ directory.
+
+ """
+
+ def __init__(
+ self, config: Config, script: ScriptDirectory, **kw: Any
+ ) -> None:
+ r"""Construct a new :class:`.EnvironmentContext`.
+
+ :param config: a :class:`.Config` instance.
+ :param script: a :class:`.ScriptDirectory` instance.
+ :param \**kw: keyword options that will be ultimately
+ passed along to the :class:`.MigrationContext` when
+ :meth:`.EnvironmentContext.configure` is called.
+
+ """
+ self.config = config
+ self.script = script
+ self.context_opts = kw
+
+ def __enter__(self) -> EnvironmentContext:
+ """Establish a context which provides a
+ :class:`.EnvironmentContext` object to
+ env.py scripts.
+
+ The :class:`.EnvironmentContext` will
+ be made available as ``from alembic import context``.
+
+ """
+ self._install_proxy()
+ return self
+
+ def __exit__(self, *arg: Any, **kw: Any) -> None:
+ self._remove_proxy()
+
+ def is_offline_mode(self) -> bool:
+ """Return True if the current migrations environment
+ is running in "offline mode".
+
+ This is ``True`` or ``False`` depending
+ on the ``--sql`` flag passed.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ """
+ return self.context_opts.get("as_sql", False) # type: ignore[no-any-return] # noqa: E501
+
+ def is_transactional_ddl(self) -> bool:
+ """Return True if the context is configured to expect a
+ transactional DDL capable backend.
+
+ This defaults to the type of database in use, and
+ can be overridden by the ``transactional_ddl`` argument
+ to :meth:`.configure`
+
+ This function requires that a :class:`.MigrationContext`
+ has first been made available via :meth:`.configure`.
+
+ """
+ return self.get_context().impl.transactional_ddl
+
+ def requires_connection(self) -> bool:
+ return not self.is_offline_mode()
+
+ def get_head_revision(self) -> _RevNumber:
+ """Return the hex identifier of the 'head' script revision.
+
+ If the script directory has multiple heads, this
+ method raises a :class:`.CommandError`;
+ :meth:`.EnvironmentContext.get_head_revisions` should be preferred.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ .. seealso:: :meth:`.EnvironmentContext.get_head_revisions`
+
+ """
+ return self.script.as_revision_number("head")
+
+ def get_head_revisions(self) -> _RevNumber:
+ """Return the hex identifier of the 'heads' script revision(s).
+
+ This returns a tuple containing the version number of all
+ heads in the script directory.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ """
+ return self.script.as_revision_number("heads")
+
+ def get_starting_revision_argument(self) -> _RevNumber:
+ """Return the 'starting revision' argument,
+ if the revision was passed using ``start:end``.
+
+ This is only meaningful in "offline" mode.
+ Returns ``None`` if no value is available
+ or was configured.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ """
+ if self._migration_context is not None:
+ return self.script.as_revision_number(
+ self.get_context()._start_from_rev
+ )
+ elif "starting_rev" in self.context_opts:
+ return self.script.as_revision_number(
+ self.context_opts["starting_rev"]
+ )
+ else:
+ # this should raise only in the case that a command
+ # is being run where the "starting rev" is never applicable;
+ # this is to catch scripts which rely upon this in
+ # non-sql mode or similar
+ raise util.CommandError(
+ "No starting revision argument is available."
+ )
+
+ def get_revision_argument(self) -> _RevNumber:
+ """Get the 'destination' revision argument.
+
+ This is typically the argument passed to the
+ ``upgrade`` or ``downgrade`` command.
+
+ If it was specified as ``head``, the actual
+ version number is returned; if specified
+ as ``base``, ``None`` is returned.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ """
+ return self.script.as_revision_number(
+ self.context_opts["destination_rev"]
+ )
+
+ def get_tag_argument(self) -> Optional[str]:
+ """Return the value passed for the ``--tag`` argument, if any.
+
+ The ``--tag`` argument is not used directly by Alembic,
+ but is available for custom ``env.py`` configurations that
+ wish to use it; particularly for offline generation scripts
+ that wish to generate tagged filenames.
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ .. seealso::
+
+ :meth:`.EnvironmentContext.get_x_argument` - a newer and more
+ open ended system of extending ``env.py`` scripts via the command
+ line.
+
+ """
+ return self.context_opts.get("tag", None) # type: ignore[no-any-return] # noqa: E501
+
+ @overload
+ def get_x_argument(self, as_dictionary: Literal[False]) -> List[str]: ...
+
+ @overload
+ def get_x_argument(
+ self, as_dictionary: Literal[True]
+ ) -> Dict[str, str]: ...
+
+ @overload
+ def get_x_argument(
+ self, as_dictionary: bool = ...
+ ) -> Union[List[str], Dict[str, str]]: ...
+
+ def get_x_argument(
+ self, as_dictionary: bool = False
+ ) -> Union[List[str], Dict[str, str]]:
+ """Return the value(s) passed for the ``-x`` argument, if any.
+
+ The ``-x`` argument is an open ended flag that allows any user-defined
+ value or values to be passed on the command line, then available
+ here for consumption by a custom ``env.py`` script.
+
+ The return value is a list, returned directly from the ``argparse``
+ structure. If ``as_dictionary=True`` is passed, the ``x`` arguments
+ are parsed using ``key=value`` format into a dictionary that is
+ then returned. If there is no ``=`` in the argument, value is an empty
+ string.
+
+ .. versionchanged:: 1.13.1 Support ``as_dictionary=True`` when
+ arguments are passed without the ``=`` symbol.
+
+ For example, to support passing a database URL on the command line,
+ the standard ``env.py`` script can be modified like this::
+
+ cmd_line_url = context.get_x_argument(
+ as_dictionary=True).get('dbname')
+ if cmd_line_url:
+ engine = create_engine(cmd_line_url)
+ else:
+ engine = engine_from_config(
+ config.get_section(config.config_ini_section),
+ prefix='sqlalchemy.',
+ poolclass=pool.NullPool)
+
+ This then takes effect by running the ``alembic`` script as::
+
+ alembic -x dbname=postgresql://user:pass@host/dbname upgrade head
+
+ This function does not require that the :class:`.MigrationContext`
+ has been configured.
+
+ .. seealso::
+
+ :meth:`.EnvironmentContext.get_tag_argument`
+
+ :attr:`.Config.cmd_opts`
+
+ """
+ if self.config.cmd_opts is not None:
+ value = self.config.cmd_opts.x or []
+ else:
+ value = []
+ if as_dictionary:
+ dict_value = {}
+ for arg in value:
+ x_key, _, x_value = arg.partition("=")
+ dict_value[x_key] = x_value
+ value = dict_value
+
+ return value
+
+ def configure(
+ self,
+ connection: Optional[Connection] = None,
+ url: Optional[Union[str, URL]] = None,
+ dialect_name: Optional[str] = None,
+ dialect_opts: Optional[Dict[str, Any]] = None,
+ transactional_ddl: Optional[bool] = None,
+ transaction_per_migration: bool = False,
+ output_buffer: Optional[TextIO] = None,
+ starting_rev: Optional[str] = None,
+ tag: Optional[str] = None,
+ template_args: Optional[Dict[str, Any]] = None,
+ render_as_batch: bool = False,
+ target_metadata: Union[MetaData, Sequence[MetaData], None] = None,
+ include_name: Optional[IncludeNameFn] = None,
+ include_object: Optional[IncludeObjectFn] = None,
+ include_schemas: bool = False,
+ process_revision_directives: Optional[
+ ProcessRevisionDirectiveFn
+ ] = None,
+ compare_type: Union[bool, CompareType] = True,
+ compare_server_default: Union[bool, CompareServerDefault] = False,
+ render_item: Optional[RenderItemFn] = None,
+ literal_binds: bool = False,
+ upgrade_token: str = "upgrades",
+ downgrade_token: str = "downgrades",
+ alembic_module_prefix: str = "op.",
+ sqlalchemy_module_prefix: str = "sa.",
+ user_module_prefix: Optional[str] = None,
+ on_version_apply: Optional[OnVersionApplyFn] = None,
+ **kw: Any,
+ ) -> None:
+ """Configure a :class:`.MigrationContext` within this
+ :class:`.EnvironmentContext` which will provide database
+ connectivity and other configuration to a series of
+ migration scripts.
+
+ Many methods on :class:`.EnvironmentContext` require that
+ this method has been called in order to function, as they
+ ultimately need to have database access or at least access
+ to the dialect in use. Those which do are documented as such.
+
+ The important thing needed by :meth:`.configure` is a
+ means to determine what kind of database dialect is in use.
+ An actual connection to that database is needed only if
+ the :class:`.MigrationContext` is to be used in
+ "online" mode.
+
+ If the :meth:`.is_offline_mode` function returns ``True``,
+ then no connection is needed here. Otherwise, the
+ ``connection`` parameter should be present as an
+ instance of :class:`sqlalchemy.engine.Connection`.
+
+ This function is typically called from the ``env.py``
+ script within a migration environment. It can be called
+ multiple times for an invocation. The most recent
+ :class:`~sqlalchemy.engine.Connection`
+ for which it was called is the one that will be operated upon
+ by the next call to :meth:`.run_migrations`.
+
+ General parameters:
+
+ :param connection: a :class:`~sqlalchemy.engine.Connection`
+ to use
+ for SQL execution in "online" mode. When present, is also
+ used to determine the type of dialect in use.
+ :param url: a string database url, or a
+ :class:`sqlalchemy.engine.url.URL` object.
+ The type of dialect to be used will be derived from this if
+ ``connection`` is not passed.
+ :param dialect_name: string name of a dialect, such as
+ "postgresql", "mssql", etc.
+ The type of dialect to be used will be derived from this if
+ ``connection`` and ``url`` are not passed.
+ :param dialect_opts: dictionary of options to be passed to dialect
+ constructor.
+ :param transactional_ddl: Force the usage of "transactional"
+ DDL on or off;
+ this otherwise defaults to whether or not the dialect in
+ use supports it.
+ :param transaction_per_migration: if True, nest each migration script
+ in a transaction rather than the full series of migrations to
+ run.
+ :param output_buffer: a file-like object that will be used
+ for textual output
+ when the ``--sql`` option is used to generate SQL scripts.
+ Defaults to
+ ``sys.stdout`` if not passed here and also not present on
+ the :class:`.Config`
+ object. The value here overrides that of the :class:`.Config`
+ object.
+ :param output_encoding: when using ``--sql`` to generate SQL
+ scripts, apply this encoding to the string output.
+ :param literal_binds: when using ``--sql`` to generate SQL
+ scripts, pass through the ``literal_binds`` flag to the compiler
+ so that any literal values that would ordinarily be bound
+ parameters are converted to plain strings.
+
+ .. warning:: Dialects can typically only handle simple datatypes
+ like strings and numbers for auto-literal generation. Datatypes
+ like dates, intervals, and others may still require manual
+ formatting, typically using :meth:`.Operations.inline_literal`.
+
+ .. note:: the ``literal_binds`` flag is ignored on SQLAlchemy
+ versions prior to 0.8 where this feature is not supported.
+
+ .. seealso::
+
+ :meth:`.Operations.inline_literal`
+
+ :param starting_rev: Override the "starting revision" argument
+ when using ``--sql`` mode.
+ :param tag: a string tag for usage by custom ``env.py`` scripts.
+ Set via the ``--tag`` option, can be overridden here.
+ :param template_args: dictionary of template arguments which
+ will be added to the template argument environment when
+ running the "revision" command. Note that the script environment
+ is only run within the "revision" command if the --autogenerate
+ option is used, or if the option "revision_environment=true"
+ is present in the alembic.ini file.
+
+ :param version_table: The name of the Alembic version table.
+ The default is ``'alembic_version'``.
+ :param version_table_schema: Optional schema to place version
+ table within.
+ :param version_table_pk: boolean, whether the Alembic version table
+ should use a primary key constraint for the "value" column; this
+ only takes effect when the table is first created.
+ Defaults to True; setting to False should not be necessary and is
+ here for backwards compatibility reasons.
+ :param on_version_apply: a callable or collection of callables to be
+ run for each migration step.
+ The callables will be run in the order they are given, once for
+ each migration step, after the respective operation has been
+ applied but before its transaction is finalized.
+ Each callable accepts no positional arguments and the following
+ keyword arguments:
+
+ * ``ctx``: the :class:`.MigrationContext` running the migration,
+ * ``step``: a :class:`.MigrationInfo` representing the
+ step currently being applied,
+ * ``heads``: a collection of version strings representing the
+ current heads,
+ * ``run_args``: the ``**kwargs`` passed to :meth:`.run_migrations`.
+
+ Parameters specific to the autogenerate feature, when
+ ``alembic revision`` is run with the ``--autogenerate`` feature:
+
+ :param target_metadata: a :class:`sqlalchemy.schema.MetaData`
+ object, or a sequence of :class:`~sqlalchemy.schema.MetaData`
+ objects, that will be consulted during autogeneration.
+ The tables present in each :class:`~sqlalchemy.schema.MetaData`
+ will be compared against
+ what is locally available on the target
+ :class:`~sqlalchemy.engine.Connection`
+ to produce candidate upgrade/downgrade operations.
+ :param compare_type: Indicates type comparison behavior during
+ an autogenerate
+ operation. Defaults to ``True`` turning on type comparison, which
+ has good accuracy on most backends. See :ref:`compare_types`
+ for an example as well as information on other type
+ comparison options. Set to ``False`` which disables type
+ comparison. A callable can also be passed to provide custom type
+ comparison, see :ref:`compare_types` for additional details.
+
+ .. versionchanged:: 1.12.0 The default value of
+ :paramref:`.EnvironmentContext.configure.compare_type` has been
+ changed to ``True``.
+
+ .. seealso::
+
+ :ref:`compare_types`
+
+ :paramref:`.EnvironmentContext.configure.compare_server_default`
+
+ :param compare_server_default: Indicates server default comparison
+ behavior during
+ an autogenerate operation. Defaults to ``False`` which disables
+ server default
+ comparison. Set to ``True`` to turn on server default comparison,
+ which has
+ varied accuracy depending on backend.
+
+ To customize server default comparison behavior, a callable may
+ be specified
+ which can filter server default comparisons during an
+ autogenerate operation.
+ defaults during an autogenerate operation. The format of this
+ callable is::
+
+ def my_compare_server_default(context, inspected_column,
+ metadata_column, inspected_default, metadata_default,
+ rendered_metadata_default):
+ # return True if the defaults are different,
+ # False if not, or None to allow the default implementation
+ # to compare these defaults
+ return None
+
+ context.configure(
+ # ...
+ compare_server_default = my_compare_server_default
+ )
+
+ ``inspected_column`` is a dictionary structure as returned by
+ :meth:`sqlalchemy.engine.reflection.Inspector.get_columns`, whereas
+ ``metadata_column`` is a :class:`sqlalchemy.schema.Column` from
+ the local model environment.
+
+ A return value of ``None`` indicates to allow default server default
+ comparison
+ to proceed. Note that some backends such as Postgresql actually
+ execute
+ the two defaults on the database side to compare for equivalence.
+
+ .. seealso::
+
+ :paramref:`.EnvironmentContext.configure.compare_type`
+
+ :param include_name: A callable function which is given
+ the chance to return ``True`` or ``False`` for any database reflected
+ object based on its name, including database schema names when
+ the :paramref:`.EnvironmentContext.configure.include_schemas` flag
+ is set to ``True``.
+
+ The function accepts the following positional arguments:
+
+ * ``name``: the name of the object, such as schema name or table name.
+ Will be ``None`` when indicating the default schema name of the
+ database connection.
+ * ``type``: a string describing the type of object; currently
+ ``"schema"``, ``"table"``, ``"column"``, ``"index"``,
+ ``"unique_constraint"``, or ``"foreign_key_constraint"``
+ * ``parent_names``: a dictionary of "parent" object names, that are
+ relative to the name being given. Keys in this dictionary may
+ include: ``"schema_name"``, ``"table_name"`` or
+ ``"schema_qualified_table_name"``.
+
+ E.g.::
+
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+ return name in ["schema_one", "schema_two"]
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_schemas = True,
+ include_name = include_name
+ )
+
+ .. seealso::
+
+ :ref:`autogenerate_include_hooks`
+
+ :paramref:`.EnvironmentContext.configure.include_object`
+
+ :paramref:`.EnvironmentContext.configure.include_schemas`
+
+
+ :param include_object: A callable function which is given
+ the chance to return ``True`` or ``False`` for any object,
+ indicating if the given object should be considered in the
+ autogenerate sweep.
+
+ The function accepts the following positional arguments:
+
+ * ``object``: a :class:`~sqlalchemy.schema.SchemaItem` object such
+ as a :class:`~sqlalchemy.schema.Table`,
+ :class:`~sqlalchemy.schema.Column`,
+ :class:`~sqlalchemy.schema.Index`
+ :class:`~sqlalchemy.schema.UniqueConstraint`,
+ or :class:`~sqlalchemy.schema.ForeignKeyConstraint` object
+ * ``name``: the name of the object. This is typically available
+ via ``object.name``.
+ * ``type``: a string describing the type of object; currently
+ ``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``,
+ or ``"foreign_key_constraint"``
+ * ``reflected``: ``True`` if the given object was produced based on
+ table reflection, ``False`` if it's from a local :class:`.MetaData`
+ object.
+ * ``compare_to``: the object being compared against, if available,
+ else ``None``.
+
+ E.g.::
+
+ def include_object(object, name, type_, reflected, compare_to):
+ if (type_ == "column" and
+ not reflected and
+ object.info.get("skip_autogenerate", False)):
+ return False
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_object = include_object
+ )
+
+ For the use case of omitting specific schemas from a target database
+ when :paramref:`.EnvironmentContext.configure.include_schemas` is
+ set to ``True``, the :attr:`~sqlalchemy.schema.Table.schema`
+ attribute can be checked for each :class:`~sqlalchemy.schema.Table`
+ object passed to the hook, however it is much more efficient
+ to filter on schemas before reflection of objects takes place
+ using the :paramref:`.EnvironmentContext.configure.include_name`
+ hook.
+
+ .. seealso::
+
+ :ref:`autogenerate_include_hooks`
+
+ :paramref:`.EnvironmentContext.configure.include_name`
+
+ :paramref:`.EnvironmentContext.configure.include_schemas`
+
+ :param render_as_batch: if True, commands which alter elements
+ within a table will be placed under a ``with batch_alter_table():``
+ directive, so that batch migrations will take place.
+
+ .. seealso::
+
+ :ref:`batch_migrations`
+
+ :param include_schemas: If True, autogenerate will scan across
+ all schemas located by the SQLAlchemy
+ :meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names`
+ method, and include all differences in tables found across all
+ those schemas. When using this option, you may want to also
+ use the :paramref:`.EnvironmentContext.configure.include_name`
+ parameter to specify a callable which
+ can filter the tables/schemas that get included.
+
+ .. seealso::
+
+ :ref:`autogenerate_include_hooks`
+
+ :paramref:`.EnvironmentContext.configure.include_name`
+
+ :paramref:`.EnvironmentContext.configure.include_object`
+
+ :param render_item: Callable that can be used to override how
+ any schema item, i.e. column, constraint, type,
+ etc., is rendered for autogenerate. The callable receives a
+ string describing the type of object, the object, and
+ the autogen context. If it returns False, the
+ default rendering method will be used. If it returns None,
+ the item will not be rendered in the context of a Table
+ construct, that is, can be used to skip columns or constraints
+ within op.create_table()::
+
+ def my_render_column(type_, col, autogen_context):
+ if type_ == "column" and isinstance(col, MySpecialCol):
+ return repr(col)
+ else:
+ return False
+
+ context.configure(
+ # ...
+ render_item = my_render_column
+ )
+
+ Available values for the type string include: ``"column"``,
+ ``"primary_key"``, ``"foreign_key"``, ``"unique"``, ``"check"``,
+ ``"type"``, ``"server_default"``.
+
+ .. seealso::
+
+ :ref:`autogen_render_types`
+
+ :param upgrade_token: When autogenerate completes, the text of the
+ candidate upgrade operations will be present in this template
+ variable when ``script.py.mako`` is rendered. Defaults to
+ ``upgrades``.
+ :param downgrade_token: When autogenerate completes, the text of the
+ candidate downgrade operations will be present in this
+ template variable when ``script.py.mako`` is rendered. Defaults to
+ ``downgrades``.
+
+ :param alembic_module_prefix: When autogenerate refers to Alembic
+ :mod:`alembic.operations` constructs, this prefix will be used
+ (i.e. ``op.create_table``) Defaults to "``op.``".
+ Can be ``None`` to indicate no prefix.
+
+ :param sqlalchemy_module_prefix: When autogenerate refers to
+ SQLAlchemy
+ :class:`~sqlalchemy.schema.Column` or type classes, this prefix
+ will be used
+ (i.e. ``sa.Column("somename", sa.Integer)``) Defaults to "``sa.``".
+ Can be ``None`` to indicate no prefix.
+ Note that when dialect-specific types are rendered, autogenerate
+ will render them using the dialect module name, i.e. ``mssql.BIT()``,
+ ``postgresql.UUID()``.
+
+ :param user_module_prefix: When autogenerate refers to a SQLAlchemy
+ type (e.g. :class:`.TypeEngine`) where the module name is not
+ under the ``sqlalchemy`` namespace, this prefix will be used
+ within autogenerate. If left at its default of
+ ``None``, the ``__module__`` attribute of the type is used to
+ render the import module. It's a good practice to set this
+ and to have all custom types be available from a fixed module space,
+ in order to future-proof migration files against reorganizations
+ in modules.
+
+ .. seealso::
+
+ :ref:`autogen_module_prefix`
+
+ :param process_revision_directives: a callable function that will
+ be passed a structure representing the end result of an autogenerate
+ or plain "revision" operation, which can be manipulated to affect
+ how the ``alembic revision`` command ultimately outputs new
+ revision scripts. The structure of the callable is::
+
+ def process_revision_directives(context, revision, directives):
+ pass
+
+ The ``directives`` parameter is a Python list containing
+ a single :class:`.MigrationScript` directive, which represents
+ the revision file to be generated. This list as well as its
+ contents may be freely modified to produce any set of commands.
+ The section :ref:`customizing_revision` shows an example of
+ doing this. The ``context`` parameter is the
+ :class:`.MigrationContext` in use,
+ and ``revision`` is a tuple of revision identifiers representing the
+ current revision of the database.
+
+ The callable is invoked at all times when the ``--autogenerate``
+ option is passed to ``alembic revision``. If ``--autogenerate``
+ is not passed, the callable is invoked only if the
+ ``revision_environment`` variable is set to True in the Alembic
+ configuration, in which case the given ``directives`` collection
+ will contain empty :class:`.UpgradeOps` and :class:`.DowngradeOps`
+ collections for ``.upgrade_ops`` and ``.downgrade_ops``. The
+ ``--autogenerate`` option itself can be inferred by inspecting
+ ``context.config.cmd_opts.autogenerate``.
+
+ The callable function may optionally be an instance of
+ a :class:`.Rewriter` object. This is a helper object that
+ assists in the production of autogenerate-stream rewriter functions.
+
+ .. seealso::
+
+ :ref:`customizing_revision`
+
+ :ref:`autogen_rewriter`
+
+ :paramref:`.command.revision.process_revision_directives`
+
+ Parameters specific to individual backends:
+
+ :param mssql_batch_separator: The "batch separator" which will
+ be placed between each statement when generating offline SQL Server
+ migrations. Defaults to ``GO``. Note this is in addition to the
+ customary semicolon ``;`` at the end of each statement; SQL Server
+ considers the "batch separator" to denote the end of an
+ individual statement execution, and cannot group certain
+ dependent operations in one step.
+ :param oracle_batch_separator: The "batch separator" which will
+ be placed between each statement when generating offline
+ Oracle migrations. Defaults to ``/``. Oracle doesn't add a
+ semicolon between statements like most other backends.
+
+ """
+ opts = self.context_opts
+ if transactional_ddl is not None:
+ opts["transactional_ddl"] = transactional_ddl
+ if output_buffer is not None:
+ opts["output_buffer"] = output_buffer
+ elif self.config.output_buffer is not None:
+ opts["output_buffer"] = self.config.output_buffer
+ if starting_rev:
+ opts["starting_rev"] = starting_rev
+ if tag:
+ opts["tag"] = tag
+ if template_args and "template_args" in opts:
+ opts["template_args"].update(template_args)
+ opts["transaction_per_migration"] = transaction_per_migration
+ opts["target_metadata"] = target_metadata
+ opts["include_name"] = include_name
+ opts["include_object"] = include_object
+ opts["include_schemas"] = include_schemas
+ opts["render_as_batch"] = render_as_batch
+ opts["upgrade_token"] = upgrade_token
+ opts["downgrade_token"] = downgrade_token
+ opts["sqlalchemy_module_prefix"] = sqlalchemy_module_prefix
+ opts["alembic_module_prefix"] = alembic_module_prefix
+ opts["user_module_prefix"] = user_module_prefix
+ opts["literal_binds"] = literal_binds
+ opts["process_revision_directives"] = process_revision_directives
+ opts["on_version_apply"] = util.to_tuple(on_version_apply, default=())
+
+ if render_item is not None:
+ opts["render_item"] = render_item
+ opts["compare_type"] = compare_type
+ if compare_server_default is not None:
+ opts["compare_server_default"] = compare_server_default
+ opts["script"] = self.script
+
+ opts.update(kw)
+
+ self._migration_context = MigrationContext.configure(
+ connection=connection,
+ url=url,
+ dialect_name=dialect_name,
+ environment_context=self,
+ dialect_opts=dialect_opts,
+ opts=opts,
+ )
+
+ def run_migrations(self, **kw: Any) -> None:
+ """Run migrations as determined by the current command line
+ configuration
+ as well as versioning information present (or not) in the current
+ database connection (if one is present).
+
+ The function accepts optional ``**kw`` arguments. If these are
+ passed, they are sent directly to the ``upgrade()`` and
+ ``downgrade()``
+ functions within each target revision file. By modifying the
+ ``script.py.mako`` file so that the ``upgrade()`` and ``downgrade()``
+ functions accept arguments, parameters can be passed here so that
+ contextual information, usually information to identify a particular
+ database in use, can be passed from a custom ``env.py`` script
+ to the migration functions.
+
+ This function requires that a :class:`.MigrationContext` has
+ first been made available via :meth:`.configure`.
+
+ """
+ assert self._migration_context is not None
+ with Operations.context(self._migration_context):
+ self.get_context().run_migrations(**kw)
+
+ def execute(
+ self,
+ sql: Union[Executable, str],
+ execution_options: Optional[Dict[str, Any]] = None,
+ ) -> None:
+ """Execute the given SQL using the current change context.
+
+ The behavior of :meth:`.execute` is the same
+ as that of :meth:`.Operations.execute`. Please see that
+ function's documentation for full detail including
+ caveats and limitations.
+
+ This function requires that a :class:`.MigrationContext` has
+ first been made available via :meth:`.configure`.
+
+ """
+ self.get_context().execute(sql, execution_options=execution_options)
+
+ def static_output(self, text: str) -> None:
+ """Emit text directly to the "offline" SQL stream.
+
+ Typically this is for emitting comments that
+ start with --. The statement is not treated
+ as a SQL execution, no ; or batch separator
+ is added, etc.
+
+ """
+ self.get_context().impl.static_output(text)
+
+ def begin_transaction(
+ self,
+ ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]:
+ """Return a context manager that will
+ enclose an operation within a "transaction",
+ as defined by the environment's offline
+ and transactional DDL settings.
+
+ e.g.::
+
+ with context.begin_transaction():
+ context.run_migrations()
+
+ :meth:`.begin_transaction` is intended to
+ "do the right thing" regardless of
+ calling context:
+
+ * If :meth:`.is_transactional_ddl` is ``False``,
+ returns a "do nothing" context manager
+ which otherwise produces no transactional
+ state or directives.
+ * If :meth:`.is_offline_mode` is ``True``,
+ returns a context manager that will
+ invoke the :meth:`.DefaultImpl.emit_begin`
+ and :meth:`.DefaultImpl.emit_commit`
+ methods, which will produce the string
+ directives ``BEGIN`` and ``COMMIT`` on
+ the output stream, as rendered by the
+ target backend (e.g. SQL Server would
+ emit ``BEGIN TRANSACTION``).
+ * Otherwise, calls :meth:`sqlalchemy.engine.Connection.begin`
+ on the current online connection, which
+ returns a :class:`sqlalchemy.engine.Transaction`
+ object. This object demarcates a real
+ transaction and is itself a context manager,
+ which will roll back if an exception
+ is raised.
+
+ Note that a custom ``env.py`` script which
+ has more specific transactional needs can of course
+ manipulate the :class:`~sqlalchemy.engine.Connection`
+ directly to produce transactional state in "online"
+ mode.
+
+ """
+
+ return self.get_context().begin_transaction()
+
+ def get_context(self) -> MigrationContext:
+ """Return the current :class:`.MigrationContext` object.
+
+ If :meth:`.EnvironmentContext.configure` has not been
+ called yet, raises an exception.
+
+ """
+
+ if self._migration_context is None:
+ raise Exception("No context has been configured yet.")
+ return self._migration_context
+
+ def get_bind(self) -> Connection:
+ """Return the current 'bind'.
+
+ In "online" mode, this is the
+ :class:`sqlalchemy.engine.Connection` currently being used
+ to emit SQL to the database.
+
+ This function requires that a :class:`.MigrationContext`
+ has first been made available via :meth:`.configure`.
+
+ """
+ return self.get_context().bind # type: ignore[return-value]
+
+ def get_impl(self) -> DefaultImpl:
+ return self.get_context().impl
diff --git a/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py b/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py
new file mode 100644
index 00000000..ac431a62
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/runtime/migration.py
@@ -0,0 +1,1391 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+from contextlib import nullcontext
+import logging
+import sys
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Collection
+from typing import Dict
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import Column
+from sqlalchemy import literal_column
+from sqlalchemy import select
+from sqlalchemy.engine import Engine
+from sqlalchemy.engine import url as sqla_url
+from sqlalchemy.engine.strategies import MockEngineStrategy
+from typing_extensions import ContextManager
+
+from .. import ddl
+from .. import util
+from ..util import sqla_compat
+from ..util.compat import EncodedIO
+
+if TYPE_CHECKING:
+ from sqlalchemy.engine import Dialect
+ from sqlalchemy.engine import URL
+ from sqlalchemy.engine.base import Connection
+ from sqlalchemy.engine.base import Transaction
+ from sqlalchemy.engine.mock import MockConnection
+ from sqlalchemy.sql import Executable
+
+ from .environment import EnvironmentContext
+ from ..config import Config
+ from ..script.base import Script
+ from ..script.base import ScriptDirectory
+ from ..script.revision import _RevisionOrBase
+ from ..script.revision import Revision
+ from ..script.revision import RevisionMap
+
+log = logging.getLogger(__name__)
+
+
+class _ProxyTransaction:
+ def __init__(self, migration_context: MigrationContext) -> None:
+ self.migration_context = migration_context
+
+ @property
+ def _proxied_transaction(self) -> Optional[Transaction]:
+ return self.migration_context._transaction
+
+ def rollback(self) -> None:
+ t = self._proxied_transaction
+ assert t is not None
+ t.rollback()
+ self.migration_context._transaction = None
+
+ def commit(self) -> None:
+ t = self._proxied_transaction
+ assert t is not None
+ t.commit()
+ self.migration_context._transaction = None
+
+ def __enter__(self) -> _ProxyTransaction:
+ return self
+
+ def __exit__(self, type_: Any, value: Any, traceback: Any) -> None:
+ if self._proxied_transaction is not None:
+ self._proxied_transaction.__exit__(type_, value, traceback)
+ self.migration_context._transaction = None
+
+
+class MigrationContext:
+ """Represent the database state made available to a migration
+ script.
+
+ :class:`.MigrationContext` is the front end to an actual
+ database connection, or alternatively a string output
+ stream given a particular database dialect,
+ from an Alembic perspective.
+
+ When inside the ``env.py`` script, the :class:`.MigrationContext`
+ is available via the
+ :meth:`.EnvironmentContext.get_context` method,
+ which is available at ``alembic.context``::
+
+ # from within env.py script
+ from alembic import context
+
+ migration_context = context.get_context()
+
+ For usage outside of an ``env.py`` script, such as for
+ utility routines that want to check the current version
+ in the database, the :meth:`.MigrationContext.configure`
+ method to create new :class:`.MigrationContext` objects.
+ For example, to get at the current revision in the
+ database using :meth:`.MigrationContext.get_current_revision`::
+
+ # in any application, outside of an env.py script
+ from alembic.migration import MigrationContext
+ from sqlalchemy import create_engine
+
+ engine = create_engine("postgresql://mydatabase")
+ conn = engine.connect()
+
+ context = MigrationContext.configure(conn)
+ current_rev = context.get_current_revision()
+
+ The above context can also be used to produce
+ Alembic migration operations with an :class:`.Operations`
+ instance::
+
+ # in any application, outside of the normal Alembic environment
+ from alembic.operations import Operations
+
+ op = Operations(context)
+ op.alter_column("mytable", "somecolumn", nullable=True)
+
+ """
+
+ def __init__(
+ self,
+ dialect: Dialect,
+ connection: Optional[Connection],
+ opts: Dict[str, Any],
+ environment_context: Optional[EnvironmentContext] = None,
+ ) -> None:
+ self.environment_context = environment_context
+ self.opts = opts
+ self.dialect = dialect
+ self.script: Optional[ScriptDirectory] = opts.get("script")
+ as_sql: bool = opts.get("as_sql", False)
+ transactional_ddl = opts.get("transactional_ddl")
+ self._transaction_per_migration = opts.get(
+ "transaction_per_migration", False
+ )
+ self.on_version_apply_callbacks = opts.get("on_version_apply", ())
+ self._transaction: Optional[Transaction] = None
+
+ if as_sql:
+ self.connection = cast(
+ Optional["Connection"], self._stdout_connection(connection)
+ )
+ assert self.connection is not None
+ self._in_external_transaction = False
+ else:
+ self.connection = connection
+ self._in_external_transaction = (
+ sqla_compat._get_connection_in_transaction(connection)
+ )
+
+ self._migrations_fn: Optional[
+ Callable[..., Iterable[RevisionStep]]
+ ] = opts.get("fn")
+ self.as_sql = as_sql
+
+ self.purge = opts.get("purge", False)
+
+ if "output_encoding" in opts:
+ self.output_buffer = EncodedIO(
+ opts.get("output_buffer")
+ or sys.stdout, # type:ignore[arg-type]
+ opts["output_encoding"],
+ )
+ else:
+ self.output_buffer = opts.get("output_buffer", sys.stdout)
+
+ self._user_compare_type = opts.get("compare_type", True)
+ self._user_compare_server_default = opts.get(
+ "compare_server_default", False
+ )
+ self.version_table = version_table = opts.get(
+ "version_table", "alembic_version"
+ )
+ self.version_table_schema = version_table_schema = opts.get(
+ "version_table_schema", None
+ )
+
+ self._start_from_rev: Optional[str] = opts.get("starting_rev")
+ self.impl = ddl.DefaultImpl.get_by_dialect(dialect)(
+ dialect,
+ self.connection,
+ self.as_sql,
+ transactional_ddl,
+ self.output_buffer,
+ opts,
+ )
+
+ self._version = self.impl.version_table_impl(
+ version_table=version_table,
+ version_table_schema=version_table_schema,
+ version_table_pk=opts.get("version_table_pk", True),
+ )
+
+ log.info("Context impl %s.", self.impl.__class__.__name__)
+ if self.as_sql:
+ log.info("Generating static SQL")
+ log.info(
+ "Will assume %s DDL.",
+ (
+ "transactional"
+ if self.impl.transactional_ddl
+ else "non-transactional"
+ ),
+ )
+
+ @classmethod
+ def configure(
+ cls,
+ connection: Optional[Connection] = None,
+ url: Optional[Union[str, URL]] = None,
+ dialect_name: Optional[str] = None,
+ dialect: Optional[Dialect] = None,
+ environment_context: Optional[EnvironmentContext] = None,
+ dialect_opts: Optional[Dict[str, str]] = None,
+ opts: Optional[Any] = None,
+ ) -> MigrationContext:
+ """Create a new :class:`.MigrationContext`.
+
+ This is a factory method usually called
+ by :meth:`.EnvironmentContext.configure`.
+
+ :param connection: a :class:`~sqlalchemy.engine.Connection`
+ to use for SQL execution in "online" mode. When present,
+ is also used to determine the type of dialect in use.
+ :param url: a string database url, or a
+ :class:`sqlalchemy.engine.url.URL` object.
+ The type of dialect to be used will be derived from this if
+ ``connection`` is not passed.
+ :param dialect_name: string name of a dialect, such as
+ "postgresql", "mssql", etc. The type of dialect to be used will be
+ derived from this if ``connection`` and ``url`` are not passed.
+ :param opts: dictionary of options. Most other options
+ accepted by :meth:`.EnvironmentContext.configure` are passed via
+ this dictionary.
+
+ """
+ if opts is None:
+ opts = {}
+ if dialect_opts is None:
+ dialect_opts = {}
+
+ if connection:
+ if isinstance(connection, Engine):
+ raise util.CommandError(
+ "'connection' argument to configure() is expected "
+ "to be a sqlalchemy.engine.Connection instance, "
+ "got %r" % connection,
+ )
+
+ dialect = connection.dialect
+ elif url:
+ url_obj = sqla_url.make_url(url)
+ dialect = url_obj.get_dialect()(**dialect_opts)
+ elif dialect_name:
+ url_obj = sqla_url.make_url("%s://" % dialect_name)
+ dialect = url_obj.get_dialect()(**dialect_opts)
+ elif not dialect:
+ raise Exception("Connection, url, or dialect_name is required.")
+ assert dialect is not None
+ return MigrationContext(dialect, connection, opts, environment_context)
+
+ @contextmanager
+ def autocommit_block(self) -> Iterator[None]:
+ """Enter an "autocommit" block, for databases that support AUTOCOMMIT
+ isolation levels.
+
+ This special directive is intended to support the occasional database
+ DDL or system operation that specifically has to be run outside of
+ any kind of transaction block. The PostgreSQL database platform
+ is the most common target for this style of operation, as many
+ of its DDL operations must be run outside of transaction blocks, even
+ though the database overall supports transactional DDL.
+
+ The method is used as a context manager within a migration script, by
+ calling on :meth:`.Operations.get_context` to retrieve the
+ :class:`.MigrationContext`, then invoking
+ :meth:`.MigrationContext.autocommit_block` using the ``with:``
+ statement::
+
+ def upgrade():
+ with op.get_context().autocommit_block():
+ op.execute("ALTER TYPE mood ADD VALUE 'soso'")
+
+ Above, a PostgreSQL "ALTER TYPE..ADD VALUE" directive is emitted,
+ which must be run outside of a transaction block at the database level.
+ The :meth:`.MigrationContext.autocommit_block` method makes use of the
+ SQLAlchemy ``AUTOCOMMIT`` isolation level setting, which against the
+ psycogp2 DBAPI corresponds to the ``connection.autocommit`` setting,
+ to ensure that the database driver is not inside of a DBAPI level
+ transaction block.
+
+ .. warning::
+
+ As is necessary, **the database transaction preceding the block is
+ unconditionally committed**. This means that the run of migrations
+ preceding the operation will be committed, before the overall
+ migration operation is complete.
+
+ It is recommended that when an application includes migrations with
+ "autocommit" blocks, that
+ :paramref:`.EnvironmentContext.transaction_per_migration` be used
+ so that the calling environment is tuned to expect short per-file
+ migrations whether or not one of them has an autocommit block.
+
+
+ """
+ _in_connection_transaction = self._in_connection_transaction()
+
+ if self.impl.transactional_ddl and self.as_sql:
+ self.impl.emit_commit()
+
+ elif _in_connection_transaction:
+ assert self._transaction is not None
+
+ self._transaction.commit()
+ self._transaction = None
+
+ if not self.as_sql:
+ assert self.connection is not None
+ current_level = self.connection.get_isolation_level()
+ base_connection = self.connection
+
+ # in 1.3 and 1.4 non-future mode, the connection gets switched
+ # out. we can use the base connection with the new mode
+ # except that it will not know it's in "autocommit" and will
+ # emit deprecation warnings when an autocommit action takes
+ # place.
+ self.connection = self.impl.connection = (
+ base_connection.execution_options(isolation_level="AUTOCOMMIT")
+ )
+
+ # sqlalchemy future mode will "autobegin" in any case, so take
+ # control of that "transaction" here
+ fake_trans: Optional[Transaction] = self.connection.begin()
+ else:
+ fake_trans = None
+ try:
+ yield
+ finally:
+ if not self.as_sql:
+ assert self.connection is not None
+ if fake_trans is not None:
+ fake_trans.commit()
+ self.connection.execution_options(
+ isolation_level=current_level
+ )
+ self.connection = self.impl.connection = base_connection
+
+ if self.impl.transactional_ddl and self.as_sql:
+ self.impl.emit_begin()
+
+ elif _in_connection_transaction:
+ assert self.connection is not None
+ self._transaction = self.connection.begin()
+
+ def begin_transaction(
+ self, _per_migration: bool = False
+ ) -> Union[_ProxyTransaction, ContextManager[None, Optional[bool]]]:
+ """Begin a logical transaction for migration operations.
+
+ This method is used within an ``env.py`` script to demarcate where
+ the outer "transaction" for a series of migrations begins. Example::
+
+ def run_migrations_online():
+ connectable = create_engine(...)
+
+ with connectable.connect() as connection:
+ context.configure(
+ connection=connection, target_metadata=target_metadata
+ )
+
+ with context.begin_transaction():
+ context.run_migrations()
+
+ Above, :meth:`.MigrationContext.begin_transaction` is used to demarcate
+ where the outer logical transaction occurs around the
+ :meth:`.MigrationContext.run_migrations` operation.
+
+ A "Logical" transaction means that the operation may or may not
+ correspond to a real database transaction. If the target database
+ supports transactional DDL (or
+ :paramref:`.EnvironmentContext.configure.transactional_ddl` is true),
+ the :paramref:`.EnvironmentContext.configure.transaction_per_migration`
+ flag is not set, and the migration is against a real database
+ connection (as opposed to using "offline" ``--sql`` mode), a real
+ transaction will be started. If ``--sql`` mode is in effect, the
+ operation would instead correspond to a string such as "BEGIN" being
+ emitted to the string output.
+
+ The returned object is a Python context manager that should only be
+ used in the context of a ``with:`` statement as indicated above.
+ The object has no other guaranteed API features present.
+
+ .. seealso::
+
+ :meth:`.MigrationContext.autocommit_block`
+
+ """
+
+ if self._in_external_transaction:
+ return nullcontext()
+
+ if self.impl.transactional_ddl:
+ transaction_now = _per_migration == self._transaction_per_migration
+ else:
+ transaction_now = _per_migration is True
+
+ if not transaction_now:
+ return nullcontext()
+
+ elif not self.impl.transactional_ddl:
+ assert _per_migration
+
+ if self.as_sql:
+ return nullcontext()
+ else:
+ # track our own notion of a "transaction block", which must be
+ # committed when complete. Don't rely upon whether or not the
+ # SQLAlchemy connection reports as "in transaction"; this
+ # because SQLAlchemy future connection features autobegin
+ # behavior, so it may already be in a transaction from our
+ # emitting of queries like "has_version_table", etc. While we
+ # could track these operations as well, that leaves open the
+ # possibility of new operations or other things happening in
+ # the user environment that still may be triggering
+ # "autobegin".
+
+ in_transaction = self._transaction is not None
+
+ if in_transaction:
+ return nullcontext()
+ else:
+ assert self.connection is not None
+ self._transaction = (
+ sqla_compat._safe_begin_connection_transaction(
+ self.connection
+ )
+ )
+ return _ProxyTransaction(self)
+ elif self.as_sql:
+
+ @contextmanager
+ def begin_commit():
+ self.impl.emit_begin()
+ yield
+ self.impl.emit_commit()
+
+ return begin_commit()
+ else:
+ assert self.connection is not None
+ self._transaction = sqla_compat._safe_begin_connection_transaction(
+ self.connection
+ )
+ return _ProxyTransaction(self)
+
+ def get_current_revision(self) -> Optional[str]:
+ """Return the current revision, usually that which is present
+ in the ``alembic_version`` table in the database.
+
+ This method intends to be used only for a migration stream that
+ does not contain unmerged branches in the target database;
+ if there are multiple branches present, an exception is raised.
+ The :meth:`.MigrationContext.get_current_heads` should be preferred
+ over this method going forward in order to be compatible with
+ branch migration support.
+
+ If this :class:`.MigrationContext` was configured in "offline"
+ mode, that is with ``as_sql=True``, the ``starting_rev``
+ parameter is returned instead, if any.
+
+ """
+ heads = self.get_current_heads()
+ if len(heads) == 0:
+ return None
+ elif len(heads) > 1:
+ raise util.CommandError(
+ "Version table '%s' has more than one head present; "
+ "please use get_current_heads()" % self.version_table
+ )
+ else:
+ return heads[0]
+
+ def get_current_heads(self) -> Tuple[str, ...]:
+ """Return a tuple of the current 'head versions' that are represented
+ in the target database.
+
+ For a migration stream without branches, this will be a single
+ value, synonymous with that of
+ :meth:`.MigrationContext.get_current_revision`. However when multiple
+ unmerged branches exist within the target database, the returned tuple
+ will contain a value for each head.
+
+ If this :class:`.MigrationContext` was configured in "offline"
+ mode, that is with ``as_sql=True``, the ``starting_rev``
+ parameter is returned in a one-length tuple.
+
+ If no version table is present, or if there are no revisions
+ present, an empty tuple is returned.
+
+ """
+ if self.as_sql:
+ start_from_rev: Any = self._start_from_rev
+ if start_from_rev == "base":
+ start_from_rev = None
+ elif start_from_rev is not None and self.script:
+ start_from_rev = [
+ self.script.get_revision(sfr).revision
+ for sfr in util.to_list(start_from_rev)
+ if sfr not in (None, "base")
+ ]
+ return util.to_tuple(start_from_rev, default=())
+ else:
+ if self._start_from_rev:
+ raise util.CommandError(
+ "Can't specify current_rev to context "
+ "when using a database connection"
+ )
+ if not self._has_version_table():
+ return ()
+ assert self.connection is not None
+ return tuple(
+ row[0]
+ for row in self.connection.execute(
+ select(self._version.c.version_num)
+ )
+ )
+
+ def _ensure_version_table(self, purge: bool = False) -> None:
+ with sqla_compat._ensure_scope_for_ddl(self.connection):
+ assert self.connection is not None
+ self._version.create(self.connection, checkfirst=True)
+ if purge:
+ assert self.connection is not None
+ self.connection.execute(self._version.delete())
+
+ def _has_version_table(self) -> bool:
+ assert self.connection is not None
+ return sqla_compat._connectable_has_table(
+ self.connection, self.version_table, self.version_table_schema
+ )
+
+ def stamp(self, script_directory: ScriptDirectory, revision: str) -> None:
+ """Stamp the version table with a specific revision.
+
+ This method calculates those branches to which the given revision
+ can apply, and updates those branches as though they were migrated
+ towards that revision (either up or down). If no current branches
+ include the revision, it is added as a new branch head.
+
+ """
+ heads = self.get_current_heads()
+ if not self.as_sql and not heads:
+ self._ensure_version_table()
+ head_maintainer = HeadMaintainer(self, heads)
+ for step in script_directory._stamp_revs(revision, heads):
+ head_maintainer.update_to_step(step)
+
+ def run_migrations(self, **kw: Any) -> None:
+ r"""Run the migration scripts established for this
+ :class:`.MigrationContext`, if any.
+
+ The commands in :mod:`alembic.command` will set up a function
+ that is ultimately passed to the :class:`.MigrationContext`
+ as the ``fn`` argument. This function represents the "work"
+ that will be done when :meth:`.MigrationContext.run_migrations`
+ is called, typically from within the ``env.py`` script of the
+ migration environment. The "work function" then provides an iterable
+ of version callables and other version information which
+ in the case of the ``upgrade`` or ``downgrade`` commands are the
+ list of version scripts to invoke. Other commands yield nothing,
+ in the case that a command wants to run some other operation
+ against the database such as the ``current`` or ``stamp`` commands.
+
+ :param \**kw: keyword arguments here will be passed to each
+ migration callable, that is the ``upgrade()`` or ``downgrade()``
+ method within revision scripts.
+
+ """
+ self.impl.start_migrations()
+
+ heads: Tuple[str, ...]
+ if self.purge:
+ if self.as_sql:
+ raise util.CommandError("Can't use --purge with --sql mode")
+ self._ensure_version_table(purge=True)
+ heads = ()
+ else:
+ heads = self.get_current_heads()
+
+ dont_mutate = self.opts.get("dont_mutate", False)
+
+ if not self.as_sql and not heads and not dont_mutate:
+ self._ensure_version_table()
+
+ head_maintainer = HeadMaintainer(self, heads)
+
+ assert self._migrations_fn is not None
+ for step in self._migrations_fn(heads, self):
+ with self.begin_transaction(_per_migration=True):
+ if self.as_sql and not head_maintainer.heads:
+ # for offline mode, include a CREATE TABLE from
+ # the base
+ assert self.connection is not None
+ self._version.create(self.connection)
+ log.info("Running %s", step)
+ if self.as_sql:
+ self.impl.static_output(
+ "-- Running %s" % (step.short_log,)
+ )
+ step.migration_fn(**kw)
+
+ # previously, we wouldn't stamp per migration
+ # if we were in a transaction, however given the more
+ # complex model that involves any number of inserts
+ # and row-targeted updates and deletes, it's simpler for now
+ # just to run the operations on every version
+ head_maintainer.update_to_step(step)
+ for callback in self.on_version_apply_callbacks:
+ callback(
+ ctx=self,
+ step=step.info,
+ heads=set(head_maintainer.heads),
+ run_args=kw,
+ )
+
+ if self.as_sql and not head_maintainer.heads:
+ assert self.connection is not None
+ self._version.drop(self.connection)
+
+ def _in_connection_transaction(self) -> bool:
+ try:
+ meth = self.connection.in_transaction # type:ignore[union-attr]
+ except AttributeError:
+ return False
+ else:
+ return meth()
+
+ def execute(
+ self,
+ sql: Union[Executable, str],
+ execution_options: Optional[Dict[str, Any]] = None,
+ ) -> None:
+ """Execute a SQL construct or string statement.
+
+ The underlying execution mechanics are used, that is
+ if this is "offline mode" the SQL is written to the
+ output buffer, otherwise the SQL is emitted on
+ the current SQLAlchemy connection.
+
+ """
+ self.impl._exec(sql, execution_options)
+
+ def _stdout_connection(
+ self, connection: Optional[Connection]
+ ) -> MockConnection:
+ def dump(construct, *multiparams, **params):
+ self.impl._exec(construct)
+
+ return MockEngineStrategy.MockConnection(self.dialect, dump)
+
+ @property
+ def bind(self) -> Optional[Connection]:
+ """Return the current "bind".
+
+ In online mode, this is an instance of
+ :class:`sqlalchemy.engine.Connection`, and is suitable
+ for ad-hoc execution of any kind of usage described
+ in SQLAlchemy Core documentation as well as
+ for usage with the :meth:`sqlalchemy.schema.Table.create`
+ and :meth:`sqlalchemy.schema.MetaData.create_all` methods
+ of :class:`~sqlalchemy.schema.Table`,
+ :class:`~sqlalchemy.schema.MetaData`.
+
+ Note that when "standard output" mode is enabled,
+ this bind will be a "mock" connection handler that cannot
+ return results and is only appropriate for a very limited
+ subset of commands.
+
+ """
+ return self.connection
+
+ @property
+ def config(self) -> Optional[Config]:
+ """Return the :class:`.Config` used by the current environment,
+ if any."""
+
+ if self.environment_context:
+ return self.environment_context.config
+ else:
+ return None
+
+ def _compare_type(
+ self, inspector_column: Column[Any], metadata_column: Column
+ ) -> bool:
+ if self._user_compare_type is False:
+ return False
+
+ if callable(self._user_compare_type):
+ user_value = self._user_compare_type(
+ self,
+ inspector_column,
+ metadata_column,
+ inspector_column.type,
+ metadata_column.type,
+ )
+ if user_value is not None:
+ return user_value
+
+ return self.impl.compare_type(inspector_column, metadata_column)
+
+ def _compare_server_default(
+ self,
+ inspector_column: Column[Any],
+ metadata_column: Column[Any],
+ rendered_metadata_default: Optional[str],
+ rendered_column_default: Optional[str],
+ ) -> bool:
+ if self._user_compare_server_default is False:
+ return False
+
+ if callable(self._user_compare_server_default):
+ user_value = self._user_compare_server_default(
+ self,
+ inspector_column,
+ metadata_column,
+ rendered_column_default,
+ metadata_column.server_default,
+ rendered_metadata_default,
+ )
+ if user_value is not None:
+ return user_value
+
+ return self.impl.compare_server_default(
+ inspector_column,
+ metadata_column,
+ rendered_metadata_default,
+ rendered_column_default,
+ )
+
+
+class HeadMaintainer:
+ def __init__(self, context: MigrationContext, heads: Any) -> None:
+ self.context = context
+ self.heads = set(heads)
+
+ def _insert_version(self, version: str) -> None:
+ assert version not in self.heads
+ self.heads.add(version)
+
+ self.context.impl._exec(
+ self.context._version.insert().values(
+ version_num=literal_column("'%s'" % version)
+ )
+ )
+
+ def _delete_version(self, version: str) -> None:
+ self.heads.remove(version)
+
+ ret = self.context.impl._exec(
+ self.context._version.delete().where(
+ self.context._version.c.version_num
+ == literal_column("'%s'" % version)
+ )
+ )
+
+ if (
+ not self.context.as_sql
+ and self.context.dialect.supports_sane_rowcount
+ and ret is not None
+ and ret.rowcount != 1
+ ):
+ raise util.CommandError(
+ "Online migration expected to match one "
+ "row when deleting '%s' in '%s'; "
+ "%d found"
+ % (version, self.context.version_table, ret.rowcount)
+ )
+
+ def _update_version(self, from_: str, to_: str) -> None:
+ assert to_ not in self.heads
+ self.heads.remove(from_)
+ self.heads.add(to_)
+
+ ret = self.context.impl._exec(
+ self.context._version.update()
+ .values(version_num=literal_column("'%s'" % to_))
+ .where(
+ self.context._version.c.version_num
+ == literal_column("'%s'" % from_)
+ )
+ )
+
+ if (
+ not self.context.as_sql
+ and self.context.dialect.supports_sane_rowcount
+ and ret is not None
+ and ret.rowcount != 1
+ ):
+ raise util.CommandError(
+ "Online migration expected to match one "
+ "row when updating '%s' to '%s' in '%s'; "
+ "%d found"
+ % (from_, to_, self.context.version_table, ret.rowcount)
+ )
+
+ def update_to_step(self, step: Union[RevisionStep, StampStep]) -> None:
+ if step.should_delete_branch(self.heads):
+ vers = step.delete_version_num
+ log.debug("branch delete %s", vers)
+ self._delete_version(vers)
+ elif step.should_create_branch(self.heads):
+ vers = step.insert_version_num
+ log.debug("new branch insert %s", vers)
+ self._insert_version(vers)
+ elif step.should_merge_branches(self.heads):
+ # delete revs, update from rev, update to rev
+ (
+ delete_revs,
+ update_from_rev,
+ update_to_rev,
+ ) = step.merge_branch_idents(self.heads)
+ log.debug(
+ "merge, delete %s, update %s to %s",
+ delete_revs,
+ update_from_rev,
+ update_to_rev,
+ )
+ for delrev in delete_revs:
+ self._delete_version(delrev)
+ self._update_version(update_from_rev, update_to_rev)
+ elif step.should_unmerge_branches(self.heads):
+ (
+ update_from_rev,
+ update_to_rev,
+ insert_revs,
+ ) = step.unmerge_branch_idents(self.heads)
+ log.debug(
+ "unmerge, insert %s, update %s to %s",
+ insert_revs,
+ update_from_rev,
+ update_to_rev,
+ )
+ for insrev in insert_revs:
+ self._insert_version(insrev)
+ self._update_version(update_from_rev, update_to_rev)
+ else:
+ from_, to_ = step.update_version_num(self.heads)
+ log.debug("update %s to %s", from_, to_)
+ self._update_version(from_, to_)
+
+
+class MigrationInfo:
+ """Exposes information about a migration step to a callback listener.
+
+ The :class:`.MigrationInfo` object is available exclusively for the
+ benefit of the :paramref:`.EnvironmentContext.on_version_apply`
+ callback hook.
+
+ """
+
+ is_upgrade: bool
+ """True/False: indicates whether this operation ascends or descends the
+ version tree."""
+
+ is_stamp: bool
+ """True/False: indicates whether this operation is a stamp (i.e. whether
+ it results in any actual database operations)."""
+
+ up_revision_id: Optional[str]
+ """Version string corresponding to :attr:`.Revision.revision`.
+
+ In the case of a stamp operation, it is advised to use the
+ :attr:`.MigrationInfo.up_revision_ids` tuple as a stamp operation can
+ make a single movement from one or more branches down to a single
+ branchpoint, in which case there will be multiple "up" revisions.
+
+ .. seealso::
+
+ :attr:`.MigrationInfo.up_revision_ids`
+
+ """
+
+ up_revision_ids: Tuple[str, ...]
+ """Tuple of version strings corresponding to :attr:`.Revision.revision`.
+
+ In the majority of cases, this tuple will be a single value, synonymous
+ with the scalar value of :attr:`.MigrationInfo.up_revision_id`.
+ It can be multiple revision identifiers only in the case of an
+ ``alembic stamp`` operation which is moving downwards from multiple
+ branches down to their common branch point.
+
+ """
+
+ down_revision_ids: Tuple[str, ...]
+ """Tuple of strings representing the base revisions of this migration step.
+
+ If empty, this represents a root revision; otherwise, the first item
+ corresponds to :attr:`.Revision.down_revision`, and the rest are inferred
+ from dependencies.
+ """
+
+ revision_map: RevisionMap
+ """The revision map inside of which this operation occurs."""
+
+ def __init__(
+ self,
+ revision_map: RevisionMap,
+ is_upgrade: bool,
+ is_stamp: bool,
+ up_revisions: Union[str, Tuple[str, ...]],
+ down_revisions: Union[str, Tuple[str, ...]],
+ ) -> None:
+ self.revision_map = revision_map
+ self.is_upgrade = is_upgrade
+ self.is_stamp = is_stamp
+ self.up_revision_ids = util.to_tuple(up_revisions, default=())
+ if self.up_revision_ids:
+ self.up_revision_id = self.up_revision_ids[0]
+ else:
+ # this should never be the case with
+ # "upgrade", "downgrade", or "stamp" as we are always
+ # measuring movement in terms of at least one upgrade version
+ self.up_revision_id = None
+ self.down_revision_ids = util.to_tuple(down_revisions, default=())
+
+ @property
+ def is_migration(self) -> bool:
+ """True/False: indicates whether this operation is a migration.
+
+ At present this is true if and only the migration is not a stamp.
+ If other operation types are added in the future, both this attribute
+ and :attr:`~.MigrationInfo.is_stamp` will be false.
+ """
+ return not self.is_stamp
+
+ @property
+ def source_revision_ids(self) -> Tuple[str, ...]:
+ """Active revisions before this migration step is applied."""
+ return (
+ self.down_revision_ids if self.is_upgrade else self.up_revision_ids
+ )
+
+ @property
+ def destination_revision_ids(self) -> Tuple[str, ...]:
+ """Active revisions after this migration step is applied."""
+ return (
+ self.up_revision_ids if self.is_upgrade else self.down_revision_ids
+ )
+
+ @property
+ def up_revision(self) -> Optional[Revision]:
+ """Get :attr:`~.MigrationInfo.up_revision_id` as
+ a :class:`.Revision`.
+
+ """
+ return self.revision_map.get_revision(self.up_revision_id)
+
+ @property
+ def up_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
+ """Get :attr:`~.MigrationInfo.up_revision_ids` as a
+ :class:`.Revision`."""
+ return self.revision_map.get_revisions(self.up_revision_ids)
+
+ @property
+ def down_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
+ """Get :attr:`~.MigrationInfo.down_revision_ids` as a tuple of
+ :class:`Revisions <.Revision>`."""
+ return self.revision_map.get_revisions(self.down_revision_ids)
+
+ @property
+ def source_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
+ """Get :attr:`~MigrationInfo.source_revision_ids` as a tuple of
+ :class:`Revisions <.Revision>`."""
+ return self.revision_map.get_revisions(self.source_revision_ids)
+
+ @property
+ def destination_revisions(self) -> Tuple[Optional[_RevisionOrBase], ...]:
+ """Get :attr:`~MigrationInfo.destination_revision_ids` as a tuple of
+ :class:`Revisions <.Revision>`."""
+ return self.revision_map.get_revisions(self.destination_revision_ids)
+
+
+class MigrationStep:
+ from_revisions_no_deps: Tuple[str, ...]
+ to_revisions_no_deps: Tuple[str, ...]
+ is_upgrade: bool
+ migration_fn: Any
+
+ if TYPE_CHECKING:
+
+ @property
+ def doc(self) -> Optional[str]: ...
+
+ @property
+ def name(self) -> str:
+ return self.migration_fn.__name__
+
+ @classmethod
+ def upgrade_from_script(
+ cls, revision_map: RevisionMap, script: Script
+ ) -> RevisionStep:
+ return RevisionStep(revision_map, script, True)
+
+ @classmethod
+ def downgrade_from_script(
+ cls, revision_map: RevisionMap, script: Script
+ ) -> RevisionStep:
+ return RevisionStep(revision_map, script, False)
+
+ @property
+ def is_downgrade(self) -> bool:
+ return not self.is_upgrade
+
+ @property
+ def short_log(self) -> str:
+ return "%s %s -> %s" % (
+ self.name,
+ util.format_as_comma(self.from_revisions_no_deps),
+ util.format_as_comma(self.to_revisions_no_deps),
+ )
+
+ def __str__(self):
+ if self.doc:
+ return "%s %s -> %s, %s" % (
+ self.name,
+ util.format_as_comma(self.from_revisions_no_deps),
+ util.format_as_comma(self.to_revisions_no_deps),
+ self.doc,
+ )
+ else:
+ return self.short_log
+
+
+class RevisionStep(MigrationStep):
+ def __init__(
+ self, revision_map: RevisionMap, revision: Script, is_upgrade: bool
+ ) -> None:
+ self.revision_map = revision_map
+ self.revision = revision
+ self.is_upgrade = is_upgrade
+ if is_upgrade:
+ self.migration_fn = revision.module.upgrade
+ else:
+ self.migration_fn = revision.module.downgrade
+
+ def __repr__(self):
+ return "RevisionStep(%r, is_upgrade=%r)" % (
+ self.revision.revision,
+ self.is_upgrade,
+ )
+
+ def __eq__(self, other: object) -> bool:
+ return (
+ isinstance(other, RevisionStep)
+ and other.revision == self.revision
+ and self.is_upgrade == other.is_upgrade
+ )
+
+ @property
+ def doc(self) -> Optional[str]:
+ return self.revision.doc
+
+ @property
+ def from_revisions(self) -> Tuple[str, ...]:
+ if self.is_upgrade:
+ return self.revision._normalized_down_revisions
+ else:
+ return (self.revision.revision,)
+
+ @property
+ def from_revisions_no_deps( # type:ignore[override]
+ self,
+ ) -> Tuple[str, ...]:
+ if self.is_upgrade:
+ return self.revision._versioned_down_revisions
+ else:
+ return (self.revision.revision,)
+
+ @property
+ def to_revisions(self) -> Tuple[str, ...]:
+ if self.is_upgrade:
+ return (self.revision.revision,)
+ else:
+ return self.revision._normalized_down_revisions
+
+ @property
+ def to_revisions_no_deps( # type:ignore[override]
+ self,
+ ) -> Tuple[str, ...]:
+ if self.is_upgrade:
+ return (self.revision.revision,)
+ else:
+ return self.revision._versioned_down_revisions
+
+ @property
+ def _has_scalar_down_revision(self) -> bool:
+ return len(self.revision._normalized_down_revisions) == 1
+
+ def should_delete_branch(self, heads: Set[str]) -> bool:
+ """A delete is when we are a. in a downgrade and b.
+ we are going to the "base" or we are going to a version that
+ is implied as a dependency on another version that is remaining.
+
+ """
+ if not self.is_downgrade:
+ return False
+
+ if self.revision.revision not in heads:
+ return False
+
+ downrevs = self.revision._normalized_down_revisions
+
+ if not downrevs:
+ # is a base
+ return True
+ else:
+ # determine what the ultimate "to_revisions" for an
+ # unmerge would be. If there are none, then we're a delete.
+ to_revisions = self._unmerge_to_revisions(heads)
+ return not to_revisions
+
+ def merge_branch_idents(
+ self, heads: Set[str]
+ ) -> Tuple[List[str], str, str]:
+ other_heads = set(heads).difference(self.from_revisions)
+
+ if other_heads:
+ ancestors = {
+ r.revision
+ for r in self.revision_map._get_ancestor_nodes(
+ self.revision_map.get_revisions(other_heads), check=False
+ )
+ }
+ from_revisions = list(
+ set(self.from_revisions).difference(ancestors)
+ )
+ else:
+ from_revisions = list(self.from_revisions)
+
+ return (
+ # delete revs, update from rev, update to rev
+ list(from_revisions[0:-1]),
+ from_revisions[-1],
+ self.to_revisions[0],
+ )
+
+ def _unmerge_to_revisions(self, heads: Set[str]) -> Tuple[str, ...]:
+ other_heads = set(heads).difference([self.revision.revision])
+ if other_heads:
+ ancestors = {
+ r.revision
+ for r in self.revision_map._get_ancestor_nodes(
+ self.revision_map.get_revisions(other_heads), check=False
+ )
+ }
+ return tuple(set(self.to_revisions).difference(ancestors))
+ else:
+ # for each revision we plan to return, compute its ancestors
+ # (excluding self), and remove those from the final output since
+ # they are already accounted for.
+ ancestors = {
+ r.revision
+ for to_revision in self.to_revisions
+ for r in self.revision_map._get_ancestor_nodes(
+ self.revision_map.get_revisions(to_revision), check=False
+ )
+ if r.revision != to_revision
+ }
+ return tuple(set(self.to_revisions).difference(ancestors))
+
+ def unmerge_branch_idents(
+ self, heads: Set[str]
+ ) -> Tuple[str, str, Tuple[str, ...]]:
+ to_revisions = self._unmerge_to_revisions(heads)
+
+ return (
+ # update from rev, update to rev, insert revs
+ self.from_revisions[0],
+ to_revisions[-1],
+ to_revisions[0:-1],
+ )
+
+ def should_create_branch(self, heads: Set[str]) -> bool:
+ if not self.is_upgrade:
+ return False
+
+ downrevs = self.revision._normalized_down_revisions
+
+ if not downrevs:
+ # is a base
+ return True
+ else:
+ # none of our downrevs are present, so...
+ # we have to insert our version. This is true whether
+ # or not there is only one downrev, or multiple (in the latter
+ # case, we're a merge point.)
+ if not heads.intersection(downrevs):
+ return True
+ else:
+ return False
+
+ def should_merge_branches(self, heads: Set[str]) -> bool:
+ if not self.is_upgrade:
+ return False
+
+ downrevs = self.revision._normalized_down_revisions
+
+ if len(downrevs) > 1 and len(heads.intersection(downrevs)) > 1:
+ return True
+
+ return False
+
+ def should_unmerge_branches(self, heads: Set[str]) -> bool:
+ if not self.is_downgrade:
+ return False
+
+ downrevs = self.revision._normalized_down_revisions
+
+ if self.revision.revision in heads and len(downrevs) > 1:
+ return True
+
+ return False
+
+ def update_version_num(self, heads: Set[str]) -> Tuple[str, str]:
+ if not self._has_scalar_down_revision:
+ downrev = heads.intersection(
+ self.revision._normalized_down_revisions
+ )
+ assert (
+ len(downrev) == 1
+ ), "Can't do an UPDATE because downrevision is ambiguous"
+ down_revision = list(downrev)[0]
+ else:
+ down_revision = self.revision._normalized_down_revisions[0]
+
+ if self.is_upgrade:
+ return down_revision, self.revision.revision
+ else:
+ return self.revision.revision, down_revision
+
+ @property
+ def delete_version_num(self) -> str:
+ return self.revision.revision
+
+ @property
+ def insert_version_num(self) -> str:
+ return self.revision.revision
+
+ @property
+ def info(self) -> MigrationInfo:
+ return MigrationInfo(
+ revision_map=self.revision_map,
+ up_revisions=self.revision.revision,
+ down_revisions=self.revision._normalized_down_revisions,
+ is_upgrade=self.is_upgrade,
+ is_stamp=False,
+ )
+
+
+class StampStep(MigrationStep):
+ def __init__(
+ self,
+ from_: Optional[Union[str, Collection[str]]],
+ to_: Optional[Union[str, Collection[str]]],
+ is_upgrade: bool,
+ branch_move: bool,
+ revision_map: Optional[RevisionMap] = None,
+ ) -> None:
+ self.from_: Tuple[str, ...] = util.to_tuple(from_, default=())
+ self.to_: Tuple[str, ...] = util.to_tuple(to_, default=())
+ self.is_upgrade = is_upgrade
+ self.branch_move = branch_move
+ self.migration_fn = self.stamp_revision
+ self.revision_map = revision_map
+
+ doc: Optional[str] = None
+
+ def stamp_revision(self, **kw: Any) -> None:
+ return None
+
+ def __eq__(self, other):
+ return (
+ isinstance(other, StampStep)
+ and other.from_revisions == self.from_revisions
+ and other.to_revisions == self.to_revisions
+ and other.branch_move == self.branch_move
+ and self.is_upgrade == other.is_upgrade
+ )
+
+ @property
+ def from_revisions(self):
+ return self.from_
+
+ @property
+ def to_revisions(self) -> Tuple[str, ...]:
+ return self.to_
+
+ @property
+ def from_revisions_no_deps( # type:ignore[override]
+ self,
+ ) -> Tuple[str, ...]:
+ return self.from_
+
+ @property
+ def to_revisions_no_deps( # type:ignore[override]
+ self,
+ ) -> Tuple[str, ...]:
+ return self.to_
+
+ @property
+ def delete_version_num(self) -> str:
+ assert len(self.from_) == 1
+ return self.from_[0]
+
+ @property
+ def insert_version_num(self) -> str:
+ assert len(self.to_) == 1
+ return self.to_[0]
+
+ def update_version_num(self, heads: Set[str]) -> Tuple[str, str]:
+ assert len(self.from_) == 1
+ assert len(self.to_) == 1
+ return self.from_[0], self.to_[0]
+
+ def merge_branch_idents(
+ self, heads: Union[Set[str], List[str]]
+ ) -> Union[Tuple[List[Any], str, str], Tuple[List[str], str, str]]:
+ return (
+ # delete revs, update from rev, update to rev
+ list(self.from_[0:-1]),
+ self.from_[-1],
+ self.to_[0],
+ )
+
+ def unmerge_branch_idents(
+ self, heads: Set[str]
+ ) -> Tuple[str, str, List[str]]:
+ return (
+ # update from rev, update to rev, insert revs
+ self.from_[0],
+ self.to_[-1],
+ list(self.to_[0:-1]),
+ )
+
+ def should_delete_branch(self, heads: Set[str]) -> bool:
+ # TODO: we probably need to look for self.to_ inside of heads,
+ # in a similar manner as should_create_branch, however we have
+ # no tests for this yet (stamp downgrades w/ branches)
+ return self.is_downgrade and self.branch_move
+
+ def should_create_branch(self, heads: Set[str]) -> Union[Set[str], bool]:
+ return (
+ self.is_upgrade
+ and (self.branch_move or set(self.from_).difference(heads))
+ and set(self.to_).difference(heads)
+ )
+
+ def should_merge_branches(self, heads: Set[str]) -> bool:
+ return len(self.from_) > 1
+
+ def should_unmerge_branches(self, heads: Set[str]) -> bool:
+ return len(self.to_) > 1
+
+ @property
+ def info(self) -> MigrationInfo:
+ up, down = (
+ (self.to_, self.from_)
+ if self.is_upgrade
+ else (self.from_, self.to_)
+ )
+ assert self.revision_map is not None
+ return MigrationInfo(
+ revision_map=self.revision_map,
+ up_revisions=up,
+ down_revisions=down,
+ is_upgrade=self.is_upgrade,
+ is_stamp=True,
+ )