about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/operations
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/operations')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/__init__.py15
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/base.py1906
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/batch.py718
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/ops.py2799
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py290
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py225
6 files changed, 5953 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py b/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py
new file mode 100644
index 00000000..26197cbe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py
@@ -0,0 +1,15 @@
+from . import toimpl
+from .base import AbstractOperations
+from .base import BatchOperations
+from .base import Operations
+from .ops import MigrateOperation
+from .ops import MigrationScript
+
+
+__all__ = [
+    "AbstractOperations",
+    "Operations",
+    "BatchOperations",
+    "MigrateOperation",
+    "MigrationScript",
+]
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/base.py b/.venv/lib/python3.12/site-packages/alembic/operations/base.py
new file mode 100644
index 00000000..456d1c75
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/base.py
@@ -0,0 +1,1906 @@
+# mypy: allow-untyped-calls
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+import re
+import textwrap
+from typing import Any
+from typing import Awaitable
+from typing import Callable
+from typing import Dict
+from typing import Iterator
+from typing import List  # noqa
+from typing import Mapping
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence  # noqa
+from typing import Tuple
+from typing import Type  # noqa
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy.sql.elements import conv
+
+from . import batch
+from . import schemaobj
+from .. import util
+from ..util import sqla_compat
+from ..util.compat import formatannotation_fwdref
+from ..util.compat import inspect_formatargspec
+from ..util.compat import inspect_getfullargspec
+from ..util.sqla_compat import _literal_bindparam
+
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy import Table
+    from sqlalchemy.engine import Connection
+    from sqlalchemy.sql import Executable
+    from sqlalchemy.sql.expression import ColumnElement
+    from sqlalchemy.sql.expression import TableClause
+    from sqlalchemy.sql.expression import TextClause
+    from sqlalchemy.sql.schema import Column
+    from sqlalchemy.sql.schema import Computed
+    from sqlalchemy.sql.schema import Identity
+    from sqlalchemy.sql.schema import SchemaItem
+    from sqlalchemy.types import TypeEngine
+
+    from .batch import BatchOperationsImpl
+    from .ops import AddColumnOp
+    from .ops import AddConstraintOp
+    from .ops import AlterColumnOp
+    from .ops import AlterTableOp
+    from .ops import BulkInsertOp
+    from .ops import CreateIndexOp
+    from .ops import CreateTableCommentOp
+    from .ops import CreateTableOp
+    from .ops import DropColumnOp
+    from .ops import DropConstraintOp
+    from .ops import DropIndexOp
+    from .ops import DropTableCommentOp
+    from .ops import DropTableOp
+    from .ops import ExecuteSQLOp
+    from .ops import MigrateOperation
+    from ..ddl import DefaultImpl
+    from ..runtime.migration import MigrationContext
+__all__ = ("Operations", "BatchOperations")
+_T = TypeVar("_T")
+
+_C = TypeVar("_C", bound=Callable[..., Any])
+
+
+class AbstractOperations(util.ModuleClsProxy):
+    """Base class for Operations and BatchOperations.
+
+    .. versionadded:: 1.11.0
+
+    """
+
+    impl: Union[DefaultImpl, BatchOperationsImpl]
+    _to_impl = util.Dispatcher()
+
+    def __init__(
+        self,
+        migration_context: MigrationContext,
+        impl: Optional[BatchOperationsImpl] = None,
+    ) -> None:
+        """Construct a new :class:`.Operations`
+
+        :param migration_context: a :class:`.MigrationContext`
+         instance.
+
+        """
+        self.migration_context = migration_context
+        if impl is None:
+            self.impl = migration_context.impl
+        else:
+            self.impl = impl
+
+        self.schema_obj = schemaobj.SchemaObjects(migration_context)
+
+    @classmethod
+    def register_operation(
+        cls, name: str, sourcename: Optional[str] = None
+    ) -> Callable[[Type[_T]], Type[_T]]:
+        """Register a new operation for this class.
+
+        This method is normally used to add new operations
+        to the :class:`.Operations` class, and possibly the
+        :class:`.BatchOperations` class as well.   All Alembic migration
+        operations are implemented via this system, however the system
+        is also available as a public API to facilitate adding custom
+        operations.
+
+        .. seealso::
+
+            :ref:`operation_plugins`
+
+
+        """
+
+        def register(op_cls: Type[_T]) -> Type[_T]:
+            if sourcename is None:
+                fn = getattr(op_cls, name)
+                source_name = fn.__name__
+            else:
+                fn = getattr(op_cls, sourcename)
+                source_name = fn.__name__
+
+            spec = inspect_getfullargspec(fn)
+
+            name_args = spec[0]
+            assert name_args[0:2] == ["cls", "operations"]
+
+            name_args[0:2] = ["self"]
+
+            args = inspect_formatargspec(
+                *spec, formatannotation=formatannotation_fwdref
+            )
+            num_defaults = len(spec[3]) if spec[3] else 0
+
+            defaulted_vals: Tuple[Any, ...]
+
+            if num_defaults:
+                defaulted_vals = tuple(name_args[0 - num_defaults :])
+            else:
+                defaulted_vals = ()
+
+            defaulted_vals += tuple(spec[4])
+            # here, we are using formatargspec in a different way in order
+            # to get a string that will re-apply incoming arguments to a new
+            # function call
+
+            apply_kw = inspect_formatargspec(
+                name_args + spec[4],
+                spec[1],
+                spec[2],
+                defaulted_vals,
+                formatvalue=lambda x: "=" + x,
+                formatannotation=formatannotation_fwdref,
+            )
+
+            args = re.sub(
+                r'[_]?ForwardRef\(([\'"].+?[\'"])\)',
+                lambda m: m.group(1),
+                args,
+            )
+
+            func_text = textwrap.dedent(
+                """\
+            def %(name)s%(args)s:
+                %(doc)r
+                return op_cls.%(source_name)s%(apply_kw)s
+            """
+                % {
+                    "name": name,
+                    "source_name": source_name,
+                    "args": args,
+                    "apply_kw": apply_kw,
+                    "doc": fn.__doc__,
+                }
+            )
+
+            globals_ = dict(globals())
+            globals_.update({"op_cls": op_cls})
+            lcl: Dict[str, Any] = {}
+
+            exec(func_text, globals_, lcl)
+            setattr(cls, name, lcl[name])
+            fn.__func__.__doc__ = (
+                "This method is proxied on "
+                "the :class:`.%s` class, via the :meth:`.%s.%s` method."
+                % (cls.__name__, cls.__name__, name)
+            )
+            if hasattr(fn, "_legacy_translations"):
+                lcl[name]._legacy_translations = fn._legacy_translations
+            return op_cls
+
+        return register
+
+    @classmethod
+    def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
+        """Register an implementation for a given :class:`.MigrateOperation`.
+
+        This is part of the operation extensibility API.
+
+        .. seealso::
+
+            :ref:`operation_plugins` - example of use
+
+        """
+
+        def decorate(fn: _C) -> _C:
+            cls._to_impl.dispatch_for(op_cls)(fn)
+            return fn
+
+        return decorate
+
+    @classmethod
+    @contextmanager
+    def context(
+        cls, migration_context: MigrationContext
+    ) -> Iterator[Operations]:
+        op = Operations(migration_context)
+        op._install_proxy()
+        yield op
+        op._remove_proxy()
+
+    @contextmanager
+    def batch_alter_table(
+        self,
+        table_name: str,
+        schema: Optional[str] = None,
+        recreate: Literal["auto", "always", "never"] = "auto",
+        partial_reordering: Optional[Tuple[Any, ...]] = None,
+        copy_from: Optional[Table] = None,
+        table_args: Tuple[Any, ...] = (),
+        table_kwargs: Mapping[str, Any] = util.immutabledict(),
+        reflect_args: Tuple[Any, ...] = (),
+        reflect_kwargs: Mapping[str, Any] = util.immutabledict(),
+        naming_convention: Optional[Dict[str, str]] = None,
+    ) -> Iterator[BatchOperations]:
+        """Invoke a series of per-table migrations in batch.
+
+        Batch mode allows a series of operations specific to a table
+        to be syntactically grouped together, and allows for alternate
+        modes of table migration, in particular the "recreate" style of
+        migration required by SQLite.
+
+        "recreate" style is as follows:
+
+        1. A new table is created with the new specification, based on the
+           migration directives within the batch, using a temporary name.
+
+        2. the data copied from the existing table to the new table.
+
+        3. the existing table is dropped.
+
+        4. the new table is renamed to the existing table name.
+
+        The directive by default will only use "recreate" style on the
+        SQLite backend, and only if directives are present which require
+        this form, e.g. anything other than ``add_column()``.   The batch
+        operation on other backends will proceed using standard ALTER TABLE
+        operations.
+
+        The method is used as a context manager, which returns an instance
+        of :class:`.BatchOperations`; this object is the same as
+        :class:`.Operations` except that table names and schema names
+        are omitted.  E.g.::
+
+            with op.batch_alter_table("some_table") as batch_op:
+                batch_op.add_column(Column("foo", Integer))
+                batch_op.drop_column("bar")
+
+        The operations within the context manager are invoked at once
+        when the context is ended.   When run against SQLite, if the
+        migrations include operations not supported by SQLite's ALTER TABLE,
+        the entire table will be copied to a new one with the new
+        specification, moving all data across as well.
+
+        The copy operation by default uses reflection to retrieve the current
+        structure of the table, and therefore :meth:`.batch_alter_table`
+        in this mode requires that the migration is run in "online" mode.
+        The ``copy_from`` parameter may be passed which refers to an existing
+        :class:`.Table` object, which will bypass this reflection step.
+
+        .. note::  The table copy operation will currently not copy
+           CHECK constraints, and may not copy UNIQUE constraints that are
+           unnamed, as is possible on SQLite.   See the section
+           :ref:`sqlite_batch_constraints` for workarounds.
+
+        :param table_name: name of table
+        :param schema: optional schema name.
+        :param recreate: under what circumstances the table should be
+         recreated. At its default of ``"auto"``, the SQLite dialect will
+         recreate the table if any operations other than ``add_column()``,
+         ``create_index()``, or ``drop_index()`` are
+         present. Other options include ``"always"`` and ``"never"``.
+        :param copy_from: optional :class:`~sqlalchemy.schema.Table` object
+         that will act as the structure of the table being copied.  If omitted,
+         table reflection is used to retrieve the structure of the table.
+
+         .. seealso::
+
+            :ref:`batch_offline_mode`
+
+            :paramref:`~.Operations.batch_alter_table.reflect_args`
+
+            :paramref:`~.Operations.batch_alter_table.reflect_kwargs`
+
+        :param reflect_args: a sequence of additional positional arguments that
+         will be applied to the table structure being reflected / copied;
+         this may be used to pass column and constraint overrides to the
+         table that will be reflected, in lieu of passing the whole
+         :class:`~sqlalchemy.schema.Table` using
+         :paramref:`~.Operations.batch_alter_table.copy_from`.
+        :param reflect_kwargs: a dictionary of additional keyword arguments
+         that will be applied to the table structure being copied; this may be
+         used to pass additional table and reflection options to the table that
+         will be reflected, in lieu of passing the whole
+         :class:`~sqlalchemy.schema.Table` using
+         :paramref:`~.Operations.batch_alter_table.copy_from`.
+        :param table_args: a sequence of additional positional arguments that
+         will be applied to the new :class:`~sqlalchemy.schema.Table` when
+         created, in addition to those copied from the source table.
+         This may be used to provide additional constraints such as CHECK
+         constraints that may not be reflected.
+        :param table_kwargs: a dictionary of additional keyword arguments
+         that will be applied to the new :class:`~sqlalchemy.schema.Table`
+         when created, in addition to those copied from the source table.
+         This may be used to provide for additional table options that may
+         not be reflected.
+        :param naming_convention: a naming convention dictionary of the form
+         described at :ref:`autogen_naming_conventions` which will be applied
+         to the :class:`~sqlalchemy.schema.MetaData` during the reflection
+         process.  This is typically required if one wants to drop SQLite
+         constraints, as these constraints will not have names when
+         reflected on this backend.  Requires SQLAlchemy **0.9.4** or greater.
+
+         .. seealso::
+
+            :ref:`dropping_sqlite_foreign_keys`
+
+        :param partial_reordering: a list of tuples, each suggesting a desired
+         ordering of two or more columns in the newly created table.  Requires
+         that :paramref:`.batch_alter_table.recreate` is set to ``"always"``.
+         Examples, given a table with columns "a", "b", "c", and "d":
+
+         Specify the order of all columns::
+
+            with op.batch_alter_table(
+                "some_table",
+                recreate="always",
+                partial_reordering=[("c", "d", "a", "b")],
+            ) as batch_op:
+                pass
+
+         Ensure "d" appears before "c", and "b", appears before "a"::
+
+            with op.batch_alter_table(
+                "some_table",
+                recreate="always",
+                partial_reordering=[("d", "c"), ("b", "a")],
+            ) as batch_op:
+                pass
+
+         The ordering of columns not included in the partial_reordering
+         set is undefined.   Therefore it is best to specify the complete
+         ordering of all columns for best results.
+
+        .. note:: batch mode requires SQLAlchemy 0.8 or above.
+
+        .. seealso::
+
+            :ref:`batch_migrations`
+
+        """
+        impl = batch.BatchOperationsImpl(
+            self,
+            table_name,
+            schema,
+            recreate,
+            copy_from,
+            table_args,
+            table_kwargs,
+            reflect_args,
+            reflect_kwargs,
+            naming_convention,
+            partial_reordering,
+        )
+        batch_op = BatchOperations(self.migration_context, impl=impl)
+        yield batch_op
+        impl.flush()
+
+    def get_context(self) -> MigrationContext:
+        """Return the :class:`.MigrationContext` object that's
+        currently in use.
+
+        """
+
+        return self.migration_context
+
+    @overload
+    def invoke(self, operation: CreateTableOp) -> Table: ...
+
+    @overload
+    def invoke(
+        self,
+        operation: Union[
+            AddConstraintOp,
+            DropConstraintOp,
+            CreateIndexOp,
+            DropIndexOp,
+            AddColumnOp,
+            AlterColumnOp,
+            AlterTableOp,
+            CreateTableCommentOp,
+            DropTableCommentOp,
+            DropColumnOp,
+            BulkInsertOp,
+            DropTableOp,
+            ExecuteSQLOp,
+        ],
+    ) -> None: ...
+
+    @overload
+    def invoke(self, operation: MigrateOperation) -> Any: ...
+
+    def invoke(self, operation: MigrateOperation) -> Any:
+        """Given a :class:`.MigrateOperation`, invoke it in terms of
+        this :class:`.Operations` instance.
+
+        """
+        fn = self._to_impl.dispatch(
+            operation, self.migration_context.impl.__dialect__
+        )
+        return fn(self, operation)
+
+    def f(self, name: str) -> conv:
+        """Indicate a string name that has already had a naming convention
+        applied to it.
+
+        This feature combines with the SQLAlchemy ``naming_convention`` feature
+        to disambiguate constraint names that have already had naming
+        conventions applied to them, versus those that have not.  This is
+        necessary in the case that the ``"%(constraint_name)s"`` token
+        is used within a naming convention, so that it can be identified
+        that this particular name should remain fixed.
+
+        If the :meth:`.Operations.f` is used on a constraint, the naming
+        convention will not take effect::
+
+            op.add_column("t", "x", Boolean(name=op.f("ck_bool_t_x")))
+
+        Above, the CHECK constraint generated will have the name
+        ``ck_bool_t_x`` regardless of whether or not a naming convention is
+        in use.
+
+        Alternatively, if a naming convention is in use, and 'f' is not used,
+        names will be converted along conventions.  If the ``target_metadata``
+        contains the naming convention
+        ``{"ck": "ck_bool_%(table_name)s_%(constraint_name)s"}``, then the
+        output of the following:
+
+            op.add_column("t", "x", Boolean(name="x"))
+
+        will be::
+
+            CONSTRAINT ck_bool_t_x CHECK (x in (1, 0)))
+
+        The function is rendered in the output of autogenerate when
+        a particular constraint name is already converted.
+
+        """
+        return conv(name)
+
+    def inline_literal(
+        self, value: Union[str, int], type_: Optional[TypeEngine[Any]] = None
+    ) -> _literal_bindparam:
+        r"""Produce an 'inline literal' expression, suitable for
+        using in an INSERT, UPDATE, or DELETE statement.
+
+        When using Alembic in "offline" mode, CRUD operations
+        aren't compatible with SQLAlchemy's default behavior surrounding
+        literal values,
+        which is that they are converted into bound values and passed
+        separately into the ``execute()`` method of the DBAPI cursor.
+        An offline SQL
+        script needs to have these rendered inline.  While it should
+        always be noted that inline literal values are an **enormous**
+        security hole in an application that handles untrusted input,
+        a schema migration is not run in this context, so
+        literals are safe to render inline, with the caveat that
+        advanced types like dates may not be supported directly
+        by SQLAlchemy.
+
+        See :meth:`.Operations.execute` for an example usage of
+        :meth:`.Operations.inline_literal`.
+
+        The environment can also be configured to attempt to render
+        "literal" values inline automatically, for those simple types
+        that are supported by the dialect; see
+        :paramref:`.EnvironmentContext.configure.literal_binds` for this
+        more recently added feature.
+
+        :param value: The value to render.  Strings, integers, and simple
+         numerics should be supported.   Other types like boolean,
+         dates, etc. may or may not be supported yet by various
+         backends.
+        :param type\_: optional - a :class:`sqlalchemy.types.TypeEngine`
+         subclass stating the type of this value.  In SQLAlchemy
+         expressions, this is usually derived automatically
+         from the Python type of the value itself, as well as
+         based on the context in which the value is used.
+
+        .. seealso::
+
+            :paramref:`.EnvironmentContext.configure.literal_binds`
+
+        """
+        return sqla_compat._literal_bindparam(None, value, type_=type_)
+
+    def get_bind(self) -> Connection:
+        """Return the current 'bind'.
+
+        Under normal circumstances, this is the
+        :class:`~sqlalchemy.engine.Connection` currently being used
+        to emit SQL to the database.
+
+        In a SQL script context, this value is ``None``. [TODO: verify this]
+
+        """
+        return self.migration_context.impl.bind  # type: ignore[return-value]
+
+    def run_async(
+        self,
+        async_function: Callable[..., Awaitable[_T]],
+        *args: Any,
+        **kw_args: Any,
+    ) -> _T:
+        """Invoke the given asynchronous callable, passing an asynchronous
+        :class:`~sqlalchemy.ext.asyncio.AsyncConnection` as the first
+        argument.
+
+        This method allows calling async functions from within the
+        synchronous ``upgrade()`` or ``downgrade()`` alembic migration
+        method.
+
+        The async connection passed to the callable shares the same
+        transaction as the connection running in the migration context.
+
+        Any additional arg or kw_arg passed to this function are passed
+        to the provided async function.
+
+        .. versionadded: 1.11
+
+        .. note::
+
+            This method can be called only when alembic is called using
+            an async dialect.
+        """
+        if not sqla_compat.sqla_14_18:
+            raise NotImplementedError("SQLAlchemy 1.4.18+ required")
+        sync_conn = self.get_bind()
+        if sync_conn is None:
+            raise NotImplementedError("Cannot call run_async in SQL mode")
+        if not sync_conn.dialect.is_async:
+            raise ValueError("Cannot call run_async with a sync engine")
+        from sqlalchemy.ext.asyncio import AsyncConnection
+        from sqlalchemy.util import await_only
+
+        async_conn = AsyncConnection._retrieve_proxy_for_target(sync_conn)
+        return await_only(async_function(async_conn, *args, **kw_args))
+
+
+class Operations(AbstractOperations):
+    """Define high level migration operations.
+
+    Each operation corresponds to some schema migration operation,
+    executed against a particular :class:`.MigrationContext`
+    which in turn represents connectivity to a database,
+    or a file output stream.
+
+    While :class:`.Operations` is normally configured as
+    part of the :meth:`.EnvironmentContext.run_migrations`
+    method called from an ``env.py`` script, a standalone
+    :class:`.Operations` instance can be
+    made for use cases external to regular Alembic
+    migrations by passing in a :class:`.MigrationContext`::
+
+        from alembic.migration import MigrationContext
+        from alembic.operations import Operations
+
+        conn = myengine.connect()
+        ctx = MigrationContext.configure(conn)
+        op = Operations(ctx)
+
+        op.alter_column("t", "c", nullable=True)
+
+    Note that as of 0.8, most of the methods on this class are produced
+    dynamically using the :meth:`.Operations.register_operation`
+    method.
+
+    """
+
+    if TYPE_CHECKING:
+        # START STUB FUNCTIONS: op_cls
+        # ### the following stubs are generated by tools/write_pyi.py ###
+        # ### do not edit ###
+
+        def add_column(
+            self,
+            table_name: str,
+            column: Column[Any],
+            *,
+            schema: Optional[str] = None,
+        ) -> None:
+            """Issue an "add column" instruction using the current
+            migration context.
+
+            e.g.::
+
+                from alembic import op
+                from sqlalchemy import Column, String
+
+                op.add_column("organization", Column("name", String()))
+
+            The :meth:`.Operations.add_column` method typically corresponds
+            to the SQL command "ALTER TABLE... ADD COLUMN".    Within the scope
+            of this command, the column's name, datatype, nullability,
+            and optional server-generated defaults may be indicated.
+
+            .. note::
+
+                With the exception of NOT NULL constraints or single-column FOREIGN
+                KEY constraints, other kinds of constraints such as PRIMARY KEY,
+                UNIQUE or CHECK constraints **cannot** be generated using this
+                method; for these constraints, refer to operations such as
+                :meth:`.Operations.create_primary_key` and
+                :meth:`.Operations.create_check_constraint`. In particular, the
+                following :class:`~sqlalchemy.schema.Column` parameters are
+                **ignored**:
+
+                * :paramref:`~sqlalchemy.schema.Column.primary_key` - SQL databases
+                  typically do not support an ALTER operation that can add
+                  individual columns one at a time to an existing primary key
+                  constraint, therefore it's less ambiguous to use the
+                  :meth:`.Operations.create_primary_key` method, which assumes no
+                  existing primary key constraint is present.
+                * :paramref:`~sqlalchemy.schema.Column.unique` - use the
+                  :meth:`.Operations.create_unique_constraint` method
+                * :paramref:`~sqlalchemy.schema.Column.index` - use the
+                  :meth:`.Operations.create_index` method
+
+
+            The provided :class:`~sqlalchemy.schema.Column` object may include a
+            :class:`~sqlalchemy.schema.ForeignKey` constraint directive,
+            referencing a remote table name. For this specific type of constraint,
+            Alembic will automatically emit a second ALTER statement in order to
+            add the single-column FOREIGN KEY constraint separately::
+
+                from alembic import op
+                from sqlalchemy import Column, INTEGER, ForeignKey
+
+                op.add_column(
+                    "organization",
+                    Column("account_id", INTEGER, ForeignKey("accounts.id")),
+                )
+
+            The column argument passed to :meth:`.Operations.add_column` is a
+            :class:`~sqlalchemy.schema.Column` construct, used in the same way it's
+            used in SQLAlchemy. In particular, values or functions to be indicated
+            as producing the column's default value on the database side are
+            specified using the ``server_default`` parameter, and not ``default``
+            which only specifies Python-side defaults::
+
+                from alembic import op
+                from sqlalchemy import Column, TIMESTAMP, func
+
+                # specify "DEFAULT NOW" along with the column add
+                op.add_column(
+                    "account",
+                    Column("timestamp", TIMESTAMP, server_default=func.now()),
+                )
+
+            :param table_name: String name of the parent table.
+            :param column: a :class:`sqlalchemy.schema.Column` object
+             representing the new column.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        def alter_column(
+            self,
+            table_name: str,
+            column_name: str,
+            *,
+            nullable: Optional[bool] = None,
+            comment: Union[str, Literal[False], None] = False,
+            server_default: Any = False,
+            new_column_name: Optional[str] = None,
+            type_: Union[TypeEngine[Any], Type[TypeEngine[Any]], None] = None,
+            existing_type: Union[
+                TypeEngine[Any], Type[TypeEngine[Any]], None
+            ] = None,
+            existing_server_default: Union[
+                str, bool, Identity, Computed, None
+            ] = False,
+            existing_nullable: Optional[bool] = None,
+            existing_comment: Optional[str] = None,
+            schema: Optional[str] = None,
+            **kw: Any,
+        ) -> None:
+            r"""Issue an "alter column" instruction using the
+            current migration context.
+
+            Generally, only that aspect of the column which
+            is being changed, i.e. name, type, nullability,
+            default, needs to be specified.  Multiple changes
+            can also be specified at once and the backend should
+            "do the right thing", emitting each change either
+            separately or together as the backend allows.
+
+            MySQL has special requirements here, since MySQL
+            cannot ALTER a column without a full specification.
+            When producing MySQL-compatible migration files,
+            it is recommended that the ``existing_type``,
+            ``existing_server_default``, and ``existing_nullable``
+            parameters be present, if not being altered.
+
+            Type changes which are against the SQLAlchemy
+            "schema" types :class:`~sqlalchemy.types.Boolean`
+            and  :class:`~sqlalchemy.types.Enum` may also
+            add or drop constraints which accompany those
+            types on backends that don't support them natively.
+            The ``existing_type`` argument is
+            used in this case to identify and remove a previous
+            constraint that was bound to the type object.
+
+            :param table_name: string name of the target table.
+            :param column_name: string name of the target column,
+             as it exists before the operation begins.
+            :param nullable: Optional; specify ``True`` or ``False``
+             to alter the column's nullability.
+            :param server_default: Optional; specify a string
+             SQL expression, :func:`~sqlalchemy.sql.expression.text`,
+             or :class:`~sqlalchemy.schema.DefaultClause` to indicate
+             an alteration to the column's default value.
+             Set to ``None`` to have the default removed.
+            :param comment: optional string text of a new comment to add to the
+             column.
+            :param new_column_name: Optional; specify a string name here to
+             indicate the new name within a column rename operation.
+            :param type\_: Optional; a :class:`~sqlalchemy.types.TypeEngine`
+             type object to specify a change to the column's type.
+             For SQLAlchemy types that also indicate a constraint (i.e.
+             :class:`~sqlalchemy.types.Boolean`, :class:`~sqlalchemy.types.Enum`),
+             the constraint is also generated.
+            :param autoincrement: set the ``AUTO_INCREMENT`` flag of the column;
+             currently understood by the MySQL dialect.
+            :param existing_type: Optional; a
+             :class:`~sqlalchemy.types.TypeEngine`
+             type object to specify the previous type.   This
+             is required for all MySQL column alter operations that
+             don't otherwise specify a new type, as well as for
+             when nullability is being changed on a SQL Server
+             column.  It is also used if the type is a so-called
+             SQLAlchemy "schema" type which may define a constraint (i.e.
+             :class:`~sqlalchemy.types.Boolean`,
+             :class:`~sqlalchemy.types.Enum`),
+             so that the constraint can be dropped.
+            :param existing_server_default: Optional; The existing
+             default value of the column.   Required on MySQL if
+             an existing default is not being changed; else MySQL
+             removes the default.
+            :param existing_nullable: Optional; the existing nullability
+             of the column.  Required on MySQL if the existing nullability
+             is not being changed; else MySQL sets this to NULL.
+            :param existing_autoincrement: Optional; the existing autoincrement
+             of the column.  Used for MySQL's system of altering a column
+             that specifies ``AUTO_INCREMENT``.
+            :param existing_comment: string text of the existing comment on the
+             column to be maintained.  Required on MySQL if the existing comment
+             on the column is not being changed.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+            :param postgresql_using: String argument which will indicate a
+             SQL expression to render within the Postgresql-specific USING clause
+             within ALTER COLUMN.    This string is taken directly as raw SQL which
+             must explicitly include any necessary quoting or escaping of tokens
+             within the expression.
+
+            """  # noqa: E501
+            ...
+
+        def bulk_insert(
+            self,
+            table: Union[Table, TableClause],
+            rows: List[Dict[str, Any]],
+            *,
+            multiinsert: bool = True,
+        ) -> None:
+            """Issue a "bulk insert" operation using the current
+            migration context.
+
+            This provides a means of representing an INSERT of multiple rows
+            which works equally well in the context of executing on a live
+            connection as well as that of generating a SQL script.   In the
+            case of a SQL script, the values are rendered inline into the
+            statement.
+
+            e.g.::
+
+                from alembic import op
+                from datetime import date
+                from sqlalchemy.sql import table, column
+                from sqlalchemy import String, Integer, Date
+
+                # Create an ad-hoc table to use for the insert statement.
+                accounts_table = table(
+                    "account",
+                    column("id", Integer),
+                    column("name", String),
+                    column("create_date", Date),
+                )
+
+                op.bulk_insert(
+                    accounts_table,
+                    [
+                        {
+                            "id": 1,
+                            "name": "John Smith",
+                            "create_date": date(2010, 10, 5),
+                        },
+                        {
+                            "id": 2,
+                            "name": "Ed Williams",
+                            "create_date": date(2007, 5, 27),
+                        },
+                        {
+                            "id": 3,
+                            "name": "Wendy Jones",
+                            "create_date": date(2008, 8, 15),
+                        },
+                    ],
+                )
+
+            When using --sql mode, some datatypes may not render inline
+            automatically, such as dates and other special types.   When this
+            issue is present, :meth:`.Operations.inline_literal` may be used::
+
+                op.bulk_insert(
+                    accounts_table,
+                    [
+                        {
+                            "id": 1,
+                            "name": "John Smith",
+                            "create_date": op.inline_literal("2010-10-05"),
+                        },
+                        {
+                            "id": 2,
+                            "name": "Ed Williams",
+                            "create_date": op.inline_literal("2007-05-27"),
+                        },
+                        {
+                            "id": 3,
+                            "name": "Wendy Jones",
+                            "create_date": op.inline_literal("2008-08-15"),
+                        },
+                    ],
+                    multiinsert=False,
+                )
+
+            When using :meth:`.Operations.inline_literal` in conjunction with
+            :meth:`.Operations.bulk_insert`, in order for the statement to work
+            in "online" (e.g. non --sql) mode, the
+            :paramref:`~.Operations.bulk_insert.multiinsert`
+            flag should be set to ``False``, which will have the effect of
+            individual INSERT statements being emitted to the database, each
+            with a distinct VALUES clause, so that the "inline" values can
+            still be rendered, rather than attempting to pass the values
+            as bound parameters.
+
+            :param table: a table object which represents the target of the INSERT.
+
+            :param rows: a list of dictionaries indicating rows.
+
+            :param multiinsert: when at its default of True and --sql mode is not
+               enabled, the INSERT statement will be executed using
+               "executemany()" style, where all elements in the list of
+               dictionaries are passed as bound parameters in a single
+               list.   Setting this to False results in individual INSERT
+               statements being emitted per parameter set, and is needed
+               in those cases where non-literal values are present in the
+               parameter sets.
+
+            """  # noqa: E501
+            ...
+
+        def create_check_constraint(
+            self,
+            constraint_name: Optional[str],
+            table_name: str,
+            condition: Union[str, ColumnElement[bool], TextClause],
+            *,
+            schema: Optional[str] = None,
+            **kw: Any,
+        ) -> None:
+            """Issue a "create check constraint" instruction using the
+            current migration context.
+
+            e.g.::
+
+                from alembic import op
+                from sqlalchemy.sql import column, func
+
+                op.create_check_constraint(
+                    "ck_user_name_len",
+                    "user",
+                    func.len(column("name")) > 5,
+                )
+
+            CHECK constraints are usually against a SQL expression, so ad-hoc
+            table metadata is usually needed.   The function will convert the given
+            arguments into a :class:`sqlalchemy.schema.CheckConstraint` bound
+            to an anonymous table in order to emit the CREATE statement.
+
+            :param name: Name of the check constraint.  The name is necessary
+             so that an ALTER statement can be emitted.  For setups that
+             use an automated naming scheme such as that described at
+             :ref:`sqla:constraint_naming_conventions`,
+             ``name`` here can be ``None``, as the event listener will
+             apply the name to the constraint object when it is associated
+             with the table.
+            :param table_name: String name of the source table.
+            :param condition: SQL expression that's the condition of the
+             constraint. Can be a string or SQLAlchemy expression language
+             structure.
+            :param deferrable: optional bool. If set, emit DEFERRABLE or
+             NOT DEFERRABLE when issuing DDL for this constraint.
+            :param initially: optional string. If set, emit INITIALLY <value>
+             when issuing DDL for this constraint.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        def create_exclude_constraint(
+            self,
+            constraint_name: str,
+            table_name: str,
+            *elements: Any,
+            **kw: Any,
+        ) -> Optional[Table]:
+            """Issue an alter to create an EXCLUDE constraint using the
+            current migration context.
+
+            .. note::  This method is Postgresql specific, and additionally
+               requires at least SQLAlchemy 1.0.
+
+            e.g.::
+
+                from alembic import op
+
+                op.create_exclude_constraint(
+                    "user_excl",
+                    "user",
+                    ("period", "&&"),
+                    ("group", "="),
+                    where=("group != 'some group'"),
+                )
+
+            Note that the expressions work the same way as that of
+            the ``ExcludeConstraint`` object itself; if plain strings are
+            passed, quoting rules must be applied manually.
+
+            :param name: Name of the constraint.
+            :param table_name: String name of the source table.
+            :param elements: exclude conditions.
+            :param where: SQL expression or SQL string with optional WHERE
+             clause.
+            :param deferrable: optional bool. If set, emit DEFERRABLE or
+             NOT DEFERRABLE when issuing DDL for this constraint.
+            :param initially: optional string. If set, emit INITIALLY <value>
+             when issuing DDL for this constraint.
+            :param schema: Optional schema name to operate within.
+
+            """  # noqa: E501
+            ...
+
+        def create_foreign_key(
+            self,
+            constraint_name: Optional[str],
+            source_table: str,
+            referent_table: str,
+            local_cols: List[str],
+            remote_cols: List[str],
+            *,
+            onupdate: Optional[str] = None,
+            ondelete: Optional[str] = None,
+            deferrable: Optional[bool] = None,
+            initially: Optional[str] = None,
+            match: Optional[str] = None,
+            source_schema: Optional[str] = None,
+            referent_schema: Optional[str] = None,
+            **dialect_kw: Any,
+        ) -> None:
+            """Issue a "create foreign key" instruction using the
+            current migration context.
+
+            e.g.::
+
+                from alembic import op
+
+                op.create_foreign_key(
+                    "fk_user_address",
+                    "address",
+                    "user",
+                    ["user_id"],
+                    ["id"],
+                )
+
+            This internally generates a :class:`~sqlalchemy.schema.Table` object
+            containing the necessary columns, then generates a new
+            :class:`~sqlalchemy.schema.ForeignKeyConstraint`
+            object which it then associates with the
+            :class:`~sqlalchemy.schema.Table`.
+            Any event listeners associated with this action will be fired
+            off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+            construct is ultimately used to generate the ALTER statement.
+
+            :param constraint_name: Name of the foreign key constraint.  The name
+             is necessary so that an ALTER statement can be emitted.  For setups
+             that use an automated naming scheme such as that described at
+             :ref:`sqla:constraint_naming_conventions`,
+             ``name`` here can be ``None``, as the event listener will
+             apply the name to the constraint object when it is associated
+             with the table.
+            :param source_table: String name of the source table.
+            :param referent_table: String name of the destination table.
+            :param local_cols: a list of string column names in the
+             source table.
+            :param remote_cols: a list of string column names in the
+             remote table.
+            :param onupdate: Optional string. If set, emit ON UPDATE <value> when
+             issuing DDL for this constraint. Typical values include CASCADE,
+             DELETE and RESTRICT.
+            :param ondelete: Optional string. If set, emit ON DELETE <value> when
+             issuing DDL for this constraint. Typical values include CASCADE,
+             DELETE and RESTRICT.
+            :param deferrable: optional bool. If set, emit DEFERRABLE or NOT
+             DEFERRABLE when issuing DDL for this constraint.
+            :param source_schema: Optional schema name of the source table.
+            :param referent_schema: Optional schema name of the destination table.
+
+            """  # noqa: E501
+            ...
+
+        def create_index(
+            self,
+            index_name: Optional[str],
+            table_name: str,
+            columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+            *,
+            schema: Optional[str] = None,
+            unique: bool = False,
+            if_not_exists: Optional[bool] = None,
+            **kw: Any,
+        ) -> None:
+            r"""Issue a "create index" instruction using the current
+            migration context.
+
+            e.g.::
+
+                from alembic import op
+
+                op.create_index("ik_test", "t1", ["foo", "bar"])
+
+            Functional indexes can be produced by using the
+            :func:`sqlalchemy.sql.expression.text` construct::
+
+                from alembic import op
+                from sqlalchemy import text
+
+                op.create_index("ik_test", "t1", [text("lower(foo)")])
+
+            :param index_name: name of the index.
+            :param table_name: name of the owning table.
+            :param columns: a list consisting of string column names and/or
+             :func:`~sqlalchemy.sql.expression.text` constructs.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+            :param unique: If True, create a unique index.
+
+            :param quote: Force quoting of this column's name on or off,
+             corresponding to ``True`` or ``False``. When left at its default
+             of ``None``, the column identifier will be quoted according to
+             whether the name is case sensitive (identifiers with at least one
+             upper case character are treated as case sensitive), or if it's a
+             reserved word. This flag is only needed to force quoting of a
+             reserved word which is not known by the SQLAlchemy dialect.
+
+            :param if_not_exists: If True, adds IF NOT EXISTS operator when
+             creating the new index.
+
+             .. versionadded:: 1.12.0
+
+            :param \**kw: Additional keyword arguments not mentioned above are
+             dialect specific, and passed in the form
+             ``<dialectname>_<argname>``.
+             See the documentation regarding an individual dialect at
+             :ref:`dialect_toplevel` for detail on documented arguments.
+
+            """  # noqa: E501
+            ...
+
+        def create_primary_key(
+            self,
+            constraint_name: Optional[str],
+            table_name: str,
+            columns: List[str],
+            *,
+            schema: Optional[str] = None,
+        ) -> None:
+            """Issue a "create primary key" instruction using the current
+            migration context.
+
+            e.g.::
+
+                from alembic import op
+
+                op.create_primary_key("pk_my_table", "my_table", ["id", "version"])
+
+            This internally generates a :class:`~sqlalchemy.schema.Table` object
+            containing the necessary columns, then generates a new
+            :class:`~sqlalchemy.schema.PrimaryKeyConstraint`
+            object which it then associates with the
+            :class:`~sqlalchemy.schema.Table`.
+            Any event listeners associated with this action will be fired
+            off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+            construct is ultimately used to generate the ALTER statement.
+
+            :param constraint_name: Name of the primary key constraint.  The name
+             is necessary so that an ALTER statement can be emitted.  For setups
+             that use an automated naming scheme such as that described at
+             :ref:`sqla:constraint_naming_conventions`
+             ``name`` here can be ``None``, as the event listener will
+             apply the name to the constraint object when it is associated
+             with the table.
+            :param table_name: String name of the target table.
+            :param columns: a list of string column names to be applied to the
+             primary key constraint.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        def create_table(
+            self,
+            table_name: str,
+            *columns: SchemaItem,
+            if_not_exists: Optional[bool] = None,
+            **kw: Any,
+        ) -> Table:
+            r"""Issue a "create table" instruction using the current migration
+            context.
+
+            This directive receives an argument list similar to that of the
+            traditional :class:`sqlalchemy.schema.Table` construct, but without the
+            metadata::
+
+                from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+                from alembic import op
+
+                op.create_table(
+                    "account",
+                    Column("id", INTEGER, primary_key=True),
+                    Column("name", VARCHAR(50), nullable=False),
+                    Column("description", NVARCHAR(200)),
+                    Column("timestamp", TIMESTAMP, server_default=func.now()),
+                )
+
+            Note that :meth:`.create_table` accepts
+            :class:`~sqlalchemy.schema.Column`
+            constructs directly from the SQLAlchemy library.  In particular,
+            default values to be created on the database side are
+            specified using the ``server_default`` parameter, and not
+            ``default`` which only specifies Python-side defaults::
+
+                from alembic import op
+                from sqlalchemy import Column, TIMESTAMP, func
+
+                # specify "DEFAULT NOW" along with the "timestamp" column
+                op.create_table(
+                    "account",
+                    Column("id", INTEGER, primary_key=True),
+                    Column("timestamp", TIMESTAMP, server_default=func.now()),
+                )
+
+            The function also returns a newly created
+            :class:`~sqlalchemy.schema.Table` object, corresponding to the table
+            specification given, which is suitable for
+            immediate SQL operations, in particular
+            :meth:`.Operations.bulk_insert`::
+
+                from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+                from alembic import op
+
+                account_table = op.create_table(
+                    "account",
+                    Column("id", INTEGER, primary_key=True),
+                    Column("name", VARCHAR(50), nullable=False),
+                    Column("description", NVARCHAR(200)),
+                    Column("timestamp", TIMESTAMP, server_default=func.now()),
+                )
+
+                op.bulk_insert(
+                    account_table,
+                    [
+                        {"name": "A1", "description": "account 1"},
+                        {"name": "A2", "description": "account 2"},
+                    ],
+                )
+
+            :param table_name: Name of the table
+            :param \*columns: collection of :class:`~sqlalchemy.schema.Column`
+             objects within
+             the table, as well as optional :class:`~sqlalchemy.schema.Constraint`
+             objects
+             and :class:`~.sqlalchemy.schema.Index` objects.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+            :param if_not_exists: If True, adds IF NOT EXISTS operator when
+             creating the new table.
+
+             .. versionadded:: 1.13.3
+            :param \**kw: Other keyword arguments are passed to the underlying
+             :class:`sqlalchemy.schema.Table` object created for the command.
+
+            :return: the :class:`~sqlalchemy.schema.Table` object corresponding
+             to the parameters given.
+
+            """  # noqa: E501
+            ...
+
+        def create_table_comment(
+            self,
+            table_name: str,
+            comment: Optional[str],
+            *,
+            existing_comment: Optional[str] = None,
+            schema: Optional[str] = None,
+        ) -> None:
+            """Emit a COMMENT ON operation to set the comment for a table.
+
+            :param table_name: string name of the target table.
+            :param comment: string value of the comment being registered against
+             the specified table.
+            :param existing_comment: String value of a comment
+             already registered on the specified table, used within autogenerate
+             so that the operation is reversible, but not required for direct
+             use.
+
+            .. seealso::
+
+                :meth:`.Operations.drop_table_comment`
+
+                :paramref:`.Operations.alter_column.comment`
+
+            """  # noqa: E501
+            ...
+
+        def create_unique_constraint(
+            self,
+            constraint_name: Optional[str],
+            table_name: str,
+            columns: Sequence[str],
+            *,
+            schema: Optional[str] = None,
+            **kw: Any,
+        ) -> Any:
+            """Issue a "create unique constraint" instruction using the
+            current migration context.
+
+            e.g.::
+
+                from alembic import op
+                op.create_unique_constraint("uq_user_name", "user", ["name"])
+
+            This internally generates a :class:`~sqlalchemy.schema.Table` object
+            containing the necessary columns, then generates a new
+            :class:`~sqlalchemy.schema.UniqueConstraint`
+            object which it then associates with the
+            :class:`~sqlalchemy.schema.Table`.
+            Any event listeners associated with this action will be fired
+            off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+            construct is ultimately used to generate the ALTER statement.
+
+            :param name: Name of the unique constraint.  The name is necessary
+             so that an ALTER statement can be emitted.  For setups that
+             use an automated naming scheme such as that described at
+             :ref:`sqla:constraint_naming_conventions`,
+             ``name`` here can be ``None``, as the event listener will
+             apply the name to the constraint object when it is associated
+             with the table.
+            :param table_name: String name of the source table.
+            :param columns: a list of string column names in the
+             source table.
+            :param deferrable: optional bool. If set, emit DEFERRABLE or
+             NOT DEFERRABLE when issuing DDL for this constraint.
+            :param initially: optional string. If set, emit INITIALLY <value>
+             when issuing DDL for this constraint.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        def drop_column(
+            self,
+            table_name: str,
+            column_name: str,
+            *,
+            schema: Optional[str] = None,
+            **kw: Any,
+        ) -> None:
+            """Issue a "drop column" instruction using the current
+            migration context.
+
+            e.g.::
+
+                drop_column("organization", "account_id")
+
+            :param table_name: name of table
+            :param column_name: name of column
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+            :param mssql_drop_check: Optional boolean.  When ``True``, on
+             Microsoft SQL Server only, first
+             drop the CHECK constraint on the column using a
+             SQL-script-compatible
+             block that selects into a @variable from sys.check_constraints,
+             then exec's a separate DROP CONSTRAINT for that constraint.
+            :param mssql_drop_default: Optional boolean.  When ``True``, on
+             Microsoft SQL Server only, first
+             drop the DEFAULT constraint on the column using a
+             SQL-script-compatible
+             block that selects into a @variable from sys.default_constraints,
+             then exec's a separate DROP CONSTRAINT for that default.
+            :param mssql_drop_foreign_key: Optional boolean.  When ``True``, on
+             Microsoft SQL Server only, first
+             drop a single FOREIGN KEY constraint on the column using a
+             SQL-script-compatible
+             block that selects into a @variable from
+             sys.foreign_keys/sys.foreign_key_columns,
+             then exec's a separate DROP CONSTRAINT for that default.  Only
+             works if the column has exactly one FK constraint which refers to
+             it, at the moment.
+
+            """  # noqa: E501
+            ...
+
+        def drop_constraint(
+            self,
+            constraint_name: str,
+            table_name: str,
+            type_: Optional[str] = None,
+            *,
+            schema: Optional[str] = None,
+        ) -> None:
+            r"""Drop a constraint of the given name, typically via DROP CONSTRAINT.
+
+            :param constraint_name: name of the constraint.
+            :param table_name: table name.
+            :param type\_: optional, required on MySQL.  can be
+             'foreignkey', 'primary', 'unique', or 'check'.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        def drop_index(
+            self,
+            index_name: str,
+            table_name: Optional[str] = None,
+            *,
+            schema: Optional[str] = None,
+            if_exists: Optional[bool] = None,
+            **kw: Any,
+        ) -> None:
+            r"""Issue a "drop index" instruction using the current
+            migration context.
+
+            e.g.::
+
+                drop_index("accounts")
+
+            :param index_name: name of the index.
+            :param table_name: name of the owning table.  Some
+             backends such as Microsoft SQL Server require this.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            :param if_exists: If True, adds IF EXISTS operator when
+             dropping the index.
+
+             .. versionadded:: 1.12.0
+
+            :param \**kw: Additional keyword arguments not mentioned above are
+             dialect specific, and passed in the form
+             ``<dialectname>_<argname>``.
+             See the documentation regarding an individual dialect at
+             :ref:`dialect_toplevel` for detail on documented arguments.
+
+            """  # noqa: E501
+            ...
+
+        def drop_table(
+            self,
+            table_name: str,
+            *,
+            schema: Optional[str] = None,
+            if_exists: Optional[bool] = None,
+            **kw: Any,
+        ) -> None:
+            r"""Issue a "drop table" instruction using the current
+            migration context.
+
+
+            e.g.::
+
+                drop_table("accounts")
+
+            :param table_name: Name of the table
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+            :param if_exists: If True, adds IF EXISTS operator when
+             dropping the table.
+
+             .. versionadded:: 1.13.3
+            :param \**kw: Other keyword arguments are passed to the underlying
+             :class:`sqlalchemy.schema.Table` object created for the command.
+
+            """  # noqa: E501
+            ...
+
+        def drop_table_comment(
+            self,
+            table_name: str,
+            *,
+            existing_comment: Optional[str] = None,
+            schema: Optional[str] = None,
+        ) -> None:
+            """Issue a "drop table comment" operation to
+            remove an existing comment set on a table.
+
+            :param table_name: string name of the target table.
+            :param existing_comment: An optional string value of a comment already
+             registered on the specified table.
+
+            .. seealso::
+
+                :meth:`.Operations.create_table_comment`
+
+                :paramref:`.Operations.alter_column.comment`
+
+            """  # noqa: E501
+            ...
+
+        def execute(
+            self,
+            sqltext: Union[Executable, str],
+            *,
+            execution_options: Optional[dict[str, Any]] = None,
+        ) -> None:
+            r"""Execute the given SQL using the current migration context.
+
+            The given SQL can be a plain string, e.g.::
+
+                op.execute("INSERT INTO table (foo) VALUES ('some value')")
+
+            Or it can be any kind of Core SQL Expression construct, such as
+            below where we use an update construct::
+
+                from sqlalchemy.sql import table, column
+                from sqlalchemy import String
+                from alembic import op
+
+                account = table("account", column("name", String))
+                op.execute(
+                    account.update()
+                    .where(account.c.name == op.inline_literal("account 1"))
+                    .values({"name": op.inline_literal("account 2")})
+                )
+
+            Above, we made use of the SQLAlchemy
+            :func:`sqlalchemy.sql.expression.table` and
+            :func:`sqlalchemy.sql.expression.column` constructs to make a brief,
+            ad-hoc table construct just for our UPDATE statement.  A full
+            :class:`~sqlalchemy.schema.Table` construct of course works perfectly
+            fine as well, though note it's a recommended practice to at least
+            ensure the definition of a table is self-contained within the migration
+            script, rather than imported from a module that may break compatibility
+            with older migrations.
+
+            In a SQL script context, the statement is emitted directly to the
+            output stream.   There is *no* return result, however, as this
+            function is oriented towards generating a change script
+            that can run in "offline" mode.     Additionally, parameterized
+            statements are discouraged here, as they *will not work* in offline
+            mode.  Above, we use :meth:`.inline_literal` where parameters are
+            to be used.
+
+            For full interaction with a connected database where parameters can
+            also be used normally, use the "bind" available from the context::
+
+                from alembic import op
+
+                connection = op.get_bind()
+
+                connection.execute(
+                    account.update()
+                    .where(account.c.name == "account 1")
+                    .values({"name": "account 2"})
+                )
+
+            Additionally, when passing the statement as a plain string, it is first
+            coerced into a :func:`sqlalchemy.sql.expression.text` construct
+            before being passed along.  In the less likely case that the
+            literal SQL string contains a colon, it must be escaped with a
+            backslash, as::
+
+               op.execute(r"INSERT INTO table (foo) VALUES ('\:colon_value')")
+
+
+            :param sqltext: Any legal SQLAlchemy expression, including:
+
+            * a string
+            * a :func:`sqlalchemy.sql.expression.text` construct.
+            * a :func:`sqlalchemy.sql.expression.insert` construct.
+            * a :func:`sqlalchemy.sql.expression.update` construct.
+            * a :func:`sqlalchemy.sql.expression.delete` construct.
+            * Any "executable" described in SQLAlchemy Core documentation,
+              noting that no result set is returned.
+
+            .. note::  when passing a plain string, the statement is coerced into
+               a :func:`sqlalchemy.sql.expression.text` construct. This construct
+               considers symbols with colons, e.g. ``:foo`` to be bound parameters.
+               To avoid this, ensure that colon symbols are escaped, e.g.
+               ``\:foo``.
+
+            :param execution_options: Optional dictionary of
+             execution options, will be passed to
+             :meth:`sqlalchemy.engine.Connection.execution_options`.
+            """  # noqa: E501
+            ...
+
+        def rename_table(
+            self,
+            old_table_name: str,
+            new_table_name: str,
+            *,
+            schema: Optional[str] = None,
+        ) -> None:
+            """Emit an ALTER TABLE to rename a table.
+
+            :param old_table_name: old name.
+            :param new_table_name: new name.
+            :param schema: Optional schema name to operate within.  To control
+             quoting of the schema outside of the default behavior, use
+             the SQLAlchemy construct
+             :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+            """  # noqa: E501
+            ...
+
+        # END STUB FUNCTIONS: op_cls
+
+
+class BatchOperations(AbstractOperations):
+    """Modifies the interface :class:`.Operations` for batch mode.
+
+    This basically omits the ``table_name`` and ``schema`` parameters
+    from associated methods, as these are a given when running under batch
+    mode.
+
+    .. seealso::
+
+        :meth:`.Operations.batch_alter_table`
+
+    Note that as of 0.8, most of the methods on this class are produced
+    dynamically using the :meth:`.Operations.register_operation`
+    method.
+
+    """
+
+    impl: BatchOperationsImpl
+
+    def _noop(self, operation: Any) -> NoReturn:
+        raise NotImplementedError(
+            "The %s method does not apply to a batch table alter operation."
+            % operation
+        )
+
+    if TYPE_CHECKING:
+        # START STUB FUNCTIONS: batch_op
+        # ### the following stubs are generated by tools/write_pyi.py ###
+        # ### do not edit ###
+
+        def add_column(
+            self,
+            column: Column[Any],
+            *,
+            insert_before: Optional[str] = None,
+            insert_after: Optional[str] = None,
+        ) -> None:
+            """Issue an "add column" instruction using the current
+            batch migration context.
+
+            .. seealso::
+
+                :meth:`.Operations.add_column`
+
+            """  # noqa: E501
+            ...
+
+        def alter_column(
+            self,
+            column_name: str,
+            *,
+            nullable: Optional[bool] = None,
+            comment: Union[str, Literal[False], None] = False,
+            server_default: Any = False,
+            new_column_name: Optional[str] = None,
+            type_: Union[TypeEngine[Any], Type[TypeEngine[Any]], None] = None,
+            existing_type: Union[
+                TypeEngine[Any], Type[TypeEngine[Any]], None
+            ] = None,
+            existing_server_default: Union[
+                str, bool, Identity, Computed, None
+            ] = False,
+            existing_nullable: Optional[bool] = None,
+            existing_comment: Optional[str] = None,
+            insert_before: Optional[str] = None,
+            insert_after: Optional[str] = None,
+            **kw: Any,
+        ) -> None:
+            """Issue an "alter column" instruction using the current
+            batch migration context.
+
+            Parameters are the same as that of :meth:`.Operations.alter_column`,
+            as well as the following option(s):
+
+            :param insert_before: String name of an existing column which this
+             column should be placed before, when creating the new table.
+
+            :param insert_after: String name of an existing column which this
+             column should be placed after, when creating the new table.  If
+             both :paramref:`.BatchOperations.alter_column.insert_before`
+             and :paramref:`.BatchOperations.alter_column.insert_after` are
+             omitted, the column is inserted after the last existing column
+             in the table.
+
+            .. seealso::
+
+                :meth:`.Operations.alter_column`
+
+
+            """  # noqa: E501
+            ...
+
+        def create_check_constraint(
+            self,
+            constraint_name: str,
+            condition: Union[str, ColumnElement[bool], TextClause],
+            **kw: Any,
+        ) -> None:
+            """Issue a "create check constraint" instruction using the
+            current batch migration context.
+
+            The batch form of this call omits the ``source`` and ``schema``
+            arguments from the call.
+
+            .. seealso::
+
+                :meth:`.Operations.create_check_constraint`
+
+            """  # noqa: E501
+            ...
+
+        def create_exclude_constraint(
+            self, constraint_name: str, *elements: Any, **kw: Any
+        ) -> Optional[Table]:
+            """Issue a "create exclude constraint" instruction using the
+            current batch migration context.
+
+            .. note::  This method is Postgresql specific, and additionally
+               requires at least SQLAlchemy 1.0.
+
+            .. seealso::
+
+                :meth:`.Operations.create_exclude_constraint`
+
+            """  # noqa: E501
+            ...
+
+        def create_foreign_key(
+            self,
+            constraint_name: Optional[str],
+            referent_table: str,
+            local_cols: List[str],
+            remote_cols: List[str],
+            *,
+            referent_schema: Optional[str] = None,
+            onupdate: Optional[str] = None,
+            ondelete: Optional[str] = None,
+            deferrable: Optional[bool] = None,
+            initially: Optional[str] = None,
+            match: Optional[str] = None,
+            **dialect_kw: Any,
+        ) -> None:
+            """Issue a "create foreign key" instruction using the
+            current batch migration context.
+
+            The batch form of this call omits the ``source`` and ``source_schema``
+            arguments from the call.
+
+            e.g.::
+
+                with batch_alter_table("address") as batch_op:
+                    batch_op.create_foreign_key(
+                        "fk_user_address",
+                        "user",
+                        ["user_id"],
+                        ["id"],
+                    )
+
+            .. seealso::
+
+                :meth:`.Operations.create_foreign_key`
+
+            """  # noqa: E501
+            ...
+
+        def create_index(
+            self, index_name: str, columns: List[str], **kw: Any
+        ) -> None:
+            """Issue a "create index" instruction using the
+            current batch migration context.
+
+            .. seealso::
+
+                :meth:`.Operations.create_index`
+
+            """  # noqa: E501
+            ...
+
+        def create_primary_key(
+            self, constraint_name: Optional[str], columns: List[str]
+        ) -> None:
+            """Issue a "create primary key" instruction using the
+            current batch migration context.
+
+            The batch form of this call omits the ``table_name`` and ``schema``
+            arguments from the call.
+
+            .. seealso::
+
+                :meth:`.Operations.create_primary_key`
+
+            """  # noqa: E501
+            ...
+
+        def create_table_comment(
+            self,
+            comment: Optional[str],
+            *,
+            existing_comment: Optional[str] = None,
+        ) -> None:
+            """Emit a COMMENT ON operation to set the comment for a table
+            using the current batch migration context.
+
+            :param comment: string value of the comment being registered against
+             the specified table.
+            :param existing_comment: String value of a comment
+             already registered on the specified table, used within autogenerate
+             so that the operation is reversible, but not required for direct
+             use.
+
+            """  # noqa: E501
+            ...
+
+        def create_unique_constraint(
+            self, constraint_name: str, columns: Sequence[str], **kw: Any
+        ) -> Any:
+            """Issue a "create unique constraint" instruction using the
+            current batch migration context.
+
+            The batch form of this call omits the ``source`` and ``schema``
+            arguments from the call.
+
+            .. seealso::
+
+                :meth:`.Operations.create_unique_constraint`
+
+            """  # noqa: E501
+            ...
+
+        def drop_column(self, column_name: str, **kw: Any) -> None:
+            """Issue a "drop column" instruction using the current
+            batch migration context.
+
+            .. seealso::
+
+                :meth:`.Operations.drop_column`
+
+            """  # noqa: E501
+            ...
+
+        def drop_constraint(
+            self, constraint_name: str, type_: Optional[str] = None
+        ) -> None:
+            """Issue a "drop constraint" instruction using the
+            current batch migration context.
+
+            The batch form of this call omits the ``table_name`` and ``schema``
+            arguments from the call.
+
+            .. seealso::
+
+                :meth:`.Operations.drop_constraint`
+
+            """  # noqa: E501
+            ...
+
+        def drop_index(self, index_name: str, **kw: Any) -> None:
+            """Issue a "drop index" instruction using the
+            current batch migration context.
+
+            .. seealso::
+
+                :meth:`.Operations.drop_index`
+
+            """  # noqa: E501
+            ...
+
+        def drop_table_comment(
+            self, *, existing_comment: Optional[str] = None
+        ) -> None:
+            """Issue a "drop table comment" operation to
+            remove an existing comment set on a table using the current
+            batch operations context.
+
+            :param existing_comment: An optional string value of a comment already
+             registered on the specified table.
+
+            """  # noqa: E501
+            ...
+
+        def execute(
+            self,
+            sqltext: Union[Executable, str],
+            *,
+            execution_options: Optional[dict[str, Any]] = None,
+        ) -> None:
+            """Execute the given SQL using the current migration context.
+
+            .. seealso::
+
+                :meth:`.Operations.execute`
+
+            """  # noqa: E501
+            ...
+
+        # END STUB FUNCTIONS: batch_op
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/batch.py b/.venv/lib/python3.12/site-packages/alembic/operations/batch.py
new file mode 100644
index 00000000..fe183e9c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/batch.py
@@ -0,0 +1,718 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy import ForeignKeyConstraint
+from sqlalchemy import Index
+from sqlalchemy import MetaData
+from sqlalchemy import PrimaryKeyConstraint
+from sqlalchemy import schema as sql_schema
+from sqlalchemy import select
+from sqlalchemy import Table
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.schema import SchemaEventTarget
+from sqlalchemy.util import OrderedDict
+from sqlalchemy.util import topological
+
+from ..util import exc
+from ..util.sqla_compat import _columns_for_constraint
+from ..util.sqla_compat import _copy
+from ..util.sqla_compat import _copy_expression
+from ..util.sqla_compat import _ensure_scope_for_ddl
+from ..util.sqla_compat import _fk_is_self_referential
+from ..util.sqla_compat import _idx_table_bound_expressions
+from ..util.sqla_compat import _is_type_bound
+from ..util.sqla_compat import _remove_column_from_collection
+from ..util.sqla_compat import _resolve_for_variant
+from ..util.sqla_compat import constraint_name_defined
+from ..util.sqla_compat import constraint_name_string
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy.engine import Dialect
+    from sqlalchemy.sql.elements import ColumnClause
+    from sqlalchemy.sql.elements import quoted_name
+    from sqlalchemy.sql.functions import Function
+    from sqlalchemy.sql.schema import Constraint
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from ..ddl.impl import DefaultImpl
+
+
+class BatchOperationsImpl:
+    def __init__(
+        self,
+        operations,
+        table_name,
+        schema,
+        recreate,
+        copy_from,
+        table_args,
+        table_kwargs,
+        reflect_args,
+        reflect_kwargs,
+        naming_convention,
+        partial_reordering,
+    ):
+        self.operations = operations
+        self.table_name = table_name
+        self.schema = schema
+        if recreate not in ("auto", "always", "never"):
+            raise ValueError(
+                "recreate may be one of 'auto', 'always', or 'never'."
+            )
+        self.recreate = recreate
+        self.copy_from = copy_from
+        self.table_args = table_args
+        self.table_kwargs = dict(table_kwargs)
+        self.reflect_args = reflect_args
+        self.reflect_kwargs = dict(reflect_kwargs)
+        self.reflect_kwargs.setdefault(
+            "listeners", list(self.reflect_kwargs.get("listeners", ()))
+        )
+        self.reflect_kwargs["listeners"].append(
+            ("column_reflect", operations.impl.autogen_column_reflect)
+        )
+        self.naming_convention = naming_convention
+        self.partial_reordering = partial_reordering
+        self.batch = []
+
+    @property
+    def dialect(self) -> Dialect:
+        return self.operations.impl.dialect
+
+    @property
+    def impl(self) -> DefaultImpl:
+        return self.operations.impl
+
+    def _should_recreate(self) -> bool:
+        if self.recreate == "auto":
+            return self.operations.impl.requires_recreate_in_batch(self)
+        elif self.recreate == "always":
+            return True
+        else:
+            return False
+
+    def flush(self) -> None:
+        should_recreate = self._should_recreate()
+
+        with _ensure_scope_for_ddl(self.impl.connection):
+            if not should_recreate:
+                for opname, arg, kw in self.batch:
+                    fn = getattr(self.operations.impl, opname)
+                    fn(*arg, **kw)
+            else:
+                if self.naming_convention:
+                    m1 = MetaData(naming_convention=self.naming_convention)
+                else:
+                    m1 = MetaData()
+
+                if self.copy_from is not None:
+                    existing_table = self.copy_from
+                    reflected = False
+                else:
+                    if self.operations.migration_context.as_sql:
+                        raise exc.CommandError(
+                            f"This operation cannot proceed in --sql mode; "
+                            f"batch mode with dialect "
+                            f"{self.operations.migration_context.dialect.name} "  # noqa: E501
+                            f"requires a live database connection with which "
+                            f'to reflect the table "{self.table_name}". '
+                            f"To generate a batch SQL migration script using "
+                            "table "
+                            '"move and copy", a complete Table object '
+                            f'should be passed to the "copy_from" argument '
+                            "of the batch_alter_table() method so that table "
+                            "reflection can be skipped."
+                        )
+
+                    existing_table = Table(
+                        self.table_name,
+                        m1,
+                        schema=self.schema,
+                        autoload_with=self.operations.get_bind(),
+                        *self.reflect_args,
+                        **self.reflect_kwargs,
+                    )
+                    reflected = True
+
+                batch_impl = ApplyBatchImpl(
+                    self.impl,
+                    existing_table,
+                    self.table_args,
+                    self.table_kwargs,
+                    reflected,
+                    partial_reordering=self.partial_reordering,
+                )
+                for opname, arg, kw in self.batch:
+                    fn = getattr(batch_impl, opname)
+                    fn(*arg, **kw)
+
+                batch_impl._create(self.impl)
+
+    def alter_column(self, *arg, **kw) -> None:
+        self.batch.append(("alter_column", arg, kw))
+
+    def add_column(self, *arg, **kw) -> None:
+        if (
+            "insert_before" in kw or "insert_after" in kw
+        ) and not self._should_recreate():
+            raise exc.CommandError(
+                "Can't specify insert_before or insert_after when using "
+                "ALTER; please specify recreate='always'"
+            )
+        self.batch.append(("add_column", arg, kw))
+
+    def drop_column(self, *arg, **kw) -> None:
+        self.batch.append(("drop_column", arg, kw))
+
+    def add_constraint(self, const: Constraint) -> None:
+        self.batch.append(("add_constraint", (const,), {}))
+
+    def drop_constraint(self, const: Constraint) -> None:
+        self.batch.append(("drop_constraint", (const,), {}))
+
+    def rename_table(self, *arg, **kw):
+        self.batch.append(("rename_table", arg, kw))
+
+    def create_index(self, idx: Index, **kw: Any) -> None:
+        self.batch.append(("create_index", (idx,), kw))
+
+    def drop_index(self, idx: Index, **kw: Any) -> None:
+        self.batch.append(("drop_index", (idx,), kw))
+
+    def create_table_comment(self, table):
+        self.batch.append(("create_table_comment", (table,), {}))
+
+    def drop_table_comment(self, table):
+        self.batch.append(("drop_table_comment", (table,), {}))
+
+    def create_table(self, table):
+        raise NotImplementedError("Can't create table in batch mode")
+
+    def drop_table(self, table):
+        raise NotImplementedError("Can't drop table in batch mode")
+
+    def create_column_comment(self, column):
+        self.batch.append(("create_column_comment", (column,), {}))
+
+
+class ApplyBatchImpl:
+    def __init__(
+        self,
+        impl: DefaultImpl,
+        table: Table,
+        table_args: tuple,
+        table_kwargs: Dict[str, Any],
+        reflected: bool,
+        partial_reordering: tuple = (),
+    ) -> None:
+        self.impl = impl
+        self.table = table  # this is a Table object
+        self.table_args = table_args
+        self.table_kwargs = table_kwargs
+        self.temp_table_name = self._calc_temp_name(table.name)
+        self.new_table: Optional[Table] = None
+
+        self.partial_reordering = partial_reordering  # tuple of tuples
+        self.add_col_ordering: Tuple[
+            Tuple[str, str], ...
+        ] = ()  # tuple of tuples
+
+        self.column_transfers = OrderedDict(
+            (c.name, {"expr": c}) for c in self.table.c
+        )
+        self.existing_ordering = list(self.column_transfers)
+
+        self.reflected = reflected
+        self._grab_table_elements()
+
+    @classmethod
+    def _calc_temp_name(cls, tablename: Union[quoted_name, str]) -> str:
+        return ("_alembic_tmp_%s" % tablename)[0:50]
+
+    def _grab_table_elements(self) -> None:
+        schema = self.table.schema
+        self.columns: Dict[str, Column[Any]] = OrderedDict()
+        for c in self.table.c:
+            c_copy = _copy(c, schema=schema)
+            c_copy.unique = c_copy.index = False
+            # ensure that the type object was copied,
+            # as we may need to modify it in-place
+            if isinstance(c.type, SchemaEventTarget):
+                assert c_copy.type is not c.type
+            self.columns[c.name] = c_copy
+        self.named_constraints: Dict[str, Constraint] = {}
+        self.unnamed_constraints = []
+        self.col_named_constraints = {}
+        self.indexes: Dict[str, Index] = {}
+        self.new_indexes: Dict[str, Index] = {}
+
+        for const in self.table.constraints:
+            if _is_type_bound(const):
+                continue
+            elif (
+                self.reflected
+                and isinstance(const, CheckConstraint)
+                and not const.name
+            ):
+                # TODO: we are skipping unnamed reflected CheckConstraint
+                # because
+                # we have no way to determine _is_type_bound() for these.
+                pass
+            elif constraint_name_string(const.name):
+                self.named_constraints[const.name] = const
+            else:
+                self.unnamed_constraints.append(const)
+
+        if not self.reflected:
+            for col in self.table.c:
+                for const in col.constraints:
+                    if const.name:
+                        self.col_named_constraints[const.name] = (col, const)
+
+        for idx in self.table.indexes:
+            self.indexes[idx.name] = idx  # type: ignore[index]
+
+        for k in self.table.kwargs:
+            self.table_kwargs.setdefault(k, self.table.kwargs[k])
+
+    def _adjust_self_columns_for_partial_reordering(self) -> None:
+        pairs = set()
+
+        col_by_idx = list(self.columns)
+
+        if self.partial_reordering:
+            for tuple_ in self.partial_reordering:
+                for index, elem in enumerate(tuple_):
+                    if index > 0:
+                        pairs.add((tuple_[index - 1], elem))
+        else:
+            for index, elem in enumerate(self.existing_ordering):
+                if index > 0:
+                    pairs.add((col_by_idx[index - 1], elem))
+
+        pairs.update(self.add_col_ordering)
+
+        # this can happen if some columns were dropped and not removed
+        # from existing_ordering.  this should be prevented already, but
+        # conservatively making sure this didn't happen
+        pairs_list = [p for p in pairs if p[0] != p[1]]
+
+        sorted_ = list(
+            topological.sort(pairs_list, col_by_idx, deterministic_order=True)
+        )
+        self.columns = OrderedDict((k, self.columns[k]) for k in sorted_)
+        self.column_transfers = OrderedDict(
+            (k, self.column_transfers[k]) for k in sorted_
+        )
+
+    def _transfer_elements_to_new_table(self) -> None:
+        assert self.new_table is None, "Can only create new table once"
+
+        m = MetaData()
+        schema = self.table.schema
+
+        if self.partial_reordering or self.add_col_ordering:
+            self._adjust_self_columns_for_partial_reordering()
+
+        self.new_table = new_table = Table(
+            self.temp_table_name,
+            m,
+            *(list(self.columns.values()) + list(self.table_args)),
+            schema=schema,
+            **self.table_kwargs,
+        )
+
+        for const in (
+            list(self.named_constraints.values()) + self.unnamed_constraints
+        ):
+            const_columns = {c.key for c in _columns_for_constraint(const)}
+
+            if not const_columns.issubset(self.column_transfers):
+                continue
+
+            const_copy: Constraint
+            if isinstance(const, ForeignKeyConstraint):
+                if _fk_is_self_referential(const):
+                    # for self-referential constraint, refer to the
+                    # *original* table name, and not _alembic_batch_temp.
+                    # This is consistent with how we're handling
+                    # FK constraints from other tables; we assume SQLite
+                    # no foreign keys just keeps the names unchanged, so
+                    # when we rename back, they match again.
+                    const_copy = _copy(
+                        const, schema=schema, target_table=self.table
+                    )
+                else:
+                    # "target_table" for ForeignKeyConstraint.copy() is
+                    # only used if the FK is detected as being
+                    # self-referential, which we are handling above.
+                    const_copy = _copy(const, schema=schema)
+            else:
+                const_copy = _copy(
+                    const, schema=schema, target_table=new_table
+                )
+            if isinstance(const, ForeignKeyConstraint):
+                self._setup_referent(m, const)
+            new_table.append_constraint(const_copy)
+
+    def _gather_indexes_from_both_tables(self) -> List[Index]:
+        assert self.new_table is not None
+        idx: List[Index] = []
+
+        for idx_existing in self.indexes.values():
+            # this is a lift-and-move from Table.to_metadata
+
+            if idx_existing._column_flag:
+                continue
+
+            idx_copy = Index(
+                idx_existing.name,
+                unique=idx_existing.unique,
+                *[
+                    _copy_expression(expr, self.new_table)
+                    for expr in _idx_table_bound_expressions(idx_existing)
+                ],
+                _table=self.new_table,
+                **idx_existing.kwargs,
+            )
+            idx.append(idx_copy)
+
+        for index in self.new_indexes.values():
+            idx.append(
+                Index(
+                    index.name,
+                    unique=index.unique,
+                    *[self.new_table.c[col] for col in index.columns.keys()],
+                    **index.kwargs,
+                )
+            )
+        return idx
+
+    def _setup_referent(
+        self, metadata: MetaData, constraint: ForeignKeyConstraint
+    ) -> None:
+        spec = constraint.elements[0]._get_colspec()
+        parts = spec.split(".")
+        tname = parts[-2]
+        if len(parts) == 3:
+            referent_schema = parts[0]
+        else:
+            referent_schema = None
+
+        if tname != self.temp_table_name:
+            key = sql_schema._get_table_key(tname, referent_schema)
+
+            def colspec(elem: Any):
+                return elem._get_colspec()
+
+            if key in metadata.tables:
+                t = metadata.tables[key]
+                for elem in constraint.elements:
+                    colname = colspec(elem).split(".")[-1]
+                    if colname not in t.c:
+                        t.append_column(Column(colname, sqltypes.NULLTYPE))
+            else:
+                Table(
+                    tname,
+                    metadata,
+                    *[
+                        Column(n, sqltypes.NULLTYPE)
+                        for n in [
+                            colspec(elem).split(".")[-1]
+                            for elem in constraint.elements
+                        ]
+                    ],
+                    schema=referent_schema,
+                )
+
+    def _create(self, op_impl: DefaultImpl) -> None:
+        self._transfer_elements_to_new_table()
+
+        op_impl.prep_table_for_batch(self, self.table)
+        assert self.new_table is not None
+        op_impl.create_table(self.new_table)
+
+        try:
+            op_impl._exec(
+                self.new_table.insert()
+                .inline()
+                .from_select(
+                    list(
+                        k
+                        for k, transfer in self.column_transfers.items()
+                        if "expr" in transfer
+                    ),
+                    select(
+                        *[
+                            transfer["expr"]
+                            for transfer in self.column_transfers.values()
+                            if "expr" in transfer
+                        ]
+                    ),
+                )
+            )
+            op_impl.drop_table(self.table)
+        except:
+            op_impl.drop_table(self.new_table)
+            raise
+        else:
+            op_impl.rename_table(
+                self.temp_table_name, self.table.name, schema=self.table.schema
+            )
+            self.new_table.name = self.table.name
+            try:
+                for idx in self._gather_indexes_from_both_tables():
+                    op_impl.create_index(idx)
+            finally:
+                self.new_table.name = self.temp_table_name
+
+    def alter_column(
+        self,
+        table_name: str,
+        column_name: str,
+        nullable: Optional[bool] = None,
+        server_default: Optional[Union[Function[Any], str, bool]] = False,
+        name: Optional[str] = None,
+        type_: Optional[TypeEngine] = None,
+        autoincrement: Optional[Union[bool, Literal["auto"]]] = None,
+        comment: Union[str, Literal[False]] = False,
+        **kw,
+    ) -> None:
+        existing = self.columns[column_name]
+        existing_transfer: Dict[str, Any] = self.column_transfers[column_name]
+        if name is not None and name != column_name:
+            # note that we don't change '.key' - we keep referring
+            # to the renamed column by its old key in _create().  neat!
+            existing.name = name
+            existing_transfer["name"] = name
+
+            existing_type = kw.get("existing_type", None)
+            if existing_type:
+                resolved_existing_type = _resolve_for_variant(
+                    kw["existing_type"], self.impl.dialect
+                )
+
+                # pop named constraints for Boolean/Enum for rename
+                if (
+                    isinstance(resolved_existing_type, SchemaEventTarget)
+                    and resolved_existing_type.name  # type:ignore[attr-defined]  # noqa E501
+                ):
+                    self.named_constraints.pop(
+                        resolved_existing_type.name,  # type:ignore[attr-defined]  # noqa E501
+                        None,
+                    )
+
+        if type_ is not None:
+            type_ = sqltypes.to_instance(type_)
+            # old type is being discarded so turn off eventing
+            # rules. Alternatively we can
+            # erase the events set up by this type, but this is simpler.
+            # we also ignore the drop_constraint that will come here from
+            # Operations.implementation_for(alter_column)
+
+            if isinstance(existing.type, SchemaEventTarget):
+                existing.type._create_events = (  # type:ignore[attr-defined]
+                    existing.type.create_constraint  # type:ignore[attr-defined] # noqa
+                ) = False
+
+            self.impl.cast_for_batch_migrate(
+                existing, existing_transfer, type_
+            )
+
+            existing.type = type_
+
+            # we *dont* however set events for the new type, because
+            # alter_column is invoked from
+            # Operations.implementation_for(alter_column) which already
+            # will emit an add_constraint()
+
+        if nullable is not None:
+            existing.nullable = nullable
+        if server_default is not False:
+            if server_default is None:
+                existing.server_default = None
+            else:
+                sql_schema.DefaultClause(
+                    server_default  # type: ignore[arg-type]
+                )._set_parent(existing)
+        if autoincrement is not None:
+            existing.autoincrement = bool(autoincrement)
+
+        if comment is not False:
+            existing.comment = comment
+
+    def _setup_dependencies_for_add_column(
+        self,
+        colname: str,
+        insert_before: Optional[str],
+        insert_after: Optional[str],
+    ) -> None:
+        index_cols = self.existing_ordering
+        col_indexes = {name: i for i, name in enumerate(index_cols)}
+
+        if not self.partial_reordering:
+            if insert_after:
+                if not insert_before:
+                    if insert_after in col_indexes:
+                        # insert after an existing column
+                        idx = col_indexes[insert_after] + 1
+                        if idx < len(index_cols):
+                            insert_before = index_cols[idx]
+                    else:
+                        # insert after a column that is also new
+                        insert_before = dict(self.add_col_ordering)[
+                            insert_after
+                        ]
+            if insert_before:
+                if not insert_after:
+                    if insert_before in col_indexes:
+                        # insert before an existing column
+                        idx = col_indexes[insert_before] - 1
+                        if idx >= 0:
+                            insert_after = index_cols[idx]
+                    else:
+                        # insert before a column that is also new
+                        insert_after = {
+                            b: a for a, b in self.add_col_ordering
+                        }[insert_before]
+
+        if insert_before:
+            self.add_col_ordering += ((colname, insert_before),)
+        if insert_after:
+            self.add_col_ordering += ((insert_after, colname),)
+
+        if (
+            not self.partial_reordering
+            and not insert_before
+            and not insert_after
+            and col_indexes
+        ):
+            self.add_col_ordering += ((index_cols[-1], colname),)
+
+    def add_column(
+        self,
+        table_name: str,
+        column: Column[Any],
+        insert_before: Optional[str] = None,
+        insert_after: Optional[str] = None,
+        **kw,
+    ) -> None:
+        self._setup_dependencies_for_add_column(
+            column.name, insert_before, insert_after
+        )
+        # we copy the column because operations.add_column()
+        # gives us a Column that is part of a Table already.
+        self.columns[column.name] = _copy(column, schema=self.table.schema)
+        self.column_transfers[column.name] = {}
+
+    def drop_column(
+        self,
+        table_name: str,
+        column: Union[ColumnClause[Any], Column[Any]],
+        **kw,
+    ) -> None:
+        if column.name in self.table.primary_key.columns:
+            _remove_column_from_collection(
+                self.table.primary_key.columns, column
+            )
+        del self.columns[column.name]
+        del self.column_transfers[column.name]
+        self.existing_ordering.remove(column.name)
+
+        # pop named constraints for Boolean/Enum for rename
+        if (
+            "existing_type" in kw
+            and isinstance(kw["existing_type"], SchemaEventTarget)
+            and kw["existing_type"].name  # type:ignore[attr-defined]
+        ):
+            self.named_constraints.pop(
+                kw["existing_type"].name, None  # type:ignore[attr-defined]
+            )
+
+    def create_column_comment(self, column):
+        """the batch table creation function will issue create_column_comment
+        on the real "impl" as part of the create table process.
+
+        That is, the Column object will have the comment on it already,
+        so when it is received by add_column() it will be a normal part of
+        the CREATE TABLE and doesn't need an extra step here.
+
+        """
+
+    def create_table_comment(self, table):
+        """the batch table creation function will issue create_table_comment
+        on the real "impl" as part of the create table process.
+
+        """
+
+    def drop_table_comment(self, table):
+        """the batch table creation function will issue drop_table_comment
+        on the real "impl" as part of the create table process.
+
+        """
+
+    def add_constraint(self, const: Constraint) -> None:
+        if not constraint_name_defined(const.name):
+            raise ValueError("Constraint must have a name")
+        if isinstance(const, sql_schema.PrimaryKeyConstraint):
+            if self.table.primary_key in self.unnamed_constraints:
+                self.unnamed_constraints.remove(self.table.primary_key)
+
+        if constraint_name_string(const.name):
+            self.named_constraints[const.name] = const
+        else:
+            self.unnamed_constraints.append(const)
+
+    def drop_constraint(self, const: Constraint) -> None:
+        if not const.name:
+            raise ValueError("Constraint must have a name")
+        try:
+            if const.name in self.col_named_constraints:
+                col, const = self.col_named_constraints.pop(const.name)
+
+                for col_const in list(self.columns[col.name].constraints):
+                    if col_const.name == const.name:
+                        self.columns[col.name].constraints.remove(col_const)
+            elif constraint_name_string(const.name):
+                const = self.named_constraints.pop(const.name)
+            elif const in self.unnamed_constraints:
+                self.unnamed_constraints.remove(const)
+
+        except KeyError:
+            if _is_type_bound(const):
+                # type-bound constraints are only included in the new
+                # table via their type object in any case, so ignore the
+                # drop_constraint() that comes here via the
+                # Operations.implementation_for(alter_column)
+                return
+            raise ValueError("No such constraint: '%s'" % const.name)
+        else:
+            if isinstance(const, PrimaryKeyConstraint):
+                for col in const.columns:
+                    self.columns[col.name].primary_key = False
+
+    def create_index(self, idx: Index) -> None:
+        self.new_indexes[idx.name] = idx  # type: ignore[index]
+
+    def drop_index(self, idx: Index) -> None:
+        try:
+            del self.indexes[idx.name]  # type: ignore[arg-type]
+        except KeyError:
+            raise ValueError("No such index: '%s'" % idx.name)
+
+    def rename_table(self, *arg, **kw):
+        raise NotImplementedError("TODO")
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/ops.py b/.venv/lib/python3.12/site-packages/alembic/operations/ops.py
new file mode 100644
index 00000000..bb4d825b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/ops.py
@@ -0,0 +1,2799 @@
+from __future__ import annotations
+
+from abc import abstractmethod
+import re
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import FrozenSet
+from typing import Iterator
+from typing import List
+from typing import MutableMapping
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy.types import NULLTYPE
+
+from . import schemaobj
+from .base import BatchOperations
+from .base import Operations
+from .. import util
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy.sql import Executable
+    from sqlalchemy.sql.elements import ColumnElement
+    from sqlalchemy.sql.elements import conv
+    from sqlalchemy.sql.elements import quoted_name
+    from sqlalchemy.sql.elements import TextClause
+    from sqlalchemy.sql.schema import CheckConstraint
+    from sqlalchemy.sql.schema import Column
+    from sqlalchemy.sql.schema import Computed
+    from sqlalchemy.sql.schema import Constraint
+    from sqlalchemy.sql.schema import ForeignKeyConstraint
+    from sqlalchemy.sql.schema import Identity
+    from sqlalchemy.sql.schema import Index
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import PrimaryKeyConstraint
+    from sqlalchemy.sql.schema import SchemaItem
+    from sqlalchemy.sql.schema import Table
+    from sqlalchemy.sql.schema import UniqueConstraint
+    from sqlalchemy.sql.selectable import TableClause
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from ..autogenerate.rewriter import Rewriter
+    from ..runtime.migration import MigrationContext
+    from ..script.revision import _RevIdType
+
+_T = TypeVar("_T", bound=Any)
+_AC = TypeVar("_AC", bound="AddConstraintOp")
+
+
+class MigrateOperation:
+    """base class for migration command and organization objects.
+
+    This system is part of the operation extensibility API.
+
+    .. seealso::
+
+        :ref:`operation_objects`
+
+        :ref:`operation_plugins`
+
+        :ref:`customizing_revision`
+
+    """
+
+    @util.memoized_property
+    def info(self) -> Dict[Any, Any]:
+        """A dictionary that may be used to store arbitrary information
+        along with this :class:`.MigrateOperation` object.
+
+        """
+        return {}
+
+    _mutations: FrozenSet[Rewriter] = frozenset()
+
+    def reverse(self) -> MigrateOperation:
+        raise NotImplementedError
+
+    def to_diff_tuple(self) -> Tuple[Any, ...]:
+        raise NotImplementedError
+
+
+class AddConstraintOp(MigrateOperation):
+    """Represent an add constraint operation."""
+
+    add_constraint_ops = util.Dispatcher()
+
+    @property
+    def constraint_type(self) -> str:
+        raise NotImplementedError()
+
+    @classmethod
+    def register_add_constraint(
+        cls, type_: str
+    ) -> Callable[[Type[_AC]], Type[_AC]]:
+        def go(klass: Type[_AC]) -> Type[_AC]:
+            cls.add_constraint_ops.dispatch_for(type_)(klass.from_constraint)
+            return klass
+
+        return go
+
+    @classmethod
+    def from_constraint(cls, constraint: Constraint) -> AddConstraintOp:
+        return cls.add_constraint_ops.dispatch(constraint.__visit_name__)(  # type: ignore[no-any-return]  # noqa: E501
+            constraint
+        )
+
+    @abstractmethod
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Constraint:
+        pass
+
+    def reverse(self) -> DropConstraintOp:
+        return DropConstraintOp.from_constraint(self.to_constraint())
+
+    def to_diff_tuple(self) -> Tuple[str, Constraint]:
+        return ("add_constraint", self.to_constraint())
+
+
+@Operations.register_operation("drop_constraint")
+@BatchOperations.register_operation("drop_constraint", "batch_drop_constraint")
+class DropConstraintOp(MigrateOperation):
+    """Represent a drop constraint operation."""
+
+    def __init__(
+        self,
+        constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        type_: Optional[str] = None,
+        *,
+        schema: Optional[str] = None,
+        _reverse: Optional[AddConstraintOp] = None,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.table_name = table_name
+        self.constraint_type = type_
+        self.schema = schema
+        self._reverse = _reverse
+
+    def reverse(self) -> AddConstraintOp:
+        return AddConstraintOp.from_constraint(self.to_constraint())
+
+    def to_diff_tuple(
+        self,
+    ) -> Tuple[str, SchemaItem]:
+        if self.constraint_type == "foreignkey":
+            return ("remove_fk", self.to_constraint())
+        else:
+            return ("remove_constraint", self.to_constraint())
+
+    @classmethod
+    def from_constraint(cls, constraint: Constraint) -> DropConstraintOp:
+        types = {
+            "unique_constraint": "unique",
+            "foreign_key_constraint": "foreignkey",
+            "primary_key_constraint": "primary",
+            "check_constraint": "check",
+            "column_check_constraint": "check",
+            "table_or_column_check_constraint": "check",
+        }
+
+        constraint_table = sqla_compat._table_for_constraint(constraint)
+        return cls(
+            sqla_compat.constraint_name_or_none(constraint.name),
+            constraint_table.name,
+            schema=constraint_table.schema,
+            type_=types.get(constraint.__visit_name__),
+            _reverse=AddConstraintOp.from_constraint(constraint),
+        )
+
+    def to_constraint(self) -> Constraint:
+        if self._reverse is not None:
+            constraint = self._reverse.to_constraint()
+            constraint.name = self.constraint_name
+            constraint_table = sqla_compat._table_for_constraint(constraint)
+            constraint_table.name = self.table_name
+            constraint_table.schema = self.schema
+
+            return constraint
+        else:
+            raise ValueError(
+                "constraint cannot be produced; "
+                "original constraint is not present"
+            )
+
+    @classmethod
+    def drop_constraint(
+        cls,
+        operations: Operations,
+        constraint_name: str,
+        table_name: str,
+        type_: Optional[str] = None,
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        r"""Drop a constraint of the given name, typically via DROP CONSTRAINT.
+
+        :param constraint_name: name of the constraint.
+        :param table_name: table name.
+        :param type\_: optional, required on MySQL.  can be
+         'foreignkey', 'primary', 'unique', or 'check'.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+
+        op = cls(constraint_name, table_name, type_=type_, schema=schema)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_drop_constraint(
+        cls,
+        operations: BatchOperations,
+        constraint_name: str,
+        type_: Optional[str] = None,
+    ) -> None:
+        """Issue a "drop constraint" instruction using the
+        current batch migration context.
+
+        The batch form of this call omits the ``table_name`` and ``schema``
+        arguments from the call.
+
+        .. seealso::
+
+            :meth:`.Operations.drop_constraint`
+
+        """
+        op = cls(
+            constraint_name,
+            operations.impl.table_name,
+            type_=type_,
+            schema=operations.impl.schema,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_primary_key")
+@BatchOperations.register_operation(
+    "create_primary_key", "batch_create_primary_key"
+)
+@AddConstraintOp.register_add_constraint("primary_key_constraint")
+class CreatePrimaryKeyOp(AddConstraintOp):
+    """Represent a create primary key operation."""
+
+    constraint_type = "primarykey"
+
+    def __init__(
+        self,
+        constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        columns: Sequence[str],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.table_name = table_name
+        self.columns = columns
+        self.schema = schema
+        self.kw = kw
+
+    @classmethod
+    def from_constraint(cls, constraint: Constraint) -> CreatePrimaryKeyOp:
+        constraint_table = sqla_compat._table_for_constraint(constraint)
+        pk_constraint = cast("PrimaryKeyConstraint", constraint)
+        return cls(
+            sqla_compat.constraint_name_or_none(pk_constraint.name),
+            constraint_table.name,
+            pk_constraint.columns.keys(),
+            schema=constraint_table.schema,
+            **pk_constraint.dialect_kwargs,
+        )
+
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> PrimaryKeyConstraint:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        return schema_obj.primary_key_constraint(
+            self.constraint_name,
+            self.table_name,
+            self.columns,
+            schema=self.schema,
+            **self.kw,
+        )
+
+    @classmethod
+    def create_primary_key(
+        cls,
+        operations: Operations,
+        constraint_name: Optional[str],
+        table_name: str,
+        columns: List[str],
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        """Issue a "create primary key" instruction using the current
+        migration context.
+
+        e.g.::
+
+            from alembic import op
+
+            op.create_primary_key("pk_my_table", "my_table", ["id", "version"])
+
+        This internally generates a :class:`~sqlalchemy.schema.Table` object
+        containing the necessary columns, then generates a new
+        :class:`~sqlalchemy.schema.PrimaryKeyConstraint`
+        object which it then associates with the
+        :class:`~sqlalchemy.schema.Table`.
+        Any event listeners associated with this action will be fired
+        off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+        construct is ultimately used to generate the ALTER statement.
+
+        :param constraint_name: Name of the primary key constraint.  The name
+         is necessary so that an ALTER statement can be emitted.  For setups
+         that use an automated naming scheme such as that described at
+         :ref:`sqla:constraint_naming_conventions`
+         ``name`` here can be ``None``, as the event listener will
+         apply the name to the constraint object when it is associated
+         with the table.
+        :param table_name: String name of the target table.
+        :param columns: a list of string column names to be applied to the
+         primary key constraint.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+        op = cls(constraint_name, table_name, columns, schema=schema)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_primary_key(
+        cls,
+        operations: BatchOperations,
+        constraint_name: Optional[str],
+        columns: List[str],
+    ) -> None:
+        """Issue a "create primary key" instruction using the
+        current batch migration context.
+
+        The batch form of this call omits the ``table_name`` and ``schema``
+        arguments from the call.
+
+        .. seealso::
+
+            :meth:`.Operations.create_primary_key`
+
+        """
+        op = cls(
+            constraint_name,
+            operations.impl.table_name,
+            columns,
+            schema=operations.impl.schema,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_unique_constraint")
+@BatchOperations.register_operation(
+    "create_unique_constraint", "batch_create_unique_constraint"
+)
+@AddConstraintOp.register_add_constraint("unique_constraint")
+class CreateUniqueConstraintOp(AddConstraintOp):
+    """Represent a create unique constraint operation."""
+
+    constraint_type = "unique"
+
+    def __init__(
+        self,
+        constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        columns: Sequence[str],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.table_name = table_name
+        self.columns = columns
+        self.schema = schema
+        self.kw = kw
+
+    @classmethod
+    def from_constraint(
+        cls, constraint: Constraint
+    ) -> CreateUniqueConstraintOp:
+        constraint_table = sqla_compat._table_for_constraint(constraint)
+
+        uq_constraint = cast("UniqueConstraint", constraint)
+
+        kw: Dict[str, Any] = {}
+        if uq_constraint.deferrable:
+            kw["deferrable"] = uq_constraint.deferrable
+        if uq_constraint.initially:
+            kw["initially"] = uq_constraint.initially
+        kw.update(uq_constraint.dialect_kwargs)
+        return cls(
+            sqla_compat.constraint_name_or_none(uq_constraint.name),
+            constraint_table.name,
+            [c.name for c in uq_constraint.columns],
+            schema=constraint_table.schema,
+            **kw,
+        )
+
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> UniqueConstraint:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        return schema_obj.unique_constraint(
+            self.constraint_name,
+            self.table_name,
+            self.columns,
+            schema=self.schema,
+            **self.kw,
+        )
+
+    @classmethod
+    def create_unique_constraint(
+        cls,
+        operations: Operations,
+        constraint_name: Optional[str],
+        table_name: str,
+        columns: Sequence[str],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> Any:
+        """Issue a "create unique constraint" instruction using the
+        current migration context.
+
+        e.g.::
+
+            from alembic import op
+            op.create_unique_constraint("uq_user_name", "user", ["name"])
+
+        This internally generates a :class:`~sqlalchemy.schema.Table` object
+        containing the necessary columns, then generates a new
+        :class:`~sqlalchemy.schema.UniqueConstraint`
+        object which it then associates with the
+        :class:`~sqlalchemy.schema.Table`.
+        Any event listeners associated with this action will be fired
+        off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+        construct is ultimately used to generate the ALTER statement.
+
+        :param name: Name of the unique constraint.  The name is necessary
+         so that an ALTER statement can be emitted.  For setups that
+         use an automated naming scheme such as that described at
+         :ref:`sqla:constraint_naming_conventions`,
+         ``name`` here can be ``None``, as the event listener will
+         apply the name to the constraint object when it is associated
+         with the table.
+        :param table_name: String name of the source table.
+        :param columns: a list of string column names in the
+         source table.
+        :param deferrable: optional bool. If set, emit DEFERRABLE or
+         NOT DEFERRABLE when issuing DDL for this constraint.
+        :param initially: optional string. If set, emit INITIALLY <value>
+         when issuing DDL for this constraint.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+
+        op = cls(constraint_name, table_name, columns, schema=schema, **kw)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_unique_constraint(
+        cls,
+        operations: BatchOperations,
+        constraint_name: str,
+        columns: Sequence[str],
+        **kw: Any,
+    ) -> Any:
+        """Issue a "create unique constraint" instruction using the
+        current batch migration context.
+
+        The batch form of this call omits the ``source`` and ``schema``
+        arguments from the call.
+
+        .. seealso::
+
+            :meth:`.Operations.create_unique_constraint`
+
+        """
+        kw["schema"] = operations.impl.schema
+        op = cls(constraint_name, operations.impl.table_name, columns, **kw)
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_foreign_key")
+@BatchOperations.register_operation(
+    "create_foreign_key", "batch_create_foreign_key"
+)
+@AddConstraintOp.register_add_constraint("foreign_key_constraint")
+class CreateForeignKeyOp(AddConstraintOp):
+    """Represent a create foreign key constraint operation."""
+
+    constraint_type = "foreignkey"
+
+    def __init__(
+        self,
+        constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+        source_table: str,
+        referent_table: str,
+        local_cols: List[str],
+        remote_cols: List[str],
+        **kw: Any,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.source_table = source_table
+        self.referent_table = referent_table
+        self.local_cols = local_cols
+        self.remote_cols = remote_cols
+        self.kw = kw
+
+    def to_diff_tuple(self) -> Tuple[str, ForeignKeyConstraint]:
+        return ("add_fk", self.to_constraint())
+
+    @classmethod
+    def from_constraint(cls, constraint: Constraint) -> CreateForeignKeyOp:
+        fk_constraint = cast("ForeignKeyConstraint", constraint)
+        kw: Dict[str, Any] = {}
+        if fk_constraint.onupdate:
+            kw["onupdate"] = fk_constraint.onupdate
+        if fk_constraint.ondelete:
+            kw["ondelete"] = fk_constraint.ondelete
+        if fk_constraint.initially:
+            kw["initially"] = fk_constraint.initially
+        if fk_constraint.deferrable:
+            kw["deferrable"] = fk_constraint.deferrable
+        if fk_constraint.use_alter:
+            kw["use_alter"] = fk_constraint.use_alter
+        if fk_constraint.match:
+            kw["match"] = fk_constraint.match
+
+        (
+            source_schema,
+            source_table,
+            source_columns,
+            target_schema,
+            target_table,
+            target_columns,
+            onupdate,
+            ondelete,
+            deferrable,
+            initially,
+        ) = sqla_compat._fk_spec(fk_constraint)
+
+        kw["source_schema"] = source_schema
+        kw["referent_schema"] = target_schema
+        kw.update(fk_constraint.dialect_kwargs)
+        return cls(
+            sqla_compat.constraint_name_or_none(fk_constraint.name),
+            source_table,
+            target_table,
+            source_columns,
+            target_columns,
+            **kw,
+        )
+
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> ForeignKeyConstraint:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        return schema_obj.foreign_key_constraint(
+            self.constraint_name,
+            self.source_table,
+            self.referent_table,
+            self.local_cols,
+            self.remote_cols,
+            **self.kw,
+        )
+
+    @classmethod
+    def create_foreign_key(
+        cls,
+        operations: Operations,
+        constraint_name: Optional[str],
+        source_table: str,
+        referent_table: str,
+        local_cols: List[str],
+        remote_cols: List[str],
+        *,
+        onupdate: Optional[str] = None,
+        ondelete: Optional[str] = None,
+        deferrable: Optional[bool] = None,
+        initially: Optional[str] = None,
+        match: Optional[str] = None,
+        source_schema: Optional[str] = None,
+        referent_schema: Optional[str] = None,
+        **dialect_kw: Any,
+    ) -> None:
+        """Issue a "create foreign key" instruction using the
+        current migration context.
+
+        e.g.::
+
+            from alembic import op
+
+            op.create_foreign_key(
+                "fk_user_address",
+                "address",
+                "user",
+                ["user_id"],
+                ["id"],
+            )
+
+        This internally generates a :class:`~sqlalchemy.schema.Table` object
+        containing the necessary columns, then generates a new
+        :class:`~sqlalchemy.schema.ForeignKeyConstraint`
+        object which it then associates with the
+        :class:`~sqlalchemy.schema.Table`.
+        Any event listeners associated with this action will be fired
+        off normally.   The :class:`~sqlalchemy.schema.AddConstraint`
+        construct is ultimately used to generate the ALTER statement.
+
+        :param constraint_name: Name of the foreign key constraint.  The name
+         is necessary so that an ALTER statement can be emitted.  For setups
+         that use an automated naming scheme such as that described at
+         :ref:`sqla:constraint_naming_conventions`,
+         ``name`` here can be ``None``, as the event listener will
+         apply the name to the constraint object when it is associated
+         with the table.
+        :param source_table: String name of the source table.
+        :param referent_table: String name of the destination table.
+        :param local_cols: a list of string column names in the
+         source table.
+        :param remote_cols: a list of string column names in the
+         remote table.
+        :param onupdate: Optional string. If set, emit ON UPDATE <value> when
+         issuing DDL for this constraint. Typical values include CASCADE,
+         DELETE and RESTRICT.
+        :param ondelete: Optional string. If set, emit ON DELETE <value> when
+         issuing DDL for this constraint. Typical values include CASCADE,
+         DELETE and RESTRICT.
+        :param deferrable: optional bool. If set, emit DEFERRABLE or NOT
+         DEFERRABLE when issuing DDL for this constraint.
+        :param source_schema: Optional schema name of the source table.
+        :param referent_schema: Optional schema name of the destination table.
+
+        """
+
+        op = cls(
+            constraint_name,
+            source_table,
+            referent_table,
+            local_cols,
+            remote_cols,
+            onupdate=onupdate,
+            ondelete=ondelete,
+            deferrable=deferrable,
+            source_schema=source_schema,
+            referent_schema=referent_schema,
+            initially=initially,
+            match=match,
+            **dialect_kw,
+        )
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_foreign_key(
+        cls,
+        operations: BatchOperations,
+        constraint_name: Optional[str],
+        referent_table: str,
+        local_cols: List[str],
+        remote_cols: List[str],
+        *,
+        referent_schema: Optional[str] = None,
+        onupdate: Optional[str] = None,
+        ondelete: Optional[str] = None,
+        deferrable: Optional[bool] = None,
+        initially: Optional[str] = None,
+        match: Optional[str] = None,
+        **dialect_kw: Any,
+    ) -> None:
+        """Issue a "create foreign key" instruction using the
+        current batch migration context.
+
+        The batch form of this call omits the ``source`` and ``source_schema``
+        arguments from the call.
+
+        e.g.::
+
+            with batch_alter_table("address") as batch_op:
+                batch_op.create_foreign_key(
+                    "fk_user_address",
+                    "user",
+                    ["user_id"],
+                    ["id"],
+                )
+
+        .. seealso::
+
+            :meth:`.Operations.create_foreign_key`
+
+        """
+        op = cls(
+            constraint_name,
+            operations.impl.table_name,
+            referent_table,
+            local_cols,
+            remote_cols,
+            onupdate=onupdate,
+            ondelete=ondelete,
+            deferrable=deferrable,
+            source_schema=operations.impl.schema,
+            referent_schema=referent_schema,
+            initially=initially,
+            match=match,
+            **dialect_kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_check_constraint")
+@BatchOperations.register_operation(
+    "create_check_constraint", "batch_create_check_constraint"
+)
+@AddConstraintOp.register_add_constraint("check_constraint")
+@AddConstraintOp.register_add_constraint("table_or_column_check_constraint")
+@AddConstraintOp.register_add_constraint("column_check_constraint")
+class CreateCheckConstraintOp(AddConstraintOp):
+    """Represent a create check constraint operation."""
+
+    constraint_type = "check"
+
+    def __init__(
+        self,
+        constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        condition: Union[str, TextClause, ColumnElement[Any]],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.table_name = table_name
+        self.condition = condition
+        self.schema = schema
+        self.kw = kw
+
+    @classmethod
+    def from_constraint(
+        cls, constraint: Constraint
+    ) -> CreateCheckConstraintOp:
+        constraint_table = sqla_compat._table_for_constraint(constraint)
+
+        ck_constraint = cast("CheckConstraint", constraint)
+        return cls(
+            sqla_compat.constraint_name_or_none(ck_constraint.name),
+            constraint_table.name,
+            cast("ColumnElement[Any]", ck_constraint.sqltext),
+            schema=constraint_table.schema,
+            **ck_constraint.dialect_kwargs,
+        )
+
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> CheckConstraint:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        return schema_obj.check_constraint(
+            self.constraint_name,
+            self.table_name,
+            self.condition,
+            schema=self.schema,
+            **self.kw,
+        )
+
+    @classmethod
+    def create_check_constraint(
+        cls,
+        operations: Operations,
+        constraint_name: Optional[str],
+        table_name: str,
+        condition: Union[str, ColumnElement[bool], TextClause],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        """Issue a "create check constraint" instruction using the
+        current migration context.
+
+        e.g.::
+
+            from alembic import op
+            from sqlalchemy.sql import column, func
+
+            op.create_check_constraint(
+                "ck_user_name_len",
+                "user",
+                func.len(column("name")) > 5,
+            )
+
+        CHECK constraints are usually against a SQL expression, so ad-hoc
+        table metadata is usually needed.   The function will convert the given
+        arguments into a :class:`sqlalchemy.schema.CheckConstraint` bound
+        to an anonymous table in order to emit the CREATE statement.
+
+        :param name: Name of the check constraint.  The name is necessary
+         so that an ALTER statement can be emitted.  For setups that
+         use an automated naming scheme such as that described at
+         :ref:`sqla:constraint_naming_conventions`,
+         ``name`` here can be ``None``, as the event listener will
+         apply the name to the constraint object when it is associated
+         with the table.
+        :param table_name: String name of the source table.
+        :param condition: SQL expression that's the condition of the
+         constraint. Can be a string or SQLAlchemy expression language
+         structure.
+        :param deferrable: optional bool. If set, emit DEFERRABLE or
+         NOT DEFERRABLE when issuing DDL for this constraint.
+        :param initially: optional string. If set, emit INITIALLY <value>
+         when issuing DDL for this constraint.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+        op = cls(constraint_name, table_name, condition, schema=schema, **kw)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_check_constraint(
+        cls,
+        operations: BatchOperations,
+        constraint_name: str,
+        condition: Union[str, ColumnElement[bool], TextClause],
+        **kw: Any,
+    ) -> None:
+        """Issue a "create check constraint" instruction using the
+        current batch migration context.
+
+        The batch form of this call omits the ``source`` and ``schema``
+        arguments from the call.
+
+        .. seealso::
+
+            :meth:`.Operations.create_check_constraint`
+
+        """
+        op = cls(
+            constraint_name,
+            operations.impl.table_name,
+            condition,
+            schema=operations.impl.schema,
+            **kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_index")
+@BatchOperations.register_operation("create_index", "batch_create_index")
+class CreateIndexOp(MigrateOperation):
+    """Represent a create index operation."""
+
+    def __init__(
+        self,
+        index_name: Optional[str],
+        table_name: str,
+        columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+        *,
+        schema: Optional[str] = None,
+        unique: bool = False,
+        if_not_exists: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        self.index_name = index_name
+        self.table_name = table_name
+        self.columns = columns
+        self.schema = schema
+        self.unique = unique
+        self.if_not_exists = if_not_exists
+        self.kw = kw
+
+    def reverse(self) -> DropIndexOp:
+        return DropIndexOp.from_index(self.to_index())
+
+    def to_diff_tuple(self) -> Tuple[str, Index]:
+        return ("add_index", self.to_index())
+
+    @classmethod
+    def from_index(cls, index: Index) -> CreateIndexOp:
+        assert index.table is not None
+        return cls(
+            index.name,
+            index.table.name,
+            index.expressions,
+            schema=index.table.schema,
+            unique=index.unique,
+            **index.kwargs,
+        )
+
+    def to_index(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Index:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        idx = schema_obj.index(
+            self.index_name,
+            self.table_name,
+            self.columns,
+            schema=self.schema,
+            unique=self.unique,
+            **self.kw,
+        )
+        return idx
+
+    @classmethod
+    def create_index(
+        cls,
+        operations: Operations,
+        index_name: Optional[str],
+        table_name: str,
+        columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+        *,
+        schema: Optional[str] = None,
+        unique: bool = False,
+        if_not_exists: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        r"""Issue a "create index" instruction using the current
+        migration context.
+
+        e.g.::
+
+            from alembic import op
+
+            op.create_index("ik_test", "t1", ["foo", "bar"])
+
+        Functional indexes can be produced by using the
+        :func:`sqlalchemy.sql.expression.text` construct::
+
+            from alembic import op
+            from sqlalchemy import text
+
+            op.create_index("ik_test", "t1", [text("lower(foo)")])
+
+        :param index_name: name of the index.
+        :param table_name: name of the owning table.
+        :param columns: a list consisting of string column names and/or
+         :func:`~sqlalchemy.sql.expression.text` constructs.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+        :param unique: If True, create a unique index.
+
+        :param quote: Force quoting of this column's name on or off,
+         corresponding to ``True`` or ``False``. When left at its default
+         of ``None``, the column identifier will be quoted according to
+         whether the name is case sensitive (identifiers with at least one
+         upper case character are treated as case sensitive), or if it's a
+         reserved word. This flag is only needed to force quoting of a
+         reserved word which is not known by the SQLAlchemy dialect.
+
+        :param if_not_exists: If True, adds IF NOT EXISTS operator when
+         creating the new index.
+
+         .. versionadded:: 1.12.0
+
+        :param \**kw: Additional keyword arguments not mentioned above are
+         dialect specific, and passed in the form
+         ``<dialectname>_<argname>``.
+         See the documentation regarding an individual dialect at
+         :ref:`dialect_toplevel` for detail on documented arguments.
+
+        """
+        op = cls(
+            index_name,
+            table_name,
+            columns,
+            schema=schema,
+            unique=unique,
+            if_not_exists=if_not_exists,
+            **kw,
+        )
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_index(
+        cls,
+        operations: BatchOperations,
+        index_name: str,
+        columns: List[str],
+        **kw: Any,
+    ) -> None:
+        """Issue a "create index" instruction using the
+        current batch migration context.
+
+        .. seealso::
+
+            :meth:`.Operations.create_index`
+
+        """
+
+        op = cls(
+            index_name,
+            operations.impl.table_name,
+            columns,
+            schema=operations.impl.schema,
+            **kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("drop_index")
+@BatchOperations.register_operation("drop_index", "batch_drop_index")
+class DropIndexOp(MigrateOperation):
+    """Represent a drop index operation."""
+
+    def __init__(
+        self,
+        index_name: Union[quoted_name, str, conv],
+        table_name: Optional[str] = None,
+        *,
+        schema: Optional[str] = None,
+        if_exists: Optional[bool] = None,
+        _reverse: Optional[CreateIndexOp] = None,
+        **kw: Any,
+    ) -> None:
+        self.index_name = index_name
+        self.table_name = table_name
+        self.schema = schema
+        self.if_exists = if_exists
+        self._reverse = _reverse
+        self.kw = kw
+
+    def to_diff_tuple(self) -> Tuple[str, Index]:
+        return ("remove_index", self.to_index())
+
+    def reverse(self) -> CreateIndexOp:
+        return CreateIndexOp.from_index(self.to_index())
+
+    @classmethod
+    def from_index(cls, index: Index) -> DropIndexOp:
+        assert index.table is not None
+        return cls(
+            index.name,  # type: ignore[arg-type]
+            table_name=index.table.name,
+            schema=index.table.schema,
+            _reverse=CreateIndexOp.from_index(index),
+            unique=index.unique,
+            **index.kwargs,
+        )
+
+    def to_index(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Index:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        # need a dummy column name here since SQLAlchemy
+        # 0.7.6 and further raises on Index with no columns
+        return schema_obj.index(
+            self.index_name,
+            self.table_name,
+            self._reverse.columns if self._reverse else ["x"],
+            schema=self.schema,
+            **self.kw,
+        )
+
+    @classmethod
+    def drop_index(
+        cls,
+        operations: Operations,
+        index_name: str,
+        table_name: Optional[str] = None,
+        *,
+        schema: Optional[str] = None,
+        if_exists: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        r"""Issue a "drop index" instruction using the current
+        migration context.
+
+        e.g.::
+
+            drop_index("accounts")
+
+        :param index_name: name of the index.
+        :param table_name: name of the owning table.  Some
+         backends such as Microsoft SQL Server require this.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        :param if_exists: If True, adds IF EXISTS operator when
+         dropping the index.
+
+         .. versionadded:: 1.12.0
+
+        :param \**kw: Additional keyword arguments not mentioned above are
+         dialect specific, and passed in the form
+         ``<dialectname>_<argname>``.
+         See the documentation regarding an individual dialect at
+         :ref:`dialect_toplevel` for detail on documented arguments.
+
+        """
+        op = cls(
+            index_name,
+            table_name=table_name,
+            schema=schema,
+            if_exists=if_exists,
+            **kw,
+        )
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_drop_index(
+        cls, operations: BatchOperations, index_name: str, **kw: Any
+    ) -> None:
+        """Issue a "drop index" instruction using the
+        current batch migration context.
+
+        .. seealso::
+
+            :meth:`.Operations.drop_index`
+
+        """
+
+        op = cls(
+            index_name,
+            table_name=operations.impl.table_name,
+            schema=operations.impl.schema,
+            **kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_table")
+class CreateTableOp(MigrateOperation):
+    """Represent a create table operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        columns: Sequence[SchemaItem],
+        *,
+        schema: Optional[str] = None,
+        if_not_exists: Optional[bool] = None,
+        _namespace_metadata: Optional[MetaData] = None,
+        _constraints_included: bool = False,
+        **kw: Any,
+    ) -> None:
+        self.table_name = table_name
+        self.columns = columns
+        self.schema = schema
+        self.if_not_exists = if_not_exists
+        self.info = kw.pop("info", {})
+        self.comment = kw.pop("comment", None)
+        self.prefixes = kw.pop("prefixes", None)
+        self.kw = kw
+        self._namespace_metadata = _namespace_metadata
+        self._constraints_included = _constraints_included
+
+    def reverse(self) -> DropTableOp:
+        return DropTableOp.from_table(
+            self.to_table(), _namespace_metadata=self._namespace_metadata
+        )
+
+    def to_diff_tuple(self) -> Tuple[str, Table]:
+        return ("add_table", self.to_table())
+
+    @classmethod
+    def from_table(
+        cls, table: Table, *, _namespace_metadata: Optional[MetaData] = None
+    ) -> CreateTableOp:
+        if _namespace_metadata is None:
+            _namespace_metadata = table.metadata
+
+        return cls(
+            table.name,
+            list(table.c) + list(table.constraints),
+            schema=table.schema,
+            _namespace_metadata=_namespace_metadata,
+            # given a Table() object, this Table will contain full Index()
+            # and UniqueConstraint objects already constructed in response to
+            # each unique=True / index=True flag on a Column.  Carry this
+            # state along so that when we re-convert back into a Table, we
+            # skip unique=True/index=True so that these constraints are
+            # not doubled up. see #844 #848
+            _constraints_included=True,
+            comment=table.comment,
+            info=dict(table.info),
+            prefixes=list(table._prefixes),
+            **table.kwargs,
+        )
+
+    def to_table(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Table:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        return schema_obj.table(
+            self.table_name,
+            *self.columns,
+            schema=self.schema,
+            prefixes=list(self.prefixes) if self.prefixes else [],
+            comment=self.comment,
+            info=self.info.copy() if self.info else {},
+            _constraints_included=self._constraints_included,
+            **self.kw,
+        )
+
+    @classmethod
+    def create_table(
+        cls,
+        operations: Operations,
+        table_name: str,
+        *columns: SchemaItem,
+        if_not_exists: Optional[bool] = None,
+        **kw: Any,
+    ) -> Table:
+        r"""Issue a "create table" instruction using the current migration
+        context.
+
+        This directive receives an argument list similar to that of the
+        traditional :class:`sqlalchemy.schema.Table` construct, but without the
+        metadata::
+
+            from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+            from alembic import op
+
+            op.create_table(
+                "account",
+                Column("id", INTEGER, primary_key=True),
+                Column("name", VARCHAR(50), nullable=False),
+                Column("description", NVARCHAR(200)),
+                Column("timestamp", TIMESTAMP, server_default=func.now()),
+            )
+
+        Note that :meth:`.create_table` accepts
+        :class:`~sqlalchemy.schema.Column`
+        constructs directly from the SQLAlchemy library.  In particular,
+        default values to be created on the database side are
+        specified using the ``server_default`` parameter, and not
+        ``default`` which only specifies Python-side defaults::
+
+            from alembic import op
+            from sqlalchemy import Column, TIMESTAMP, func
+
+            # specify "DEFAULT NOW" along with the "timestamp" column
+            op.create_table(
+                "account",
+                Column("id", INTEGER, primary_key=True),
+                Column("timestamp", TIMESTAMP, server_default=func.now()),
+            )
+
+        The function also returns a newly created
+        :class:`~sqlalchemy.schema.Table` object, corresponding to the table
+        specification given, which is suitable for
+        immediate SQL operations, in particular
+        :meth:`.Operations.bulk_insert`::
+
+            from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+            from alembic import op
+
+            account_table = op.create_table(
+                "account",
+                Column("id", INTEGER, primary_key=True),
+                Column("name", VARCHAR(50), nullable=False),
+                Column("description", NVARCHAR(200)),
+                Column("timestamp", TIMESTAMP, server_default=func.now()),
+            )
+
+            op.bulk_insert(
+                account_table,
+                [
+                    {"name": "A1", "description": "account 1"},
+                    {"name": "A2", "description": "account 2"},
+                ],
+            )
+
+        :param table_name: Name of the table
+        :param \*columns: collection of :class:`~sqlalchemy.schema.Column`
+         objects within
+         the table, as well as optional :class:`~sqlalchemy.schema.Constraint`
+         objects
+         and :class:`~.sqlalchemy.schema.Index` objects.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+        :param if_not_exists: If True, adds IF NOT EXISTS operator when
+         creating the new table.
+
+         .. versionadded:: 1.13.3
+        :param \**kw: Other keyword arguments are passed to the underlying
+         :class:`sqlalchemy.schema.Table` object created for the command.
+
+        :return: the :class:`~sqlalchemy.schema.Table` object corresponding
+         to the parameters given.
+
+        """
+        op = cls(table_name, columns, if_not_exists=if_not_exists, **kw)
+        return operations.invoke(op)
+
+
+@Operations.register_operation("drop_table")
+class DropTableOp(MigrateOperation):
+    """Represent a drop table operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        *,
+        schema: Optional[str] = None,
+        if_exists: Optional[bool] = None,
+        table_kw: Optional[MutableMapping[Any, Any]] = None,
+        _reverse: Optional[CreateTableOp] = None,
+    ) -> None:
+        self.table_name = table_name
+        self.schema = schema
+        self.if_exists = if_exists
+        self.table_kw = table_kw or {}
+        self.comment = self.table_kw.pop("comment", None)
+        self.info = self.table_kw.pop("info", None)
+        self.prefixes = self.table_kw.pop("prefixes", None)
+        self._reverse = _reverse
+
+    def to_diff_tuple(self) -> Tuple[str, Table]:
+        return ("remove_table", self.to_table())
+
+    def reverse(self) -> CreateTableOp:
+        return CreateTableOp.from_table(self.to_table())
+
+    @classmethod
+    def from_table(
+        cls, table: Table, *, _namespace_metadata: Optional[MetaData] = None
+    ) -> DropTableOp:
+        return cls(
+            table.name,
+            schema=table.schema,
+            table_kw={
+                "comment": table.comment,
+                "info": dict(table.info),
+                "prefixes": list(table._prefixes),
+                **table.kwargs,
+            },
+            _reverse=CreateTableOp.from_table(
+                table, _namespace_metadata=_namespace_metadata
+            ),
+        )
+
+    def to_table(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Table:
+        if self._reverse:
+            cols_and_constraints = self._reverse.columns
+        else:
+            cols_and_constraints = []
+
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        t = schema_obj.table(
+            self.table_name,
+            *cols_and_constraints,
+            comment=self.comment,
+            info=self.info.copy() if self.info else {},
+            prefixes=list(self.prefixes) if self.prefixes else [],
+            schema=self.schema,
+            _constraints_included=(
+                self._reverse._constraints_included if self._reverse else False
+            ),
+            **self.table_kw,
+        )
+        return t
+
+    @classmethod
+    def drop_table(
+        cls,
+        operations: Operations,
+        table_name: str,
+        *,
+        schema: Optional[str] = None,
+        if_exists: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        r"""Issue a "drop table" instruction using the current
+        migration context.
+
+
+        e.g.::
+
+            drop_table("accounts")
+
+        :param table_name: Name of the table
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+        :param if_exists: If True, adds IF EXISTS operator when
+         dropping the table.
+
+         .. versionadded:: 1.13.3
+        :param \**kw: Other keyword arguments are passed to the underlying
+         :class:`sqlalchemy.schema.Table` object created for the command.
+
+        """
+        op = cls(table_name, schema=schema, if_exists=if_exists, table_kw=kw)
+        operations.invoke(op)
+
+
+class AlterTableOp(MigrateOperation):
+    """Represent an alter table operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        self.table_name = table_name
+        self.schema = schema
+
+
+@Operations.register_operation("rename_table")
+class RenameTableOp(AlterTableOp):
+    """Represent a rename table operation."""
+
+    def __init__(
+        self,
+        old_table_name: str,
+        new_table_name: str,
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        super().__init__(old_table_name, schema=schema)
+        self.new_table_name = new_table_name
+
+    @classmethod
+    def rename_table(
+        cls,
+        operations: Operations,
+        old_table_name: str,
+        new_table_name: str,
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        """Emit an ALTER TABLE to rename a table.
+
+        :param old_table_name: old name.
+        :param new_table_name: new name.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+        op = cls(old_table_name, new_table_name, schema=schema)
+        return operations.invoke(op)
+
+
+@Operations.register_operation("create_table_comment")
+@BatchOperations.register_operation(
+    "create_table_comment", "batch_create_table_comment"
+)
+class CreateTableCommentOp(AlterTableOp):
+    """Represent a COMMENT ON `table` operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        comment: Optional[str],
+        *,
+        schema: Optional[str] = None,
+        existing_comment: Optional[str] = None,
+    ) -> None:
+        self.table_name = table_name
+        self.comment = comment
+        self.existing_comment = existing_comment
+        self.schema = schema
+
+    @classmethod
+    def create_table_comment(
+        cls,
+        operations: Operations,
+        table_name: str,
+        comment: Optional[str],
+        *,
+        existing_comment: Optional[str] = None,
+        schema: Optional[str] = None,
+    ) -> None:
+        """Emit a COMMENT ON operation to set the comment for a table.
+
+        :param table_name: string name of the target table.
+        :param comment: string value of the comment being registered against
+         the specified table.
+        :param existing_comment: String value of a comment
+         already registered on the specified table, used within autogenerate
+         so that the operation is reversible, but not required for direct
+         use.
+
+        .. seealso::
+
+            :meth:`.Operations.drop_table_comment`
+
+            :paramref:`.Operations.alter_column.comment`
+
+        """
+
+        op = cls(
+            table_name,
+            comment,
+            existing_comment=existing_comment,
+            schema=schema,
+        )
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_table_comment(
+        cls,
+        operations: BatchOperations,
+        comment: Optional[str],
+        *,
+        existing_comment: Optional[str] = None,
+    ) -> None:
+        """Emit a COMMENT ON operation to set the comment for a table
+        using the current batch migration context.
+
+        :param comment: string value of the comment being registered against
+         the specified table.
+        :param existing_comment: String value of a comment
+         already registered on the specified table, used within autogenerate
+         so that the operation is reversible, but not required for direct
+         use.
+
+        """
+
+        op = cls(
+            operations.impl.table_name,
+            comment,
+            existing_comment=existing_comment,
+            schema=operations.impl.schema,
+        )
+        return operations.invoke(op)
+
+    def reverse(self) -> Union[CreateTableCommentOp, DropTableCommentOp]:
+        """Reverses the COMMENT ON operation against a table."""
+        if self.existing_comment is None:
+            return DropTableCommentOp(
+                self.table_name,
+                existing_comment=self.comment,
+                schema=self.schema,
+            )
+        else:
+            return CreateTableCommentOp(
+                self.table_name,
+                self.existing_comment,
+                existing_comment=self.comment,
+                schema=self.schema,
+            )
+
+    def to_table(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Table:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        return schema_obj.table(
+            self.table_name, schema=self.schema, comment=self.comment
+        )
+
+    def to_diff_tuple(self) -> Tuple[Any, ...]:
+        return ("add_table_comment", self.to_table(), self.existing_comment)
+
+
+@Operations.register_operation("drop_table_comment")
+@BatchOperations.register_operation(
+    "drop_table_comment", "batch_drop_table_comment"
+)
+class DropTableCommentOp(AlterTableOp):
+    """Represent an operation to remove the comment from a table."""
+
+    def __init__(
+        self,
+        table_name: str,
+        *,
+        schema: Optional[str] = None,
+        existing_comment: Optional[str] = None,
+    ) -> None:
+        self.table_name = table_name
+        self.existing_comment = existing_comment
+        self.schema = schema
+
+    @classmethod
+    def drop_table_comment(
+        cls,
+        operations: Operations,
+        table_name: str,
+        *,
+        existing_comment: Optional[str] = None,
+        schema: Optional[str] = None,
+    ) -> None:
+        """Issue a "drop table comment" operation to
+        remove an existing comment set on a table.
+
+        :param table_name: string name of the target table.
+        :param existing_comment: An optional string value of a comment already
+         registered on the specified table.
+
+        .. seealso::
+
+            :meth:`.Operations.create_table_comment`
+
+            :paramref:`.Operations.alter_column.comment`
+
+        """
+
+        op = cls(table_name, existing_comment=existing_comment, schema=schema)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_drop_table_comment(
+        cls,
+        operations: BatchOperations,
+        *,
+        existing_comment: Optional[str] = None,
+    ) -> None:
+        """Issue a "drop table comment" operation to
+        remove an existing comment set on a table using the current
+        batch operations context.
+
+        :param existing_comment: An optional string value of a comment already
+         registered on the specified table.
+
+        """
+
+        op = cls(
+            operations.impl.table_name,
+            existing_comment=existing_comment,
+            schema=operations.impl.schema,
+        )
+        return operations.invoke(op)
+
+    def reverse(self) -> CreateTableCommentOp:
+        """Reverses the COMMENT ON operation against a table."""
+        return CreateTableCommentOp(
+            self.table_name, self.existing_comment, schema=self.schema
+        )
+
+    def to_table(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Table:
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+
+        return schema_obj.table(self.table_name, schema=self.schema)
+
+    def to_diff_tuple(self) -> Tuple[Any, ...]:
+        return ("remove_table_comment", self.to_table())
+
+
+@Operations.register_operation("alter_column")
+@BatchOperations.register_operation("alter_column", "batch_alter_column")
+class AlterColumnOp(AlterTableOp):
+    """Represent an alter column operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        column_name: str,
+        *,
+        schema: Optional[str] = None,
+        existing_type: Optional[Any] = None,
+        existing_server_default: Any = False,
+        existing_nullable: Optional[bool] = None,
+        existing_comment: Optional[str] = None,
+        modify_nullable: Optional[bool] = None,
+        modify_comment: Optional[Union[str, Literal[False]]] = False,
+        modify_server_default: Any = False,
+        modify_name: Optional[str] = None,
+        modify_type: Optional[Any] = None,
+        **kw: Any,
+    ) -> None:
+        super().__init__(table_name, schema=schema)
+        self.column_name = column_name
+        self.existing_type = existing_type
+        self.existing_server_default = existing_server_default
+        self.existing_nullable = existing_nullable
+        self.existing_comment = existing_comment
+        self.modify_nullable = modify_nullable
+        self.modify_comment = modify_comment
+        self.modify_server_default = modify_server_default
+        self.modify_name = modify_name
+        self.modify_type = modify_type
+        self.kw = kw
+
+    def to_diff_tuple(self) -> Any:
+        col_diff = []
+        schema, tname, cname = self.schema, self.table_name, self.column_name
+
+        if self.modify_type is not None:
+            col_diff.append(
+                (
+                    "modify_type",
+                    schema,
+                    tname,
+                    cname,
+                    {
+                        "existing_nullable": self.existing_nullable,
+                        "existing_server_default": (
+                            self.existing_server_default
+                        ),
+                        "existing_comment": self.existing_comment,
+                    },
+                    self.existing_type,
+                    self.modify_type,
+                )
+            )
+
+        if self.modify_nullable is not None:
+            col_diff.append(
+                (
+                    "modify_nullable",
+                    schema,
+                    tname,
+                    cname,
+                    {
+                        "existing_type": self.existing_type,
+                        "existing_server_default": (
+                            self.existing_server_default
+                        ),
+                        "existing_comment": self.existing_comment,
+                    },
+                    self.existing_nullable,
+                    self.modify_nullable,
+                )
+            )
+
+        if self.modify_server_default is not False:
+            col_diff.append(
+                (
+                    "modify_default",
+                    schema,
+                    tname,
+                    cname,
+                    {
+                        "existing_nullable": self.existing_nullable,
+                        "existing_type": self.existing_type,
+                        "existing_comment": self.existing_comment,
+                    },
+                    self.existing_server_default,
+                    self.modify_server_default,
+                )
+            )
+
+        if self.modify_comment is not False:
+            col_diff.append(
+                (
+                    "modify_comment",
+                    schema,
+                    tname,
+                    cname,
+                    {
+                        "existing_nullable": self.existing_nullable,
+                        "existing_type": self.existing_type,
+                        "existing_server_default": (
+                            self.existing_server_default
+                        ),
+                    },
+                    self.existing_comment,
+                    self.modify_comment,
+                )
+            )
+
+        return col_diff
+
+    def has_changes(self) -> bool:
+        hc1 = (
+            self.modify_nullable is not None
+            or self.modify_server_default is not False
+            or self.modify_type is not None
+            or self.modify_comment is not False
+        )
+        if hc1:
+            return True
+        for kw in self.kw:
+            if kw.startswith("modify_"):
+                return True
+        else:
+            return False
+
+    def reverse(self) -> AlterColumnOp:
+        kw = self.kw.copy()
+        kw["existing_type"] = self.existing_type
+        kw["existing_nullable"] = self.existing_nullable
+        kw["existing_server_default"] = self.existing_server_default
+        kw["existing_comment"] = self.existing_comment
+        if self.modify_type is not None:
+            kw["modify_type"] = self.modify_type
+        if self.modify_nullable is not None:
+            kw["modify_nullable"] = self.modify_nullable
+        if self.modify_server_default is not False:
+            kw["modify_server_default"] = self.modify_server_default
+        if self.modify_comment is not False:
+            kw["modify_comment"] = self.modify_comment
+
+        # TODO: make this a little simpler
+        all_keys = {
+            m.group(1)
+            for m in [re.match(r"^(?:existing_|modify_)(.+)$", k) for k in kw]
+            if m
+        }
+
+        for k in all_keys:
+            if "modify_%s" % k in kw:
+                swap = kw["existing_%s" % k]
+                kw["existing_%s" % k] = kw["modify_%s" % k]
+                kw["modify_%s" % k] = swap
+
+        return self.__class__(
+            self.table_name, self.column_name, schema=self.schema, **kw
+        )
+
+    @classmethod
+    def alter_column(
+        cls,
+        operations: Operations,
+        table_name: str,
+        column_name: str,
+        *,
+        nullable: Optional[bool] = None,
+        comment: Optional[Union[str, Literal[False]]] = False,
+        server_default: Any = False,
+        new_column_name: Optional[str] = None,
+        type_: Optional[Union[TypeEngine[Any], Type[TypeEngine[Any]]]] = None,
+        existing_type: Optional[
+            Union[TypeEngine[Any], Type[TypeEngine[Any]]]
+        ] = None,
+        existing_server_default: Optional[
+            Union[str, bool, Identity, Computed]
+        ] = False,
+        existing_nullable: Optional[bool] = None,
+        existing_comment: Optional[str] = None,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        r"""Issue an "alter column" instruction using the
+        current migration context.
+
+        Generally, only that aspect of the column which
+        is being changed, i.e. name, type, nullability,
+        default, needs to be specified.  Multiple changes
+        can also be specified at once and the backend should
+        "do the right thing", emitting each change either
+        separately or together as the backend allows.
+
+        MySQL has special requirements here, since MySQL
+        cannot ALTER a column without a full specification.
+        When producing MySQL-compatible migration files,
+        it is recommended that the ``existing_type``,
+        ``existing_server_default``, and ``existing_nullable``
+        parameters be present, if not being altered.
+
+        Type changes which are against the SQLAlchemy
+        "schema" types :class:`~sqlalchemy.types.Boolean`
+        and  :class:`~sqlalchemy.types.Enum` may also
+        add or drop constraints which accompany those
+        types on backends that don't support them natively.
+        The ``existing_type`` argument is
+        used in this case to identify and remove a previous
+        constraint that was bound to the type object.
+
+        :param table_name: string name of the target table.
+        :param column_name: string name of the target column,
+         as it exists before the operation begins.
+        :param nullable: Optional; specify ``True`` or ``False``
+         to alter the column's nullability.
+        :param server_default: Optional; specify a string
+         SQL expression, :func:`~sqlalchemy.sql.expression.text`,
+         or :class:`~sqlalchemy.schema.DefaultClause` to indicate
+         an alteration to the column's default value.
+         Set to ``None`` to have the default removed.
+        :param comment: optional string text of a new comment to add to the
+         column.
+        :param new_column_name: Optional; specify a string name here to
+         indicate the new name within a column rename operation.
+        :param type\_: Optional; a :class:`~sqlalchemy.types.TypeEngine`
+         type object to specify a change to the column's type.
+         For SQLAlchemy types that also indicate a constraint (i.e.
+         :class:`~sqlalchemy.types.Boolean`, :class:`~sqlalchemy.types.Enum`),
+         the constraint is also generated.
+        :param autoincrement: set the ``AUTO_INCREMENT`` flag of the column;
+         currently understood by the MySQL dialect.
+        :param existing_type: Optional; a
+         :class:`~sqlalchemy.types.TypeEngine`
+         type object to specify the previous type.   This
+         is required for all MySQL column alter operations that
+         don't otherwise specify a new type, as well as for
+         when nullability is being changed on a SQL Server
+         column.  It is also used if the type is a so-called
+         SQLAlchemy "schema" type which may define a constraint (i.e.
+         :class:`~sqlalchemy.types.Boolean`,
+         :class:`~sqlalchemy.types.Enum`),
+         so that the constraint can be dropped.
+        :param existing_server_default: Optional; The existing
+         default value of the column.   Required on MySQL if
+         an existing default is not being changed; else MySQL
+         removes the default.
+        :param existing_nullable: Optional; the existing nullability
+         of the column.  Required on MySQL if the existing nullability
+         is not being changed; else MySQL sets this to NULL.
+        :param existing_autoincrement: Optional; the existing autoincrement
+         of the column.  Used for MySQL's system of altering a column
+         that specifies ``AUTO_INCREMENT``.
+        :param existing_comment: string text of the existing comment on the
+         column to be maintained.  Required on MySQL if the existing comment
+         on the column is not being changed.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+        :param postgresql_using: String argument which will indicate a
+         SQL expression to render within the Postgresql-specific USING clause
+         within ALTER COLUMN.    This string is taken directly as raw SQL which
+         must explicitly include any necessary quoting or escaping of tokens
+         within the expression.
+
+        """
+
+        alt = cls(
+            table_name,
+            column_name,
+            schema=schema,
+            existing_type=existing_type,
+            existing_server_default=existing_server_default,
+            existing_nullable=existing_nullable,
+            existing_comment=existing_comment,
+            modify_name=new_column_name,
+            modify_type=type_,
+            modify_server_default=server_default,
+            modify_nullable=nullable,
+            modify_comment=comment,
+            **kw,
+        )
+
+        return operations.invoke(alt)
+
+    @classmethod
+    def batch_alter_column(
+        cls,
+        operations: BatchOperations,
+        column_name: str,
+        *,
+        nullable: Optional[bool] = None,
+        comment: Optional[Union[str, Literal[False]]] = False,
+        server_default: Any = False,
+        new_column_name: Optional[str] = None,
+        type_: Optional[Union[TypeEngine[Any], Type[TypeEngine[Any]]]] = None,
+        existing_type: Optional[
+            Union[TypeEngine[Any], Type[TypeEngine[Any]]]
+        ] = None,
+        existing_server_default: Optional[
+            Union[str, bool, Identity, Computed]
+        ] = False,
+        existing_nullable: Optional[bool] = None,
+        existing_comment: Optional[str] = None,
+        insert_before: Optional[str] = None,
+        insert_after: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        """Issue an "alter column" instruction using the current
+        batch migration context.
+
+        Parameters are the same as that of :meth:`.Operations.alter_column`,
+        as well as the following option(s):
+
+        :param insert_before: String name of an existing column which this
+         column should be placed before, when creating the new table.
+
+        :param insert_after: String name of an existing column which this
+         column should be placed after, when creating the new table.  If
+         both :paramref:`.BatchOperations.alter_column.insert_before`
+         and :paramref:`.BatchOperations.alter_column.insert_after` are
+         omitted, the column is inserted after the last existing column
+         in the table.
+
+        .. seealso::
+
+            :meth:`.Operations.alter_column`
+
+
+        """
+        alt = cls(
+            operations.impl.table_name,
+            column_name,
+            schema=operations.impl.schema,
+            existing_type=existing_type,
+            existing_server_default=existing_server_default,
+            existing_nullable=existing_nullable,
+            existing_comment=existing_comment,
+            modify_name=new_column_name,
+            modify_type=type_,
+            modify_server_default=server_default,
+            modify_nullable=nullable,
+            modify_comment=comment,
+            insert_before=insert_before,
+            insert_after=insert_after,
+            **kw,
+        )
+
+        return operations.invoke(alt)
+
+
+@Operations.register_operation("add_column")
+@BatchOperations.register_operation("add_column", "batch_add_column")
+class AddColumnOp(AlterTableOp):
+    """Represent an add column operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        column: Column[Any],
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        super().__init__(table_name, schema=schema)
+        self.column = column
+        self.kw = kw
+
+    def reverse(self) -> DropColumnOp:
+        return DropColumnOp.from_column_and_tablename(
+            self.schema, self.table_name, self.column
+        )
+
+    def to_diff_tuple(
+        self,
+    ) -> Tuple[str, Optional[str], str, Column[Any]]:
+        return ("add_column", self.schema, self.table_name, self.column)
+
+    def to_column(self) -> Column[Any]:
+        return self.column
+
+    @classmethod
+    def from_column(cls, col: Column[Any]) -> AddColumnOp:
+        return cls(col.table.name, col, schema=col.table.schema)
+
+    @classmethod
+    def from_column_and_tablename(
+        cls,
+        schema: Optional[str],
+        tname: str,
+        col: Column[Any],
+    ) -> AddColumnOp:
+        return cls(tname, col, schema=schema)
+
+    @classmethod
+    def add_column(
+        cls,
+        operations: Operations,
+        table_name: str,
+        column: Column[Any],
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        """Issue an "add column" instruction using the current
+        migration context.
+
+        e.g.::
+
+            from alembic import op
+            from sqlalchemy import Column, String
+
+            op.add_column("organization", Column("name", String()))
+
+        The :meth:`.Operations.add_column` method typically corresponds
+        to the SQL command "ALTER TABLE... ADD COLUMN".    Within the scope
+        of this command, the column's name, datatype, nullability,
+        and optional server-generated defaults may be indicated.
+
+        .. note::
+
+            With the exception of NOT NULL constraints or single-column FOREIGN
+            KEY constraints, other kinds of constraints such as PRIMARY KEY,
+            UNIQUE or CHECK constraints **cannot** be generated using this
+            method; for these constraints, refer to operations such as
+            :meth:`.Operations.create_primary_key` and
+            :meth:`.Operations.create_check_constraint`. In particular, the
+            following :class:`~sqlalchemy.schema.Column` parameters are
+            **ignored**:
+
+            * :paramref:`~sqlalchemy.schema.Column.primary_key` - SQL databases
+              typically do not support an ALTER operation that can add
+              individual columns one at a time to an existing primary key
+              constraint, therefore it's less ambiguous to use the
+              :meth:`.Operations.create_primary_key` method, which assumes no
+              existing primary key constraint is present.
+            * :paramref:`~sqlalchemy.schema.Column.unique` - use the
+              :meth:`.Operations.create_unique_constraint` method
+            * :paramref:`~sqlalchemy.schema.Column.index` - use the
+              :meth:`.Operations.create_index` method
+
+
+        The provided :class:`~sqlalchemy.schema.Column` object may include a
+        :class:`~sqlalchemy.schema.ForeignKey` constraint directive,
+        referencing a remote table name. For this specific type of constraint,
+        Alembic will automatically emit a second ALTER statement in order to
+        add the single-column FOREIGN KEY constraint separately::
+
+            from alembic import op
+            from sqlalchemy import Column, INTEGER, ForeignKey
+
+            op.add_column(
+                "organization",
+                Column("account_id", INTEGER, ForeignKey("accounts.id")),
+            )
+
+        The column argument passed to :meth:`.Operations.add_column` is a
+        :class:`~sqlalchemy.schema.Column` construct, used in the same way it's
+        used in SQLAlchemy. In particular, values or functions to be indicated
+        as producing the column's default value on the database side are
+        specified using the ``server_default`` parameter, and not ``default``
+        which only specifies Python-side defaults::
+
+            from alembic import op
+            from sqlalchemy import Column, TIMESTAMP, func
+
+            # specify "DEFAULT NOW" along with the column add
+            op.add_column(
+                "account",
+                Column("timestamp", TIMESTAMP, server_default=func.now()),
+            )
+
+        :param table_name: String name of the parent table.
+        :param column: a :class:`sqlalchemy.schema.Column` object
+         representing the new column.
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+        """
+
+        op = cls(table_name, column, schema=schema)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_add_column(
+        cls,
+        operations: BatchOperations,
+        column: Column[Any],
+        *,
+        insert_before: Optional[str] = None,
+        insert_after: Optional[str] = None,
+    ) -> None:
+        """Issue an "add column" instruction using the current
+        batch migration context.
+
+        .. seealso::
+
+            :meth:`.Operations.add_column`
+
+        """
+
+        kw = {}
+        if insert_before:
+            kw["insert_before"] = insert_before
+        if insert_after:
+            kw["insert_after"] = insert_after
+
+        op = cls(
+            operations.impl.table_name,
+            column,
+            schema=operations.impl.schema,
+            **kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("drop_column")
+@BatchOperations.register_operation("drop_column", "batch_drop_column")
+class DropColumnOp(AlterTableOp):
+    """Represent a drop column operation."""
+
+    def __init__(
+        self,
+        table_name: str,
+        column_name: str,
+        *,
+        schema: Optional[str] = None,
+        _reverse: Optional[AddColumnOp] = None,
+        **kw: Any,
+    ) -> None:
+        super().__init__(table_name, schema=schema)
+        self.column_name = column_name
+        self.kw = kw
+        self._reverse = _reverse
+
+    def to_diff_tuple(
+        self,
+    ) -> Tuple[str, Optional[str], str, Column[Any]]:
+        return (
+            "remove_column",
+            self.schema,
+            self.table_name,
+            self.to_column(),
+        )
+
+    def reverse(self) -> AddColumnOp:
+        if self._reverse is None:
+            raise ValueError(
+                "operation is not reversible; "
+                "original column is not present"
+            )
+
+        return AddColumnOp.from_column_and_tablename(
+            self.schema, self.table_name, self._reverse.column
+        )
+
+    @classmethod
+    def from_column_and_tablename(
+        cls,
+        schema: Optional[str],
+        tname: str,
+        col: Column[Any],
+    ) -> DropColumnOp:
+        return cls(
+            tname,
+            col.name,
+            schema=schema,
+            _reverse=AddColumnOp.from_column_and_tablename(schema, tname, col),
+        )
+
+    def to_column(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> Column[Any]:
+        if self._reverse is not None:
+            return self._reverse.column
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        return schema_obj.column(self.column_name, NULLTYPE)
+
+    @classmethod
+    def drop_column(
+        cls,
+        operations: Operations,
+        table_name: str,
+        column_name: str,
+        *,
+        schema: Optional[str] = None,
+        **kw: Any,
+    ) -> None:
+        """Issue a "drop column" instruction using the current
+        migration context.
+
+        e.g.::
+
+            drop_column("organization", "account_id")
+
+        :param table_name: name of table
+        :param column_name: name of column
+        :param schema: Optional schema name to operate within.  To control
+         quoting of the schema outside of the default behavior, use
+         the SQLAlchemy construct
+         :class:`~sqlalchemy.sql.elements.quoted_name`.
+        :param mssql_drop_check: Optional boolean.  When ``True``, on
+         Microsoft SQL Server only, first
+         drop the CHECK constraint on the column using a
+         SQL-script-compatible
+         block that selects into a @variable from sys.check_constraints,
+         then exec's a separate DROP CONSTRAINT for that constraint.
+        :param mssql_drop_default: Optional boolean.  When ``True``, on
+         Microsoft SQL Server only, first
+         drop the DEFAULT constraint on the column using a
+         SQL-script-compatible
+         block that selects into a @variable from sys.default_constraints,
+         then exec's a separate DROP CONSTRAINT for that default.
+        :param mssql_drop_foreign_key: Optional boolean.  When ``True``, on
+         Microsoft SQL Server only, first
+         drop a single FOREIGN KEY constraint on the column using a
+         SQL-script-compatible
+         block that selects into a @variable from
+         sys.foreign_keys/sys.foreign_key_columns,
+         then exec's a separate DROP CONSTRAINT for that default.  Only
+         works if the column has exactly one FK constraint which refers to
+         it, at the moment.
+
+        """
+
+        op = cls(table_name, column_name, schema=schema, **kw)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_drop_column(
+        cls, operations: BatchOperations, column_name: str, **kw: Any
+    ) -> None:
+        """Issue a "drop column" instruction using the current
+        batch migration context.
+
+        .. seealso::
+
+            :meth:`.Operations.drop_column`
+
+        """
+        op = cls(
+            operations.impl.table_name,
+            column_name,
+            schema=operations.impl.schema,
+            **kw,
+        )
+        return operations.invoke(op)
+
+
+@Operations.register_operation("bulk_insert")
+class BulkInsertOp(MigrateOperation):
+    """Represent a bulk insert operation."""
+
+    def __init__(
+        self,
+        table: Union[Table, TableClause],
+        rows: List[Dict[str, Any]],
+        *,
+        multiinsert: bool = True,
+    ) -> None:
+        self.table = table
+        self.rows = rows
+        self.multiinsert = multiinsert
+
+    @classmethod
+    def bulk_insert(
+        cls,
+        operations: Operations,
+        table: Union[Table, TableClause],
+        rows: List[Dict[str, Any]],
+        *,
+        multiinsert: bool = True,
+    ) -> None:
+        """Issue a "bulk insert" operation using the current
+        migration context.
+
+        This provides a means of representing an INSERT of multiple rows
+        which works equally well in the context of executing on a live
+        connection as well as that of generating a SQL script.   In the
+        case of a SQL script, the values are rendered inline into the
+        statement.
+
+        e.g.::
+
+            from alembic import op
+            from datetime import date
+            from sqlalchemy.sql import table, column
+            from sqlalchemy import String, Integer, Date
+
+            # Create an ad-hoc table to use for the insert statement.
+            accounts_table = table(
+                "account",
+                column("id", Integer),
+                column("name", String),
+                column("create_date", Date),
+            )
+
+            op.bulk_insert(
+                accounts_table,
+                [
+                    {
+                        "id": 1,
+                        "name": "John Smith",
+                        "create_date": date(2010, 10, 5),
+                    },
+                    {
+                        "id": 2,
+                        "name": "Ed Williams",
+                        "create_date": date(2007, 5, 27),
+                    },
+                    {
+                        "id": 3,
+                        "name": "Wendy Jones",
+                        "create_date": date(2008, 8, 15),
+                    },
+                ],
+            )
+
+        When using --sql mode, some datatypes may not render inline
+        automatically, such as dates and other special types.   When this
+        issue is present, :meth:`.Operations.inline_literal` may be used::
+
+            op.bulk_insert(
+                accounts_table,
+                [
+                    {
+                        "id": 1,
+                        "name": "John Smith",
+                        "create_date": op.inline_literal("2010-10-05"),
+                    },
+                    {
+                        "id": 2,
+                        "name": "Ed Williams",
+                        "create_date": op.inline_literal("2007-05-27"),
+                    },
+                    {
+                        "id": 3,
+                        "name": "Wendy Jones",
+                        "create_date": op.inline_literal("2008-08-15"),
+                    },
+                ],
+                multiinsert=False,
+            )
+
+        When using :meth:`.Operations.inline_literal` in conjunction with
+        :meth:`.Operations.bulk_insert`, in order for the statement to work
+        in "online" (e.g. non --sql) mode, the
+        :paramref:`~.Operations.bulk_insert.multiinsert`
+        flag should be set to ``False``, which will have the effect of
+        individual INSERT statements being emitted to the database, each
+        with a distinct VALUES clause, so that the "inline" values can
+        still be rendered, rather than attempting to pass the values
+        as bound parameters.
+
+        :param table: a table object which represents the target of the INSERT.
+
+        :param rows: a list of dictionaries indicating rows.
+
+        :param multiinsert: when at its default of True and --sql mode is not
+           enabled, the INSERT statement will be executed using
+           "executemany()" style, where all elements in the list of
+           dictionaries are passed as bound parameters in a single
+           list.   Setting this to False results in individual INSERT
+           statements being emitted per parameter set, and is needed
+           in those cases where non-literal values are present in the
+           parameter sets.
+
+        """
+
+        op = cls(table, rows, multiinsert=multiinsert)
+        operations.invoke(op)
+
+
+@Operations.register_operation("execute")
+@BatchOperations.register_operation("execute", "batch_execute")
+class ExecuteSQLOp(MigrateOperation):
+    """Represent an execute SQL operation."""
+
+    def __init__(
+        self,
+        sqltext: Union[Executable, str],
+        *,
+        execution_options: Optional[dict[str, Any]] = None,
+    ) -> None:
+        self.sqltext = sqltext
+        self.execution_options = execution_options
+
+    @classmethod
+    def execute(
+        cls,
+        operations: Operations,
+        sqltext: Union[Executable, str],
+        *,
+        execution_options: Optional[dict[str, Any]] = None,
+    ) -> None:
+        r"""Execute the given SQL using the current migration context.
+
+        The given SQL can be a plain string, e.g.::
+
+            op.execute("INSERT INTO table (foo) VALUES ('some value')")
+
+        Or it can be any kind of Core SQL Expression construct, such as
+        below where we use an update construct::
+
+            from sqlalchemy.sql import table, column
+            from sqlalchemy import String
+            from alembic import op
+
+            account = table("account", column("name", String))
+            op.execute(
+                account.update()
+                .where(account.c.name == op.inline_literal("account 1"))
+                .values({"name": op.inline_literal("account 2")})
+            )
+
+        Above, we made use of the SQLAlchemy
+        :func:`sqlalchemy.sql.expression.table` and
+        :func:`sqlalchemy.sql.expression.column` constructs to make a brief,
+        ad-hoc table construct just for our UPDATE statement.  A full
+        :class:`~sqlalchemy.schema.Table` construct of course works perfectly
+        fine as well, though note it's a recommended practice to at least
+        ensure the definition of a table is self-contained within the migration
+        script, rather than imported from a module that may break compatibility
+        with older migrations.
+
+        In a SQL script context, the statement is emitted directly to the
+        output stream.   There is *no* return result, however, as this
+        function is oriented towards generating a change script
+        that can run in "offline" mode.     Additionally, parameterized
+        statements are discouraged here, as they *will not work* in offline
+        mode.  Above, we use :meth:`.inline_literal` where parameters are
+        to be used.
+
+        For full interaction with a connected database where parameters can
+        also be used normally, use the "bind" available from the context::
+
+            from alembic import op
+
+            connection = op.get_bind()
+
+            connection.execute(
+                account.update()
+                .where(account.c.name == "account 1")
+                .values({"name": "account 2"})
+            )
+
+        Additionally, when passing the statement as a plain string, it is first
+        coerced into a :func:`sqlalchemy.sql.expression.text` construct
+        before being passed along.  In the less likely case that the
+        literal SQL string contains a colon, it must be escaped with a
+        backslash, as::
+
+           op.execute(r"INSERT INTO table (foo) VALUES ('\:colon_value')")
+
+
+        :param sqltext: Any legal SQLAlchemy expression, including:
+
+        * a string
+        * a :func:`sqlalchemy.sql.expression.text` construct.
+        * a :func:`sqlalchemy.sql.expression.insert` construct.
+        * a :func:`sqlalchemy.sql.expression.update` construct.
+        * a :func:`sqlalchemy.sql.expression.delete` construct.
+        * Any "executable" described in SQLAlchemy Core documentation,
+          noting that no result set is returned.
+
+        .. note::  when passing a plain string, the statement is coerced into
+           a :func:`sqlalchemy.sql.expression.text` construct. This construct
+           considers symbols with colons, e.g. ``:foo`` to be bound parameters.
+           To avoid this, ensure that colon symbols are escaped, e.g.
+           ``\:foo``.
+
+        :param execution_options: Optional dictionary of
+         execution options, will be passed to
+         :meth:`sqlalchemy.engine.Connection.execution_options`.
+        """
+        op = cls(sqltext, execution_options=execution_options)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_execute(
+        cls,
+        operations: Operations,
+        sqltext: Union[Executable, str],
+        *,
+        execution_options: Optional[dict[str, Any]] = None,
+    ) -> None:
+        """Execute the given SQL using the current migration context.
+
+        .. seealso::
+
+            :meth:`.Operations.execute`
+
+        """
+        return cls.execute(
+            operations, sqltext, execution_options=execution_options
+        )
+
+    def to_diff_tuple(self) -> Tuple[str, Union[Executable, str]]:
+        return ("execute", self.sqltext)
+
+
+class OpContainer(MigrateOperation):
+    """Represent a sequence of operations operation."""
+
+    def __init__(self, ops: Sequence[MigrateOperation] = ()) -> None:
+        self.ops = list(ops)
+
+    def is_empty(self) -> bool:
+        return not self.ops
+
+    def as_diffs(self) -> Any:
+        return list(OpContainer._ops_as_diffs(self))
+
+    @classmethod
+    def _ops_as_diffs(
+        cls, migrations: OpContainer
+    ) -> Iterator[Tuple[Any, ...]]:
+        for op in migrations.ops:
+            if hasattr(op, "ops"):
+                yield from cls._ops_as_diffs(cast("OpContainer", op))
+            else:
+                yield op.to_diff_tuple()
+
+
+class ModifyTableOps(OpContainer):
+    """Contains a sequence of operations that all apply to a single Table."""
+
+    def __init__(
+        self,
+        table_name: str,
+        ops: Sequence[MigrateOperation],
+        *,
+        schema: Optional[str] = None,
+    ) -> None:
+        super().__init__(ops)
+        self.table_name = table_name
+        self.schema = schema
+
+    def reverse(self) -> ModifyTableOps:
+        return ModifyTableOps(
+            self.table_name,
+            ops=list(reversed([op.reverse() for op in self.ops])),
+            schema=self.schema,
+        )
+
+
+class UpgradeOps(OpContainer):
+    """contains a sequence of operations that would apply to the
+    'upgrade' stream of a script.
+
+    .. seealso::
+
+        :ref:`customizing_revision`
+
+    """
+
+    def __init__(
+        self,
+        ops: Sequence[MigrateOperation] = (),
+        upgrade_token: str = "upgrades",
+    ) -> None:
+        super().__init__(ops=ops)
+        self.upgrade_token = upgrade_token
+
+    def reverse_into(self, downgrade_ops: DowngradeOps) -> DowngradeOps:
+        downgrade_ops.ops[:] = list(
+            reversed([op.reverse() for op in self.ops])
+        )
+        return downgrade_ops
+
+    def reverse(self) -> DowngradeOps:
+        return self.reverse_into(DowngradeOps(ops=[]))
+
+
+class DowngradeOps(OpContainer):
+    """contains a sequence of operations that would apply to the
+    'downgrade' stream of a script.
+
+    .. seealso::
+
+        :ref:`customizing_revision`
+
+    """
+
+    def __init__(
+        self,
+        ops: Sequence[MigrateOperation] = (),
+        downgrade_token: str = "downgrades",
+    ) -> None:
+        super().__init__(ops=ops)
+        self.downgrade_token = downgrade_token
+
+    def reverse(self) -> UpgradeOps:
+        return UpgradeOps(
+            ops=list(reversed([op.reverse() for op in self.ops]))
+        )
+
+
+class MigrationScript(MigrateOperation):
+    """represents a migration script.
+
+    E.g. when autogenerate encounters this object, this corresponds to the
+    production of an actual script file.
+
+    A normal :class:`.MigrationScript` object would contain a single
+    :class:`.UpgradeOps` and a single :class:`.DowngradeOps` directive.
+    These are accessible via the ``.upgrade_ops`` and ``.downgrade_ops``
+    attributes.
+
+    In the case of an autogenerate operation that runs multiple times,
+    such as the multiple database example in the "multidb" template,
+    the ``.upgrade_ops`` and ``.downgrade_ops`` attributes are disabled,
+    and instead these objects should be accessed via the ``.upgrade_ops_list``
+    and ``.downgrade_ops_list`` list-based attributes.  These latter
+    attributes are always available at the very least as single-element lists.
+
+    .. seealso::
+
+        :ref:`customizing_revision`
+
+    """
+
+    _needs_render: Optional[bool]
+    _upgrade_ops: List[UpgradeOps]
+    _downgrade_ops: List[DowngradeOps]
+
+    def __init__(
+        self,
+        rev_id: Optional[str],
+        upgrade_ops: UpgradeOps,
+        downgrade_ops: DowngradeOps,
+        *,
+        message: Optional[str] = None,
+        imports: Set[str] = set(),
+        head: Optional[str] = None,
+        splice: Optional[bool] = None,
+        branch_label: Optional[_RevIdType] = None,
+        version_path: Optional[str] = None,
+        depends_on: Optional[_RevIdType] = None,
+    ) -> None:
+        self.rev_id = rev_id
+        self.message = message
+        self.imports = imports
+        self.head = head
+        self.splice = splice
+        self.branch_label = branch_label
+        self.version_path = version_path
+        self.depends_on = depends_on
+        self.upgrade_ops = upgrade_ops
+        self.downgrade_ops = downgrade_ops
+
+    @property
+    def upgrade_ops(self) -> Optional[UpgradeOps]:
+        """An instance of :class:`.UpgradeOps`.
+
+        .. seealso::
+
+            :attr:`.MigrationScript.upgrade_ops_list`
+        """
+        if len(self._upgrade_ops) > 1:
+            raise ValueError(
+                "This MigrationScript instance has a multiple-entry "
+                "list for UpgradeOps; please use the "
+                "upgrade_ops_list attribute."
+            )
+        elif not self._upgrade_ops:
+            return None
+        else:
+            return self._upgrade_ops[0]
+
+    @upgrade_ops.setter
+    def upgrade_ops(
+        self, upgrade_ops: Union[UpgradeOps, List[UpgradeOps]]
+    ) -> None:
+        self._upgrade_ops = util.to_list(upgrade_ops)
+        for elem in self._upgrade_ops:
+            assert isinstance(elem, UpgradeOps)
+
+    @property
+    def downgrade_ops(self) -> Optional[DowngradeOps]:
+        """An instance of :class:`.DowngradeOps`.
+
+        .. seealso::
+
+            :attr:`.MigrationScript.downgrade_ops_list`
+        """
+        if len(self._downgrade_ops) > 1:
+            raise ValueError(
+                "This MigrationScript instance has a multiple-entry "
+                "list for DowngradeOps; please use the "
+                "downgrade_ops_list attribute."
+            )
+        elif not self._downgrade_ops:
+            return None
+        else:
+            return self._downgrade_ops[0]
+
+    @downgrade_ops.setter
+    def downgrade_ops(
+        self, downgrade_ops: Union[DowngradeOps, List[DowngradeOps]]
+    ) -> None:
+        self._downgrade_ops = util.to_list(downgrade_ops)
+        for elem in self._downgrade_ops:
+            assert isinstance(elem, DowngradeOps)
+
+    @property
+    def upgrade_ops_list(self) -> List[UpgradeOps]:
+        """A list of :class:`.UpgradeOps` instances.
+
+        This is used in place of the :attr:`.MigrationScript.upgrade_ops`
+        attribute when dealing with a revision operation that does
+        multiple autogenerate passes.
+
+        """
+        return self._upgrade_ops
+
+    @property
+    def downgrade_ops_list(self) -> List[DowngradeOps]:
+        """A list of :class:`.DowngradeOps` instances.
+
+        This is used in place of the :attr:`.MigrationScript.downgrade_ops`
+        attribute when dealing with a revision operation that does
+        multiple autogenerate passes.
+
+        """
+        return self._downgrade_ops
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py b/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py
new file mode 100644
index 00000000..59c1002f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py
@@ -0,0 +1,290 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import schema as sa_schema
+from sqlalchemy.sql.schema import Column
+from sqlalchemy.sql.schema import Constraint
+from sqlalchemy.sql.schema import Index
+from sqlalchemy.types import Integer
+from sqlalchemy.types import NULLTYPE
+
+from .. import util
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql.elements import ColumnElement
+    from sqlalchemy.sql.elements import TextClause
+    from sqlalchemy.sql.schema import CheckConstraint
+    from sqlalchemy.sql.schema import ForeignKey
+    from sqlalchemy.sql.schema import ForeignKeyConstraint
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import PrimaryKeyConstraint
+    from sqlalchemy.sql.schema import Table
+    from sqlalchemy.sql.schema import UniqueConstraint
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from ..runtime.migration import MigrationContext
+
+
+class SchemaObjects:
+    def __init__(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> None:
+        self.migration_context = migration_context
+
+    def primary_key_constraint(
+        self,
+        name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        cols: Sequence[str],
+        schema: Optional[str] = None,
+        **dialect_kw,
+    ) -> PrimaryKeyConstraint:
+        m = self.metadata()
+        columns = [sa_schema.Column(n, NULLTYPE) for n in cols]
+        t = sa_schema.Table(table_name, m, *columns, schema=schema)
+        # SQLAlchemy primary key constraint name arg is wrongly typed on
+        # the SQLAlchemy side through 2.0.5 at least
+        p = sa_schema.PrimaryKeyConstraint(
+            *[t.c[n] for n in cols], name=name, **dialect_kw  # type: ignore
+        )
+        return p
+
+    def foreign_key_constraint(
+        self,
+        name: Optional[sqla_compat._ConstraintNameDefined],
+        source: str,
+        referent: str,
+        local_cols: List[str],
+        remote_cols: List[str],
+        onupdate: Optional[str] = None,
+        ondelete: Optional[str] = None,
+        deferrable: Optional[bool] = None,
+        source_schema: Optional[str] = None,
+        referent_schema: Optional[str] = None,
+        initially: Optional[str] = None,
+        match: Optional[str] = None,
+        **dialect_kw,
+    ) -> ForeignKeyConstraint:
+        m = self.metadata()
+        if source == referent and source_schema == referent_schema:
+            t1_cols = local_cols + remote_cols
+        else:
+            t1_cols = local_cols
+            sa_schema.Table(
+                referent,
+                m,
+                *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
+                schema=referent_schema,
+            )
+
+        t1 = sa_schema.Table(
+            source,
+            m,
+            *[
+                sa_schema.Column(n, NULLTYPE)
+                for n in util.unique_list(t1_cols)
+            ],
+            schema=source_schema,
+        )
+
+        tname = (
+            "%s.%s" % (referent_schema, referent)
+            if referent_schema
+            else referent
+        )
+
+        dialect_kw["match"] = match
+
+        f = sa_schema.ForeignKeyConstraint(
+            local_cols,
+            ["%s.%s" % (tname, n) for n in remote_cols],
+            name=name,
+            onupdate=onupdate,
+            ondelete=ondelete,
+            deferrable=deferrable,
+            initially=initially,
+            **dialect_kw,
+        )
+        t1.append_constraint(f)
+
+        return f
+
+    def unique_constraint(
+        self,
+        name: Optional[sqla_compat._ConstraintNameDefined],
+        source: str,
+        local_cols: Sequence[str],
+        schema: Optional[str] = None,
+        **kw,
+    ) -> UniqueConstraint:
+        t = sa_schema.Table(
+            source,
+            self.metadata(),
+            *[sa_schema.Column(n, NULLTYPE) for n in local_cols],
+            schema=schema,
+        )
+        kw["name"] = name
+        uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw)
+        # TODO: need event tests to ensure the event
+        # is fired off here
+        t.append_constraint(uq)
+        return uq
+
+    def check_constraint(
+        self,
+        name: Optional[sqla_compat._ConstraintNameDefined],
+        source: str,
+        condition: Union[str, TextClause, ColumnElement[Any]],
+        schema: Optional[str] = None,
+        **kw,
+    ) -> Union[CheckConstraint]:
+        t = sa_schema.Table(
+            source,
+            self.metadata(),
+            sa_schema.Column("x", Integer),
+            schema=schema,
+        )
+        ck = sa_schema.CheckConstraint(condition, name=name, **kw)
+        t.append_constraint(ck)
+        return ck
+
+    def generic_constraint(
+        self,
+        name: Optional[sqla_compat._ConstraintNameDefined],
+        table_name: str,
+        type_: Optional[str],
+        schema: Optional[str] = None,
+        **kw,
+    ) -> Any:
+        t = self.table(table_name, schema=schema)
+        types: Dict[Optional[str], Any] = {
+            "foreignkey": lambda name: sa_schema.ForeignKeyConstraint(
+                [], [], name=name
+            ),
+            "primary": sa_schema.PrimaryKeyConstraint,
+            "unique": sa_schema.UniqueConstraint,
+            "check": lambda name: sa_schema.CheckConstraint("", name=name),
+            None: sa_schema.Constraint,
+        }
+        try:
+            const = types[type_]
+        except KeyError as ke:
+            raise TypeError(
+                "'type' can be one of %s"
+                % ", ".join(sorted(repr(x) for x in types))
+            ) from ke
+        else:
+            const = const(name=name)
+            t.append_constraint(const)
+            return const
+
+    def metadata(self) -> MetaData:
+        kw = {}
+        if (
+            self.migration_context is not None
+            and "target_metadata" in self.migration_context.opts
+        ):
+            mt = self.migration_context.opts["target_metadata"]
+            if hasattr(mt, "naming_convention"):
+                kw["naming_convention"] = mt.naming_convention
+        return sa_schema.MetaData(**kw)
+
+    def table(self, name: str, *columns, **kw) -> Table:
+        m = self.metadata()
+
+        cols = [
+            sqla_compat._copy(c) if c.table is not None else c
+            for c in columns
+            if isinstance(c, Column)
+        ]
+        # these flags have already added their UniqueConstraint /
+        # Index objects to the table, so flip them off here.
+        # SQLAlchemy tometadata() avoids this instead by preserving the
+        # flags and skipping the constraints that have _type_bound on them,
+        # but for a migration we'd rather list out the constraints
+        # explicitly.
+        _constraints_included = kw.pop("_constraints_included", False)
+        if _constraints_included:
+            for c in cols:
+                c.unique = c.index = False
+
+        t = sa_schema.Table(name, m, *cols, **kw)
+
+        constraints = [
+            (
+                sqla_compat._copy(elem, target_table=t)
+                if getattr(elem, "parent", None) is not t
+                and getattr(elem, "parent", None) is not None
+                else elem
+            )
+            for elem in columns
+            if isinstance(elem, (Constraint, Index))
+        ]
+
+        for const in constraints:
+            t.append_constraint(const)
+
+        for f in t.foreign_keys:
+            self._ensure_table_for_fk(m, f)
+        return t
+
+    def column(self, name: str, type_: TypeEngine, **kw) -> Column:
+        return sa_schema.Column(name, type_, **kw)
+
+    def index(
+        self,
+        name: Optional[str],
+        tablename: Optional[str],
+        columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+        schema: Optional[str] = None,
+        **kw,
+    ) -> Index:
+        t = sa_schema.Table(
+            tablename or "no_table",
+            self.metadata(),
+            schema=schema,
+        )
+        kw["_table"] = t
+        idx = sa_schema.Index(
+            name,
+            *[util.sqla_compat._textual_index_column(t, n) for n in columns],
+            **kw,
+        )
+        return idx
+
+    def _parse_table_key(self, table_key: str) -> Tuple[Optional[str], str]:
+        if "." in table_key:
+            tokens = table_key.split(".")
+            sname: Optional[str] = ".".join(tokens[0:-1])
+            tname = tokens[-1]
+        else:
+            tname = table_key
+            sname = None
+        return (sname, tname)
+
+    def _ensure_table_for_fk(self, metadata: MetaData, fk: ForeignKey) -> None:
+        """create a placeholder Table object for the referent of a
+        ForeignKey.
+
+        """
+        if isinstance(fk._colspec, str):
+            table_key, cname = fk._colspec.rsplit(".", 1)
+            sname, tname = self._parse_table_key(table_key)
+            if table_key not in metadata.tables:
+                rel_t = sa_schema.Table(tname, metadata, schema=sname)
+            else:
+                rel_t = metadata.tables[table_key]
+            if cname not in rel_t.c:
+                rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py b/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py
new file mode 100644
index 00000000..528c0542
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py
@@ -0,0 +1,225 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from typing import TYPE_CHECKING
+
+from sqlalchemy import schema as sa_schema
+
+from . import ops
+from .base import Operations
+from ..util.sqla_compat import _copy
+
+if TYPE_CHECKING:
+    from sqlalchemy.sql.schema import Table
+
+
+@Operations.implementation_for(ops.AlterColumnOp)
+def alter_column(
+    operations: "Operations", operation: "ops.AlterColumnOp"
+) -> None:
+    compiler = operations.impl.dialect.statement_compiler(
+        operations.impl.dialect, None
+    )
+
+    existing_type = operation.existing_type
+    existing_nullable = operation.existing_nullable
+    existing_server_default = operation.existing_server_default
+    type_ = operation.modify_type
+    column_name = operation.column_name
+    table_name = operation.table_name
+    schema = operation.schema
+    server_default = operation.modify_server_default
+    new_column_name = operation.modify_name
+    nullable = operation.modify_nullable
+    comment = operation.modify_comment
+    existing_comment = operation.existing_comment
+
+    def _count_constraint(constraint):
+        return not isinstance(constraint, sa_schema.PrimaryKeyConstraint) and (
+            not constraint._create_rule or constraint._create_rule(compiler)
+        )
+
+    if existing_type and type_:
+        t = operations.schema_obj.table(
+            table_name,
+            sa_schema.Column(column_name, existing_type),
+            schema=schema,
+        )
+        for constraint in t.constraints:
+            if _count_constraint(constraint):
+                operations.impl.drop_constraint(constraint)
+
+    operations.impl.alter_column(
+        table_name,
+        column_name,
+        nullable=nullable,
+        server_default=server_default,
+        name=new_column_name,
+        type_=type_,
+        schema=schema,
+        existing_type=existing_type,
+        existing_server_default=existing_server_default,
+        existing_nullable=existing_nullable,
+        comment=comment,
+        existing_comment=existing_comment,
+        **operation.kw,
+    )
+
+    if type_:
+        t = operations.schema_obj.table(
+            table_name,
+            operations.schema_obj.column(column_name, type_),
+            schema=schema,
+        )
+        for constraint in t.constraints:
+            if _count_constraint(constraint):
+                operations.impl.add_constraint(constraint)
+
+
+@Operations.implementation_for(ops.DropTableOp)
+def drop_table(operations: "Operations", operation: "ops.DropTableOp") -> None:
+    kw = {}
+    if operation.if_exists is not None:
+        kw["if_exists"] = operation.if_exists
+    operations.impl.drop_table(
+        operation.to_table(operations.migration_context), **kw
+    )
+
+
+@Operations.implementation_for(ops.DropColumnOp)
+def drop_column(
+    operations: "Operations", operation: "ops.DropColumnOp"
+) -> None:
+    column = operation.to_column(operations.migration_context)
+    operations.impl.drop_column(
+        operation.table_name, column, schema=operation.schema, **operation.kw
+    )
+
+
+@Operations.implementation_for(ops.CreateIndexOp)
+def create_index(
+    operations: "Operations", operation: "ops.CreateIndexOp"
+) -> None:
+    idx = operation.to_index(operations.migration_context)
+    kw = {}
+    if operation.if_not_exists is not None:
+        kw["if_not_exists"] = operation.if_not_exists
+    operations.impl.create_index(idx, **kw)
+
+
+@Operations.implementation_for(ops.DropIndexOp)
+def drop_index(operations: "Operations", operation: "ops.DropIndexOp") -> None:
+    kw = {}
+    if operation.if_exists is not None:
+        kw["if_exists"] = operation.if_exists
+
+    operations.impl.drop_index(
+        operation.to_index(operations.migration_context),
+        **kw,
+    )
+
+
+@Operations.implementation_for(ops.CreateTableOp)
+def create_table(
+    operations: "Operations", operation: "ops.CreateTableOp"
+) -> "Table":
+    kw = {}
+    if operation.if_not_exists is not None:
+        kw["if_not_exists"] = operation.if_not_exists
+    table = operation.to_table(operations.migration_context)
+    operations.impl.create_table(table, **kw)
+    return table
+
+
+@Operations.implementation_for(ops.RenameTableOp)
+def rename_table(
+    operations: "Operations", operation: "ops.RenameTableOp"
+) -> None:
+    operations.impl.rename_table(
+        operation.table_name, operation.new_table_name, schema=operation.schema
+    )
+
+
+@Operations.implementation_for(ops.CreateTableCommentOp)
+def create_table_comment(
+    operations: "Operations", operation: "ops.CreateTableCommentOp"
+) -> None:
+    table = operation.to_table(operations.migration_context)
+    operations.impl.create_table_comment(table)
+
+
+@Operations.implementation_for(ops.DropTableCommentOp)
+def drop_table_comment(
+    operations: "Operations", operation: "ops.DropTableCommentOp"
+) -> None:
+    table = operation.to_table(operations.migration_context)
+    operations.impl.drop_table_comment(table)
+
+
+@Operations.implementation_for(ops.AddColumnOp)
+def add_column(operations: "Operations", operation: "ops.AddColumnOp") -> None:
+    table_name = operation.table_name
+    column = operation.column
+    schema = operation.schema
+    kw = operation.kw
+
+    if column.table is not None:
+        column = _copy(column)
+
+    t = operations.schema_obj.table(table_name, column, schema=schema)
+    operations.impl.add_column(table_name, column, schema=schema, **kw)
+
+    for constraint in t.constraints:
+        if not isinstance(constraint, sa_schema.PrimaryKeyConstraint):
+            operations.impl.add_constraint(constraint)
+    for index in t.indexes:
+        operations.impl.create_index(index)
+
+    with_comment = (
+        operations.impl.dialect.supports_comments
+        and not operations.impl.dialect.inline_comments
+    )
+    comment = column.comment
+    if comment and with_comment:
+        operations.impl.create_column_comment(column)
+
+
+@Operations.implementation_for(ops.AddConstraintOp)
+def create_constraint(
+    operations: "Operations", operation: "ops.AddConstraintOp"
+) -> None:
+    operations.impl.add_constraint(
+        operation.to_constraint(operations.migration_context)
+    )
+
+
+@Operations.implementation_for(ops.DropConstraintOp)
+def drop_constraint(
+    operations: "Operations", operation: "ops.DropConstraintOp"
+) -> None:
+    operations.impl.drop_constraint(
+        operations.schema_obj.generic_constraint(
+            operation.constraint_name,
+            operation.table_name,
+            operation.constraint_type,
+            schema=operation.schema,
+        )
+    )
+
+
+@Operations.implementation_for(ops.BulkInsertOp)
+def bulk_insert(
+    operations: "Operations", operation: "ops.BulkInsertOp"
+) -> None:
+    operations.impl.bulk_insert(  # type: ignore[union-attr]
+        operation.table, operation.rows, multiinsert=operation.multiinsert
+    )
+
+
+@Operations.implementation_for(ops.ExecuteSQLOp)
+def execute_sql(
+    operations: "Operations", operation: "ops.ExecuteSQLOp"
+) -> None:
+    operations.migration_context.impl.execute(
+        operation.sqltext, execution_options=operation.execution_options
+    )