aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/alembic/operations
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/operations')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/__init__.py15
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/base.py1906
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/batch.py718
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/ops.py2799
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py290
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py225
6 files changed, 5953 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py b/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py
new file mode 100644
index 00000000..26197cbe
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/__init__.py
@@ -0,0 +1,15 @@
+from . import toimpl
+from .base import AbstractOperations
+from .base import BatchOperations
+from .base import Operations
+from .ops import MigrateOperation
+from .ops import MigrationScript
+
+
+__all__ = [
+ "AbstractOperations",
+ "Operations",
+ "BatchOperations",
+ "MigrateOperation",
+ "MigrationScript",
+]
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/base.py b/.venv/lib/python3.12/site-packages/alembic/operations/base.py
new file mode 100644
index 00000000..456d1c75
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/base.py
@@ -0,0 +1,1906 @@
+# mypy: allow-untyped-calls
+
+from __future__ import annotations
+
+from contextlib import contextmanager
+import re
+import textwrap
+from typing import Any
+from typing import Awaitable
+from typing import Callable
+from typing import Dict
+from typing import Iterator
+from typing import List # noqa
+from typing import Mapping
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence # noqa
+from typing import Tuple
+from typing import Type # noqa
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy.sql.elements import conv
+
+from . import batch
+from . import schemaobj
+from .. import util
+from ..util import sqla_compat
+from ..util.compat import formatannotation_fwdref
+from ..util.compat import inspect_formatargspec
+from ..util.compat import inspect_getfullargspec
+from ..util.sqla_compat import _literal_bindparam
+
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy import Table
+ from sqlalchemy.engine import Connection
+ from sqlalchemy.sql import Executable
+ from sqlalchemy.sql.expression import ColumnElement
+ from sqlalchemy.sql.expression import TableClause
+ from sqlalchemy.sql.expression import TextClause
+ from sqlalchemy.sql.schema import Column
+ from sqlalchemy.sql.schema import Computed
+ from sqlalchemy.sql.schema import Identity
+ from sqlalchemy.sql.schema import SchemaItem
+ from sqlalchemy.types import TypeEngine
+
+ from .batch import BatchOperationsImpl
+ from .ops import AddColumnOp
+ from .ops import AddConstraintOp
+ from .ops import AlterColumnOp
+ from .ops import AlterTableOp
+ from .ops import BulkInsertOp
+ from .ops import CreateIndexOp
+ from .ops import CreateTableCommentOp
+ from .ops import CreateTableOp
+ from .ops import DropColumnOp
+ from .ops import DropConstraintOp
+ from .ops import DropIndexOp
+ from .ops import DropTableCommentOp
+ from .ops import DropTableOp
+ from .ops import ExecuteSQLOp
+ from .ops import MigrateOperation
+ from ..ddl import DefaultImpl
+ from ..runtime.migration import MigrationContext
+__all__ = ("Operations", "BatchOperations")
+_T = TypeVar("_T")
+
+_C = TypeVar("_C", bound=Callable[..., Any])
+
+
+class AbstractOperations(util.ModuleClsProxy):
+ """Base class for Operations and BatchOperations.
+
+ .. versionadded:: 1.11.0
+
+ """
+
+ impl: Union[DefaultImpl, BatchOperationsImpl]
+ _to_impl = util.Dispatcher()
+
+ def __init__(
+ self,
+ migration_context: MigrationContext,
+ impl: Optional[BatchOperationsImpl] = None,
+ ) -> None:
+ """Construct a new :class:`.Operations`
+
+ :param migration_context: a :class:`.MigrationContext`
+ instance.
+
+ """
+ self.migration_context = migration_context
+ if impl is None:
+ self.impl = migration_context.impl
+ else:
+ self.impl = impl
+
+ self.schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ @classmethod
+ def register_operation(
+ cls, name: str, sourcename: Optional[str] = None
+ ) -> Callable[[Type[_T]], Type[_T]]:
+ """Register a new operation for this class.
+
+ This method is normally used to add new operations
+ to the :class:`.Operations` class, and possibly the
+ :class:`.BatchOperations` class as well. All Alembic migration
+ operations are implemented via this system, however the system
+ is also available as a public API to facilitate adding custom
+ operations.
+
+ .. seealso::
+
+ :ref:`operation_plugins`
+
+
+ """
+
+ def register(op_cls: Type[_T]) -> Type[_T]:
+ if sourcename is None:
+ fn = getattr(op_cls, name)
+ source_name = fn.__name__
+ else:
+ fn = getattr(op_cls, sourcename)
+ source_name = fn.__name__
+
+ spec = inspect_getfullargspec(fn)
+
+ name_args = spec[0]
+ assert name_args[0:2] == ["cls", "operations"]
+
+ name_args[0:2] = ["self"]
+
+ args = inspect_formatargspec(
+ *spec, formatannotation=formatannotation_fwdref
+ )
+ num_defaults = len(spec[3]) if spec[3] else 0
+
+ defaulted_vals: Tuple[Any, ...]
+
+ if num_defaults:
+ defaulted_vals = tuple(name_args[0 - num_defaults :])
+ else:
+ defaulted_vals = ()
+
+ defaulted_vals += tuple(spec[4])
+ # here, we are using formatargspec in a different way in order
+ # to get a string that will re-apply incoming arguments to a new
+ # function call
+
+ apply_kw = inspect_formatargspec(
+ name_args + spec[4],
+ spec[1],
+ spec[2],
+ defaulted_vals,
+ formatvalue=lambda x: "=" + x,
+ formatannotation=formatannotation_fwdref,
+ )
+
+ args = re.sub(
+ r'[_]?ForwardRef\(([\'"].+?[\'"])\)',
+ lambda m: m.group(1),
+ args,
+ )
+
+ func_text = textwrap.dedent(
+ """\
+ def %(name)s%(args)s:
+ %(doc)r
+ return op_cls.%(source_name)s%(apply_kw)s
+ """
+ % {
+ "name": name,
+ "source_name": source_name,
+ "args": args,
+ "apply_kw": apply_kw,
+ "doc": fn.__doc__,
+ }
+ )
+
+ globals_ = dict(globals())
+ globals_.update({"op_cls": op_cls})
+ lcl: Dict[str, Any] = {}
+
+ exec(func_text, globals_, lcl)
+ setattr(cls, name, lcl[name])
+ fn.__func__.__doc__ = (
+ "This method is proxied on "
+ "the :class:`.%s` class, via the :meth:`.%s.%s` method."
+ % (cls.__name__, cls.__name__, name)
+ )
+ if hasattr(fn, "_legacy_translations"):
+ lcl[name]._legacy_translations = fn._legacy_translations
+ return op_cls
+
+ return register
+
+ @classmethod
+ def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
+ """Register an implementation for a given :class:`.MigrateOperation`.
+
+ This is part of the operation extensibility API.
+
+ .. seealso::
+
+ :ref:`operation_plugins` - example of use
+
+ """
+
+ def decorate(fn: _C) -> _C:
+ cls._to_impl.dispatch_for(op_cls)(fn)
+ return fn
+
+ return decorate
+
+ @classmethod
+ @contextmanager
+ def context(
+ cls, migration_context: MigrationContext
+ ) -> Iterator[Operations]:
+ op = Operations(migration_context)
+ op._install_proxy()
+ yield op
+ op._remove_proxy()
+
+ @contextmanager
+ def batch_alter_table(
+ self,
+ table_name: str,
+ schema: Optional[str] = None,
+ recreate: Literal["auto", "always", "never"] = "auto",
+ partial_reordering: Optional[Tuple[Any, ...]] = None,
+ copy_from: Optional[Table] = None,
+ table_args: Tuple[Any, ...] = (),
+ table_kwargs: Mapping[str, Any] = util.immutabledict(),
+ reflect_args: Tuple[Any, ...] = (),
+ reflect_kwargs: Mapping[str, Any] = util.immutabledict(),
+ naming_convention: Optional[Dict[str, str]] = None,
+ ) -> Iterator[BatchOperations]:
+ """Invoke a series of per-table migrations in batch.
+
+ Batch mode allows a series of operations specific to a table
+ to be syntactically grouped together, and allows for alternate
+ modes of table migration, in particular the "recreate" style of
+ migration required by SQLite.
+
+ "recreate" style is as follows:
+
+ 1. A new table is created with the new specification, based on the
+ migration directives within the batch, using a temporary name.
+
+ 2. the data copied from the existing table to the new table.
+
+ 3. the existing table is dropped.
+
+ 4. the new table is renamed to the existing table name.
+
+ The directive by default will only use "recreate" style on the
+ SQLite backend, and only if directives are present which require
+ this form, e.g. anything other than ``add_column()``. The batch
+ operation on other backends will proceed using standard ALTER TABLE
+ operations.
+
+ The method is used as a context manager, which returns an instance
+ of :class:`.BatchOperations`; this object is the same as
+ :class:`.Operations` except that table names and schema names
+ are omitted. E.g.::
+
+ with op.batch_alter_table("some_table") as batch_op:
+ batch_op.add_column(Column("foo", Integer))
+ batch_op.drop_column("bar")
+
+ The operations within the context manager are invoked at once
+ when the context is ended. When run against SQLite, if the
+ migrations include operations not supported by SQLite's ALTER TABLE,
+ the entire table will be copied to a new one with the new
+ specification, moving all data across as well.
+
+ The copy operation by default uses reflection to retrieve the current
+ structure of the table, and therefore :meth:`.batch_alter_table`
+ in this mode requires that the migration is run in "online" mode.
+ The ``copy_from`` parameter may be passed which refers to an existing
+ :class:`.Table` object, which will bypass this reflection step.
+
+ .. note:: The table copy operation will currently not copy
+ CHECK constraints, and may not copy UNIQUE constraints that are
+ unnamed, as is possible on SQLite. See the section
+ :ref:`sqlite_batch_constraints` for workarounds.
+
+ :param table_name: name of table
+ :param schema: optional schema name.
+ :param recreate: under what circumstances the table should be
+ recreated. At its default of ``"auto"``, the SQLite dialect will
+ recreate the table if any operations other than ``add_column()``,
+ ``create_index()``, or ``drop_index()`` are
+ present. Other options include ``"always"`` and ``"never"``.
+ :param copy_from: optional :class:`~sqlalchemy.schema.Table` object
+ that will act as the structure of the table being copied. If omitted,
+ table reflection is used to retrieve the structure of the table.
+
+ .. seealso::
+
+ :ref:`batch_offline_mode`
+
+ :paramref:`~.Operations.batch_alter_table.reflect_args`
+
+ :paramref:`~.Operations.batch_alter_table.reflect_kwargs`
+
+ :param reflect_args: a sequence of additional positional arguments that
+ will be applied to the table structure being reflected / copied;
+ this may be used to pass column and constraint overrides to the
+ table that will be reflected, in lieu of passing the whole
+ :class:`~sqlalchemy.schema.Table` using
+ :paramref:`~.Operations.batch_alter_table.copy_from`.
+ :param reflect_kwargs: a dictionary of additional keyword arguments
+ that will be applied to the table structure being copied; this may be
+ used to pass additional table and reflection options to the table that
+ will be reflected, in lieu of passing the whole
+ :class:`~sqlalchemy.schema.Table` using
+ :paramref:`~.Operations.batch_alter_table.copy_from`.
+ :param table_args: a sequence of additional positional arguments that
+ will be applied to the new :class:`~sqlalchemy.schema.Table` when
+ created, in addition to those copied from the source table.
+ This may be used to provide additional constraints such as CHECK
+ constraints that may not be reflected.
+ :param table_kwargs: a dictionary of additional keyword arguments
+ that will be applied to the new :class:`~sqlalchemy.schema.Table`
+ when created, in addition to those copied from the source table.
+ This may be used to provide for additional table options that may
+ not be reflected.
+ :param naming_convention: a naming convention dictionary of the form
+ described at :ref:`autogen_naming_conventions` which will be applied
+ to the :class:`~sqlalchemy.schema.MetaData` during the reflection
+ process. This is typically required if one wants to drop SQLite
+ constraints, as these constraints will not have names when
+ reflected on this backend. Requires SQLAlchemy **0.9.4** or greater.
+
+ .. seealso::
+
+ :ref:`dropping_sqlite_foreign_keys`
+
+ :param partial_reordering: a list of tuples, each suggesting a desired
+ ordering of two or more columns in the newly created table. Requires
+ that :paramref:`.batch_alter_table.recreate` is set to ``"always"``.
+ Examples, given a table with columns "a", "b", "c", and "d":
+
+ Specify the order of all columns::
+
+ with op.batch_alter_table(
+ "some_table",
+ recreate="always",
+ partial_reordering=[("c", "d", "a", "b")],
+ ) as batch_op:
+ pass
+
+ Ensure "d" appears before "c", and "b", appears before "a"::
+
+ with op.batch_alter_table(
+ "some_table",
+ recreate="always",
+ partial_reordering=[("d", "c"), ("b", "a")],
+ ) as batch_op:
+ pass
+
+ The ordering of columns not included in the partial_reordering
+ set is undefined. Therefore it is best to specify the complete
+ ordering of all columns for best results.
+
+ .. note:: batch mode requires SQLAlchemy 0.8 or above.
+
+ .. seealso::
+
+ :ref:`batch_migrations`
+
+ """
+ impl = batch.BatchOperationsImpl(
+ self,
+ table_name,
+ schema,
+ recreate,
+ copy_from,
+ table_args,
+ table_kwargs,
+ reflect_args,
+ reflect_kwargs,
+ naming_convention,
+ partial_reordering,
+ )
+ batch_op = BatchOperations(self.migration_context, impl=impl)
+ yield batch_op
+ impl.flush()
+
+ def get_context(self) -> MigrationContext:
+ """Return the :class:`.MigrationContext` object that's
+ currently in use.
+
+ """
+
+ return self.migration_context
+
+ @overload
+ def invoke(self, operation: CreateTableOp) -> Table: ...
+
+ @overload
+ def invoke(
+ self,
+ operation: Union[
+ AddConstraintOp,
+ DropConstraintOp,
+ CreateIndexOp,
+ DropIndexOp,
+ AddColumnOp,
+ AlterColumnOp,
+ AlterTableOp,
+ CreateTableCommentOp,
+ DropTableCommentOp,
+ DropColumnOp,
+ BulkInsertOp,
+ DropTableOp,
+ ExecuteSQLOp,
+ ],
+ ) -> None: ...
+
+ @overload
+ def invoke(self, operation: MigrateOperation) -> Any: ...
+
+ def invoke(self, operation: MigrateOperation) -> Any:
+ """Given a :class:`.MigrateOperation`, invoke it in terms of
+ this :class:`.Operations` instance.
+
+ """
+ fn = self._to_impl.dispatch(
+ operation, self.migration_context.impl.__dialect__
+ )
+ return fn(self, operation)
+
+ def f(self, name: str) -> conv:
+ """Indicate a string name that has already had a naming convention
+ applied to it.
+
+ This feature combines with the SQLAlchemy ``naming_convention`` feature
+ to disambiguate constraint names that have already had naming
+ conventions applied to them, versus those that have not. This is
+ necessary in the case that the ``"%(constraint_name)s"`` token
+ is used within a naming convention, so that it can be identified
+ that this particular name should remain fixed.
+
+ If the :meth:`.Operations.f` is used on a constraint, the naming
+ convention will not take effect::
+
+ op.add_column("t", "x", Boolean(name=op.f("ck_bool_t_x")))
+
+ Above, the CHECK constraint generated will have the name
+ ``ck_bool_t_x`` regardless of whether or not a naming convention is
+ in use.
+
+ Alternatively, if a naming convention is in use, and 'f' is not used,
+ names will be converted along conventions. If the ``target_metadata``
+ contains the naming convention
+ ``{"ck": "ck_bool_%(table_name)s_%(constraint_name)s"}``, then the
+ output of the following:
+
+ op.add_column("t", "x", Boolean(name="x"))
+
+ will be::
+
+ CONSTRAINT ck_bool_t_x CHECK (x in (1, 0)))
+
+ The function is rendered in the output of autogenerate when
+ a particular constraint name is already converted.
+
+ """
+ return conv(name)
+
+ def inline_literal(
+ self, value: Union[str, int], type_: Optional[TypeEngine[Any]] = None
+ ) -> _literal_bindparam:
+ r"""Produce an 'inline literal' expression, suitable for
+ using in an INSERT, UPDATE, or DELETE statement.
+
+ When using Alembic in "offline" mode, CRUD operations
+ aren't compatible with SQLAlchemy's default behavior surrounding
+ literal values,
+ which is that they are converted into bound values and passed
+ separately into the ``execute()`` method of the DBAPI cursor.
+ An offline SQL
+ script needs to have these rendered inline. While it should
+ always be noted that inline literal values are an **enormous**
+ security hole in an application that handles untrusted input,
+ a schema migration is not run in this context, so
+ literals are safe to render inline, with the caveat that
+ advanced types like dates may not be supported directly
+ by SQLAlchemy.
+
+ See :meth:`.Operations.execute` for an example usage of
+ :meth:`.Operations.inline_literal`.
+
+ The environment can also be configured to attempt to render
+ "literal" values inline automatically, for those simple types
+ that are supported by the dialect; see
+ :paramref:`.EnvironmentContext.configure.literal_binds` for this
+ more recently added feature.
+
+ :param value: The value to render. Strings, integers, and simple
+ numerics should be supported. Other types like boolean,
+ dates, etc. may or may not be supported yet by various
+ backends.
+ :param type\_: optional - a :class:`sqlalchemy.types.TypeEngine`
+ subclass stating the type of this value. In SQLAlchemy
+ expressions, this is usually derived automatically
+ from the Python type of the value itself, as well as
+ based on the context in which the value is used.
+
+ .. seealso::
+
+ :paramref:`.EnvironmentContext.configure.literal_binds`
+
+ """
+ return sqla_compat._literal_bindparam(None, value, type_=type_)
+
+ def get_bind(self) -> Connection:
+ """Return the current 'bind'.
+
+ Under normal circumstances, this is the
+ :class:`~sqlalchemy.engine.Connection` currently being used
+ to emit SQL to the database.
+
+ In a SQL script context, this value is ``None``. [TODO: verify this]
+
+ """
+ return self.migration_context.impl.bind # type: ignore[return-value]
+
+ def run_async(
+ self,
+ async_function: Callable[..., Awaitable[_T]],
+ *args: Any,
+ **kw_args: Any,
+ ) -> _T:
+ """Invoke the given asynchronous callable, passing an asynchronous
+ :class:`~sqlalchemy.ext.asyncio.AsyncConnection` as the first
+ argument.
+
+ This method allows calling async functions from within the
+ synchronous ``upgrade()`` or ``downgrade()`` alembic migration
+ method.
+
+ The async connection passed to the callable shares the same
+ transaction as the connection running in the migration context.
+
+ Any additional arg or kw_arg passed to this function are passed
+ to the provided async function.
+
+ .. versionadded: 1.11
+
+ .. note::
+
+ This method can be called only when alembic is called using
+ an async dialect.
+ """
+ if not sqla_compat.sqla_14_18:
+ raise NotImplementedError("SQLAlchemy 1.4.18+ required")
+ sync_conn = self.get_bind()
+ if sync_conn is None:
+ raise NotImplementedError("Cannot call run_async in SQL mode")
+ if not sync_conn.dialect.is_async:
+ raise ValueError("Cannot call run_async with a sync engine")
+ from sqlalchemy.ext.asyncio import AsyncConnection
+ from sqlalchemy.util import await_only
+
+ async_conn = AsyncConnection._retrieve_proxy_for_target(sync_conn)
+ return await_only(async_function(async_conn, *args, **kw_args))
+
+
+class Operations(AbstractOperations):
+ """Define high level migration operations.
+
+ Each operation corresponds to some schema migration operation,
+ executed against a particular :class:`.MigrationContext`
+ which in turn represents connectivity to a database,
+ or a file output stream.
+
+ While :class:`.Operations` is normally configured as
+ part of the :meth:`.EnvironmentContext.run_migrations`
+ method called from an ``env.py`` script, a standalone
+ :class:`.Operations` instance can be
+ made for use cases external to regular Alembic
+ migrations by passing in a :class:`.MigrationContext`::
+
+ from alembic.migration import MigrationContext
+ from alembic.operations import Operations
+
+ conn = myengine.connect()
+ ctx = MigrationContext.configure(conn)
+ op = Operations(ctx)
+
+ op.alter_column("t", "c", nullable=True)
+
+ Note that as of 0.8, most of the methods on this class are produced
+ dynamically using the :meth:`.Operations.register_operation`
+ method.
+
+ """
+
+ if TYPE_CHECKING:
+ # START STUB FUNCTIONS: op_cls
+ # ### the following stubs are generated by tools/write_pyi.py ###
+ # ### do not edit ###
+
+ def add_column(
+ self,
+ table_name: str,
+ column: Column[Any],
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue an "add column" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+ from sqlalchemy import Column, String
+
+ op.add_column("organization", Column("name", String()))
+
+ The :meth:`.Operations.add_column` method typically corresponds
+ to the SQL command "ALTER TABLE... ADD COLUMN". Within the scope
+ of this command, the column's name, datatype, nullability,
+ and optional server-generated defaults may be indicated.
+
+ .. note::
+
+ With the exception of NOT NULL constraints or single-column FOREIGN
+ KEY constraints, other kinds of constraints such as PRIMARY KEY,
+ UNIQUE or CHECK constraints **cannot** be generated using this
+ method; for these constraints, refer to operations such as
+ :meth:`.Operations.create_primary_key` and
+ :meth:`.Operations.create_check_constraint`. In particular, the
+ following :class:`~sqlalchemy.schema.Column` parameters are
+ **ignored**:
+
+ * :paramref:`~sqlalchemy.schema.Column.primary_key` - SQL databases
+ typically do not support an ALTER operation that can add
+ individual columns one at a time to an existing primary key
+ constraint, therefore it's less ambiguous to use the
+ :meth:`.Operations.create_primary_key` method, which assumes no
+ existing primary key constraint is present.
+ * :paramref:`~sqlalchemy.schema.Column.unique` - use the
+ :meth:`.Operations.create_unique_constraint` method
+ * :paramref:`~sqlalchemy.schema.Column.index` - use the
+ :meth:`.Operations.create_index` method
+
+
+ The provided :class:`~sqlalchemy.schema.Column` object may include a
+ :class:`~sqlalchemy.schema.ForeignKey` constraint directive,
+ referencing a remote table name. For this specific type of constraint,
+ Alembic will automatically emit a second ALTER statement in order to
+ add the single-column FOREIGN KEY constraint separately::
+
+ from alembic import op
+ from sqlalchemy import Column, INTEGER, ForeignKey
+
+ op.add_column(
+ "organization",
+ Column("account_id", INTEGER, ForeignKey("accounts.id")),
+ )
+
+ The column argument passed to :meth:`.Operations.add_column` is a
+ :class:`~sqlalchemy.schema.Column` construct, used in the same way it's
+ used in SQLAlchemy. In particular, values or functions to be indicated
+ as producing the column's default value on the database side are
+ specified using the ``server_default`` parameter, and not ``default``
+ which only specifies Python-side defaults::
+
+ from alembic import op
+ from sqlalchemy import Column, TIMESTAMP, func
+
+ # specify "DEFAULT NOW" along with the column add
+ op.add_column(
+ "account",
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ :param table_name: String name of the parent table.
+ :param column: a :class:`sqlalchemy.schema.Column` object
+ representing the new column.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ def alter_column(
+ self,
+ table_name: str,
+ column_name: str,
+ *,
+ nullable: Optional[bool] = None,
+ comment: Union[str, Literal[False], None] = False,
+ server_default: Any = False,
+ new_column_name: Optional[str] = None,
+ type_: Union[TypeEngine[Any], Type[TypeEngine[Any]], None] = None,
+ existing_type: Union[
+ TypeEngine[Any], Type[TypeEngine[Any]], None
+ ] = None,
+ existing_server_default: Union[
+ str, bool, Identity, Computed, None
+ ] = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue an "alter column" instruction using the
+ current migration context.
+
+ Generally, only that aspect of the column which
+ is being changed, i.e. name, type, nullability,
+ default, needs to be specified. Multiple changes
+ can also be specified at once and the backend should
+ "do the right thing", emitting each change either
+ separately or together as the backend allows.
+
+ MySQL has special requirements here, since MySQL
+ cannot ALTER a column without a full specification.
+ When producing MySQL-compatible migration files,
+ it is recommended that the ``existing_type``,
+ ``existing_server_default``, and ``existing_nullable``
+ parameters be present, if not being altered.
+
+ Type changes which are against the SQLAlchemy
+ "schema" types :class:`~sqlalchemy.types.Boolean`
+ and :class:`~sqlalchemy.types.Enum` may also
+ add or drop constraints which accompany those
+ types on backends that don't support them natively.
+ The ``existing_type`` argument is
+ used in this case to identify and remove a previous
+ constraint that was bound to the type object.
+
+ :param table_name: string name of the target table.
+ :param column_name: string name of the target column,
+ as it exists before the operation begins.
+ :param nullable: Optional; specify ``True`` or ``False``
+ to alter the column's nullability.
+ :param server_default: Optional; specify a string
+ SQL expression, :func:`~sqlalchemy.sql.expression.text`,
+ or :class:`~sqlalchemy.schema.DefaultClause` to indicate
+ an alteration to the column's default value.
+ Set to ``None`` to have the default removed.
+ :param comment: optional string text of a new comment to add to the
+ column.
+ :param new_column_name: Optional; specify a string name here to
+ indicate the new name within a column rename operation.
+ :param type\_: Optional; a :class:`~sqlalchemy.types.TypeEngine`
+ type object to specify a change to the column's type.
+ For SQLAlchemy types that also indicate a constraint (i.e.
+ :class:`~sqlalchemy.types.Boolean`, :class:`~sqlalchemy.types.Enum`),
+ the constraint is also generated.
+ :param autoincrement: set the ``AUTO_INCREMENT`` flag of the column;
+ currently understood by the MySQL dialect.
+ :param existing_type: Optional; a
+ :class:`~sqlalchemy.types.TypeEngine`
+ type object to specify the previous type. This
+ is required for all MySQL column alter operations that
+ don't otherwise specify a new type, as well as for
+ when nullability is being changed on a SQL Server
+ column. It is also used if the type is a so-called
+ SQLAlchemy "schema" type which may define a constraint (i.e.
+ :class:`~sqlalchemy.types.Boolean`,
+ :class:`~sqlalchemy.types.Enum`),
+ so that the constraint can be dropped.
+ :param existing_server_default: Optional; The existing
+ default value of the column. Required on MySQL if
+ an existing default is not being changed; else MySQL
+ removes the default.
+ :param existing_nullable: Optional; the existing nullability
+ of the column. Required on MySQL if the existing nullability
+ is not being changed; else MySQL sets this to NULL.
+ :param existing_autoincrement: Optional; the existing autoincrement
+ of the column. Used for MySQL's system of altering a column
+ that specifies ``AUTO_INCREMENT``.
+ :param existing_comment: string text of the existing comment on the
+ column to be maintained. Required on MySQL if the existing comment
+ on the column is not being changed.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param postgresql_using: String argument which will indicate a
+ SQL expression to render within the Postgresql-specific USING clause
+ within ALTER COLUMN. This string is taken directly as raw SQL which
+ must explicitly include any necessary quoting or escaping of tokens
+ within the expression.
+
+ """ # noqa: E501
+ ...
+
+ def bulk_insert(
+ self,
+ table: Union[Table, TableClause],
+ rows: List[Dict[str, Any]],
+ *,
+ multiinsert: bool = True,
+ ) -> None:
+ """Issue a "bulk insert" operation using the current
+ migration context.
+
+ This provides a means of representing an INSERT of multiple rows
+ which works equally well in the context of executing on a live
+ connection as well as that of generating a SQL script. In the
+ case of a SQL script, the values are rendered inline into the
+ statement.
+
+ e.g.::
+
+ from alembic import op
+ from datetime import date
+ from sqlalchemy.sql import table, column
+ from sqlalchemy import String, Integer, Date
+
+ # Create an ad-hoc table to use for the insert statement.
+ accounts_table = table(
+ "account",
+ column("id", Integer),
+ column("name", String),
+ column("create_date", Date),
+ )
+
+ op.bulk_insert(
+ accounts_table,
+ [
+ {
+ "id": 1,
+ "name": "John Smith",
+ "create_date": date(2010, 10, 5),
+ },
+ {
+ "id": 2,
+ "name": "Ed Williams",
+ "create_date": date(2007, 5, 27),
+ },
+ {
+ "id": 3,
+ "name": "Wendy Jones",
+ "create_date": date(2008, 8, 15),
+ },
+ ],
+ )
+
+ When using --sql mode, some datatypes may not render inline
+ automatically, such as dates and other special types. When this
+ issue is present, :meth:`.Operations.inline_literal` may be used::
+
+ op.bulk_insert(
+ accounts_table,
+ [
+ {
+ "id": 1,
+ "name": "John Smith",
+ "create_date": op.inline_literal("2010-10-05"),
+ },
+ {
+ "id": 2,
+ "name": "Ed Williams",
+ "create_date": op.inline_literal("2007-05-27"),
+ },
+ {
+ "id": 3,
+ "name": "Wendy Jones",
+ "create_date": op.inline_literal("2008-08-15"),
+ },
+ ],
+ multiinsert=False,
+ )
+
+ When using :meth:`.Operations.inline_literal` in conjunction with
+ :meth:`.Operations.bulk_insert`, in order for the statement to work
+ in "online" (e.g. non --sql) mode, the
+ :paramref:`~.Operations.bulk_insert.multiinsert`
+ flag should be set to ``False``, which will have the effect of
+ individual INSERT statements being emitted to the database, each
+ with a distinct VALUES clause, so that the "inline" values can
+ still be rendered, rather than attempting to pass the values
+ as bound parameters.
+
+ :param table: a table object which represents the target of the INSERT.
+
+ :param rows: a list of dictionaries indicating rows.
+
+ :param multiinsert: when at its default of True and --sql mode is not
+ enabled, the INSERT statement will be executed using
+ "executemany()" style, where all elements in the list of
+ dictionaries are passed as bound parameters in a single
+ list. Setting this to False results in individual INSERT
+ statements being emitted per parameter set, and is needed
+ in those cases where non-literal values are present in the
+ parameter sets.
+
+ """ # noqa: E501
+ ...
+
+ def create_check_constraint(
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ condition: Union[str, ColumnElement[bool], TextClause],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue a "create check constraint" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+ from sqlalchemy.sql import column, func
+
+ op.create_check_constraint(
+ "ck_user_name_len",
+ "user",
+ func.len(column("name")) > 5,
+ )
+
+ CHECK constraints are usually against a SQL expression, so ad-hoc
+ table metadata is usually needed. The function will convert the given
+ arguments into a :class:`sqlalchemy.schema.CheckConstraint` bound
+ to an anonymous table in order to emit the CREATE statement.
+
+ :param name: Name of the check constraint. The name is necessary
+ so that an ALTER statement can be emitted. For setups that
+ use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the source table.
+ :param condition: SQL expression that's the condition of the
+ constraint. Can be a string or SQLAlchemy expression language
+ structure.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ def create_exclude_constraint(
+ self,
+ constraint_name: str,
+ table_name: str,
+ *elements: Any,
+ **kw: Any,
+ ) -> Optional[Table]:
+ """Issue an alter to create an EXCLUDE constraint using the
+ current migration context.
+
+ .. note:: This method is Postgresql specific, and additionally
+ requires at least SQLAlchemy 1.0.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_exclude_constraint(
+ "user_excl",
+ "user",
+ ("period", "&&"),
+ ("group", "="),
+ where=("group != 'some group'"),
+ )
+
+ Note that the expressions work the same way as that of
+ the ``ExcludeConstraint`` object itself; if plain strings are
+ passed, quoting rules must be applied manually.
+
+ :param name: Name of the constraint.
+ :param table_name: String name of the source table.
+ :param elements: exclude conditions.
+ :param where: SQL expression or SQL string with optional WHERE
+ clause.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
+ :param schema: Optional schema name to operate within.
+
+ """ # noqa: E501
+ ...
+
+ def create_foreign_key(
+ self,
+ constraint_name: Optional[str],
+ source_table: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ *,
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ source_schema: Optional[str] = None,
+ referent_schema: Optional[str] = None,
+ **dialect_kw: Any,
+ ) -> None:
+ """Issue a "create foreign key" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_foreign_key(
+ "fk_user_address",
+ "address",
+ "user",
+ ["user_id"],
+ ["id"],
+ )
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.ForeignKeyConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param constraint_name: Name of the foreign key constraint. The name
+ is necessary so that an ALTER statement can be emitted. For setups
+ that use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param source_table: String name of the source table.
+ :param referent_table: String name of the destination table.
+ :param local_cols: a list of string column names in the
+ source table.
+ :param remote_cols: a list of string column names in the
+ remote table.
+ :param onupdate: Optional string. If set, emit ON UPDATE <value> when
+ issuing DDL for this constraint. Typical values include CASCADE,
+ DELETE and RESTRICT.
+ :param ondelete: Optional string. If set, emit ON DELETE <value> when
+ issuing DDL for this constraint. Typical values include CASCADE,
+ DELETE and RESTRICT.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or NOT
+ DEFERRABLE when issuing DDL for this constraint.
+ :param source_schema: Optional schema name of the source table.
+ :param referent_schema: Optional schema name of the destination table.
+
+ """ # noqa: E501
+ ...
+
+ def create_index(
+ self,
+ index_name: Optional[str],
+ table_name: str,
+ columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+ *,
+ schema: Optional[str] = None,
+ unique: bool = False,
+ if_not_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "create index" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_index("ik_test", "t1", ["foo", "bar"])
+
+ Functional indexes can be produced by using the
+ :func:`sqlalchemy.sql.expression.text` construct::
+
+ from alembic import op
+ from sqlalchemy import text
+
+ op.create_index("ik_test", "t1", [text("lower(foo)")])
+
+ :param index_name: name of the index.
+ :param table_name: name of the owning table.
+ :param columns: a list consisting of string column names and/or
+ :func:`~sqlalchemy.sql.expression.text` constructs.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param unique: If True, create a unique index.
+
+ :param quote: Force quoting of this column's name on or off,
+ corresponding to ``True`` or ``False``. When left at its default
+ of ``None``, the column identifier will be quoted according to
+ whether the name is case sensitive (identifiers with at least one
+ upper case character are treated as case sensitive), or if it's a
+ reserved word. This flag is only needed to force quoting of a
+ reserved word which is not known by the SQLAlchemy dialect.
+
+ :param if_not_exists: If True, adds IF NOT EXISTS operator when
+ creating the new index.
+
+ .. versionadded:: 1.12.0
+
+ :param \**kw: Additional keyword arguments not mentioned above are
+ dialect specific, and passed in the form
+ ``<dialectname>_<argname>``.
+ See the documentation regarding an individual dialect at
+ :ref:`dialect_toplevel` for detail on documented arguments.
+
+ """ # noqa: E501
+ ...
+
+ def create_primary_key(
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: List[str],
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue a "create primary key" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_primary_key("pk_my_table", "my_table", ["id", "version"])
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.PrimaryKeyConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param constraint_name: Name of the primary key constraint. The name
+ is necessary so that an ALTER statement can be emitted. For setups
+ that use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the target table.
+ :param columns: a list of string column names to be applied to the
+ primary key constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ def create_table(
+ self,
+ table_name: str,
+ *columns: SchemaItem,
+ if_not_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> Table:
+ r"""Issue a "create table" instruction using the current migration
+ context.
+
+ This directive receives an argument list similar to that of the
+ traditional :class:`sqlalchemy.schema.Table` construct, but without the
+ metadata::
+
+ from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+ from alembic import op
+
+ op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("name", VARCHAR(50), nullable=False),
+ Column("description", NVARCHAR(200)),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ Note that :meth:`.create_table` accepts
+ :class:`~sqlalchemy.schema.Column`
+ constructs directly from the SQLAlchemy library. In particular,
+ default values to be created on the database side are
+ specified using the ``server_default`` parameter, and not
+ ``default`` which only specifies Python-side defaults::
+
+ from alembic import op
+ from sqlalchemy import Column, TIMESTAMP, func
+
+ # specify "DEFAULT NOW" along with the "timestamp" column
+ op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ The function also returns a newly created
+ :class:`~sqlalchemy.schema.Table` object, corresponding to the table
+ specification given, which is suitable for
+ immediate SQL operations, in particular
+ :meth:`.Operations.bulk_insert`::
+
+ from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+ from alembic import op
+
+ account_table = op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("name", VARCHAR(50), nullable=False),
+ Column("description", NVARCHAR(200)),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ op.bulk_insert(
+ account_table,
+ [
+ {"name": "A1", "description": "account 1"},
+ {"name": "A2", "description": "account 2"},
+ ],
+ )
+
+ :param table_name: Name of the table
+ :param \*columns: collection of :class:`~sqlalchemy.schema.Column`
+ objects within
+ the table, as well as optional :class:`~sqlalchemy.schema.Constraint`
+ objects
+ and :class:`~.sqlalchemy.schema.Index` objects.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param if_not_exists: If True, adds IF NOT EXISTS operator when
+ creating the new table.
+
+ .. versionadded:: 1.13.3
+ :param \**kw: Other keyword arguments are passed to the underlying
+ :class:`sqlalchemy.schema.Table` object created for the command.
+
+ :return: the :class:`~sqlalchemy.schema.Table` object corresponding
+ to the parameters given.
+
+ """ # noqa: E501
+ ...
+
+ def create_table_comment(
+ self,
+ table_name: str,
+ comment: Optional[str],
+ *,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Emit a COMMENT ON operation to set the comment for a table.
+
+ :param table_name: string name of the target table.
+ :param comment: string value of the comment being registered against
+ the specified table.
+ :param existing_comment: String value of a comment
+ already registered on the specified table, used within autogenerate
+ so that the operation is reversible, but not required for direct
+ use.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_table_comment`
+
+ :paramref:`.Operations.alter_column.comment`
+
+ """ # noqa: E501
+ ...
+
+ def create_unique_constraint(
+ self,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: Sequence[str],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ """Issue a "create unique constraint" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+ op.create_unique_constraint("uq_user_name", "user", ["name"])
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.UniqueConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param name: Name of the unique constraint. The name is necessary
+ so that an ALTER statement can be emitted. For setups that
+ use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the source table.
+ :param columns: a list of string column names in the
+ source table.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ def drop_column(
+ self,
+ table_name: str,
+ column_name: str,
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue a "drop column" instruction using the current
+ migration context.
+
+ e.g.::
+
+ drop_column("organization", "account_id")
+
+ :param table_name: name of table
+ :param column_name: name of column
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param mssql_drop_check: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop the CHECK constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from sys.check_constraints,
+ then exec's a separate DROP CONSTRAINT for that constraint.
+ :param mssql_drop_default: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop the DEFAULT constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from sys.default_constraints,
+ then exec's a separate DROP CONSTRAINT for that default.
+ :param mssql_drop_foreign_key: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop a single FOREIGN KEY constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from
+ sys.foreign_keys/sys.foreign_key_columns,
+ then exec's a separate DROP CONSTRAINT for that default. Only
+ works if the column has exactly one FK constraint which refers to
+ it, at the moment.
+
+ """ # noqa: E501
+ ...
+
+ def drop_constraint(
+ self,
+ constraint_name: str,
+ table_name: str,
+ type_: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ r"""Drop a constraint of the given name, typically via DROP CONSTRAINT.
+
+ :param constraint_name: name of the constraint.
+ :param table_name: table name.
+ :param type\_: optional, required on MySQL. can be
+ 'foreignkey', 'primary', 'unique', or 'check'.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ def drop_index(
+ self,
+ index_name: str,
+ table_name: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "drop index" instruction using the current
+ migration context.
+
+ e.g.::
+
+ drop_index("accounts")
+
+ :param index_name: name of the index.
+ :param table_name: name of the owning table. Some
+ backends such as Microsoft SQL Server require this.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ :param if_exists: If True, adds IF EXISTS operator when
+ dropping the index.
+
+ .. versionadded:: 1.12.0
+
+ :param \**kw: Additional keyword arguments not mentioned above are
+ dialect specific, and passed in the form
+ ``<dialectname>_<argname>``.
+ See the documentation regarding an individual dialect at
+ :ref:`dialect_toplevel` for detail on documented arguments.
+
+ """ # noqa: E501
+ ...
+
+ def drop_table(
+ self,
+ table_name: str,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "drop table" instruction using the current
+ migration context.
+
+
+ e.g.::
+
+ drop_table("accounts")
+
+ :param table_name: Name of the table
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param if_exists: If True, adds IF EXISTS operator when
+ dropping the table.
+
+ .. versionadded:: 1.13.3
+ :param \**kw: Other keyword arguments are passed to the underlying
+ :class:`sqlalchemy.schema.Table` object created for the command.
+
+ """ # noqa: E501
+ ...
+
+ def drop_table_comment(
+ self,
+ table_name: str,
+ *,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue a "drop table comment" operation to
+ remove an existing comment set on a table.
+
+ :param table_name: string name of the target table.
+ :param existing_comment: An optional string value of a comment already
+ registered on the specified table.
+
+ .. seealso::
+
+ :meth:`.Operations.create_table_comment`
+
+ :paramref:`.Operations.alter_column.comment`
+
+ """ # noqa: E501
+ ...
+
+ def execute(
+ self,
+ sqltext: Union[Executable, str],
+ *,
+ execution_options: Optional[dict[str, Any]] = None,
+ ) -> None:
+ r"""Execute the given SQL using the current migration context.
+
+ The given SQL can be a plain string, e.g.::
+
+ op.execute("INSERT INTO table (foo) VALUES ('some value')")
+
+ Or it can be any kind of Core SQL Expression construct, such as
+ below where we use an update construct::
+
+ from sqlalchemy.sql import table, column
+ from sqlalchemy import String
+ from alembic import op
+
+ account = table("account", column("name", String))
+ op.execute(
+ account.update()
+ .where(account.c.name == op.inline_literal("account 1"))
+ .values({"name": op.inline_literal("account 2")})
+ )
+
+ Above, we made use of the SQLAlchemy
+ :func:`sqlalchemy.sql.expression.table` and
+ :func:`sqlalchemy.sql.expression.column` constructs to make a brief,
+ ad-hoc table construct just for our UPDATE statement. A full
+ :class:`~sqlalchemy.schema.Table` construct of course works perfectly
+ fine as well, though note it's a recommended practice to at least
+ ensure the definition of a table is self-contained within the migration
+ script, rather than imported from a module that may break compatibility
+ with older migrations.
+
+ In a SQL script context, the statement is emitted directly to the
+ output stream. There is *no* return result, however, as this
+ function is oriented towards generating a change script
+ that can run in "offline" mode. Additionally, parameterized
+ statements are discouraged here, as they *will not work* in offline
+ mode. Above, we use :meth:`.inline_literal` where parameters are
+ to be used.
+
+ For full interaction with a connected database where parameters can
+ also be used normally, use the "bind" available from the context::
+
+ from alembic import op
+
+ connection = op.get_bind()
+
+ connection.execute(
+ account.update()
+ .where(account.c.name == "account 1")
+ .values({"name": "account 2"})
+ )
+
+ Additionally, when passing the statement as a plain string, it is first
+ coerced into a :func:`sqlalchemy.sql.expression.text` construct
+ before being passed along. In the less likely case that the
+ literal SQL string contains a colon, it must be escaped with a
+ backslash, as::
+
+ op.execute(r"INSERT INTO table (foo) VALUES ('\:colon_value')")
+
+
+ :param sqltext: Any legal SQLAlchemy expression, including:
+
+ * a string
+ * a :func:`sqlalchemy.sql.expression.text` construct.
+ * a :func:`sqlalchemy.sql.expression.insert` construct.
+ * a :func:`sqlalchemy.sql.expression.update` construct.
+ * a :func:`sqlalchemy.sql.expression.delete` construct.
+ * Any "executable" described in SQLAlchemy Core documentation,
+ noting that no result set is returned.
+
+ .. note:: when passing a plain string, the statement is coerced into
+ a :func:`sqlalchemy.sql.expression.text` construct. This construct
+ considers symbols with colons, e.g. ``:foo`` to be bound parameters.
+ To avoid this, ensure that colon symbols are escaped, e.g.
+ ``\:foo``.
+
+ :param execution_options: Optional dictionary of
+ execution options, will be passed to
+ :meth:`sqlalchemy.engine.Connection.execution_options`.
+ """ # noqa: E501
+ ...
+
+ def rename_table(
+ self,
+ old_table_name: str,
+ new_table_name: str,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Emit an ALTER TABLE to rename a table.
+
+ :param old_table_name: old name.
+ :param new_table_name: new name.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """ # noqa: E501
+ ...
+
+ # END STUB FUNCTIONS: op_cls
+
+
+class BatchOperations(AbstractOperations):
+ """Modifies the interface :class:`.Operations` for batch mode.
+
+ This basically omits the ``table_name`` and ``schema`` parameters
+ from associated methods, as these are a given when running under batch
+ mode.
+
+ .. seealso::
+
+ :meth:`.Operations.batch_alter_table`
+
+ Note that as of 0.8, most of the methods on this class are produced
+ dynamically using the :meth:`.Operations.register_operation`
+ method.
+
+ """
+
+ impl: BatchOperationsImpl
+
+ def _noop(self, operation: Any) -> NoReturn:
+ raise NotImplementedError(
+ "The %s method does not apply to a batch table alter operation."
+ % operation
+ )
+
+ if TYPE_CHECKING:
+ # START STUB FUNCTIONS: batch_op
+ # ### the following stubs are generated by tools/write_pyi.py ###
+ # ### do not edit ###
+
+ def add_column(
+ self,
+ column: Column[Any],
+ *,
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ ) -> None:
+ """Issue an "add column" instruction using the current
+ batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.add_column`
+
+ """ # noqa: E501
+ ...
+
+ def alter_column(
+ self,
+ column_name: str,
+ *,
+ nullable: Optional[bool] = None,
+ comment: Union[str, Literal[False], None] = False,
+ server_default: Any = False,
+ new_column_name: Optional[str] = None,
+ type_: Union[TypeEngine[Any], Type[TypeEngine[Any]], None] = None,
+ existing_type: Union[
+ TypeEngine[Any], Type[TypeEngine[Any]], None
+ ] = None,
+ existing_server_default: Union[
+ str, bool, Identity, Computed, None
+ ] = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue an "alter column" instruction using the current
+ batch migration context.
+
+ Parameters are the same as that of :meth:`.Operations.alter_column`,
+ as well as the following option(s):
+
+ :param insert_before: String name of an existing column which this
+ column should be placed before, when creating the new table.
+
+ :param insert_after: String name of an existing column which this
+ column should be placed after, when creating the new table. If
+ both :paramref:`.BatchOperations.alter_column.insert_before`
+ and :paramref:`.BatchOperations.alter_column.insert_after` are
+ omitted, the column is inserted after the last existing column
+ in the table.
+
+ .. seealso::
+
+ :meth:`.Operations.alter_column`
+
+
+ """ # noqa: E501
+ ...
+
+ def create_check_constraint(
+ self,
+ constraint_name: str,
+ condition: Union[str, ColumnElement[bool], TextClause],
+ **kw: Any,
+ ) -> None:
+ """Issue a "create check constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_check_constraint`
+
+ """ # noqa: E501
+ ...
+
+ def create_exclude_constraint(
+ self, constraint_name: str, *elements: Any, **kw: Any
+ ) -> Optional[Table]:
+ """Issue a "create exclude constraint" instruction using the
+ current batch migration context.
+
+ .. note:: This method is Postgresql specific, and additionally
+ requires at least SQLAlchemy 1.0.
+
+ .. seealso::
+
+ :meth:`.Operations.create_exclude_constraint`
+
+ """ # noqa: E501
+ ...
+
+ def create_foreign_key(
+ self,
+ constraint_name: Optional[str],
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ *,
+ referent_schema: Optional[str] = None,
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ **dialect_kw: Any,
+ ) -> None:
+ """Issue a "create foreign key" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``source_schema``
+ arguments from the call.
+
+ e.g.::
+
+ with batch_alter_table("address") as batch_op:
+ batch_op.create_foreign_key(
+ "fk_user_address",
+ "user",
+ ["user_id"],
+ ["id"],
+ )
+
+ .. seealso::
+
+ :meth:`.Operations.create_foreign_key`
+
+ """ # noqa: E501
+ ...
+
+ def create_index(
+ self, index_name: str, columns: List[str], **kw: Any
+ ) -> None:
+ """Issue a "create index" instruction using the
+ current batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.create_index`
+
+ """ # noqa: E501
+ ...
+
+ def create_primary_key(
+ self, constraint_name: Optional[str], columns: List[str]
+ ) -> None:
+ """Issue a "create primary key" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``table_name`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_primary_key`
+
+ """ # noqa: E501
+ ...
+
+ def create_table_comment(
+ self,
+ comment: Optional[str],
+ *,
+ existing_comment: Optional[str] = None,
+ ) -> None:
+ """Emit a COMMENT ON operation to set the comment for a table
+ using the current batch migration context.
+
+ :param comment: string value of the comment being registered against
+ the specified table.
+ :param existing_comment: String value of a comment
+ already registered on the specified table, used within autogenerate
+ so that the operation is reversible, but not required for direct
+ use.
+
+ """ # noqa: E501
+ ...
+
+ def create_unique_constraint(
+ self, constraint_name: str, columns: Sequence[str], **kw: Any
+ ) -> Any:
+ """Issue a "create unique constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_unique_constraint`
+
+ """ # noqa: E501
+ ...
+
+ def drop_column(self, column_name: str, **kw: Any) -> None:
+ """Issue a "drop column" instruction using the current
+ batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_column`
+
+ """ # noqa: E501
+ ...
+
+ def drop_constraint(
+ self, constraint_name: str, type_: Optional[str] = None
+ ) -> None:
+ """Issue a "drop constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``table_name`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_constraint`
+
+ """ # noqa: E501
+ ...
+
+ def drop_index(self, index_name: str, **kw: Any) -> None:
+ """Issue a "drop index" instruction using the
+ current batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_index`
+
+ """ # noqa: E501
+ ...
+
+ def drop_table_comment(
+ self, *, existing_comment: Optional[str] = None
+ ) -> None:
+ """Issue a "drop table comment" operation to
+ remove an existing comment set on a table using the current
+ batch operations context.
+
+ :param existing_comment: An optional string value of a comment already
+ registered on the specified table.
+
+ """ # noqa: E501
+ ...
+
+ def execute(
+ self,
+ sqltext: Union[Executable, str],
+ *,
+ execution_options: Optional[dict[str, Any]] = None,
+ ) -> None:
+ """Execute the given SQL using the current migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.execute`
+
+ """ # noqa: E501
+ ...
+
+ # END STUB FUNCTIONS: batch_op
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/batch.py b/.venv/lib/python3.12/site-packages/alembic/operations/batch.py
new file mode 100644
index 00000000..fe183e9c
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/batch.py
@@ -0,0 +1,718 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import CheckConstraint
+from sqlalchemy import Column
+from sqlalchemy import ForeignKeyConstraint
+from sqlalchemy import Index
+from sqlalchemy import MetaData
+from sqlalchemy import PrimaryKeyConstraint
+from sqlalchemy import schema as sql_schema
+from sqlalchemy import select
+from sqlalchemy import Table
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.schema import SchemaEventTarget
+from sqlalchemy.util import OrderedDict
+from sqlalchemy.util import topological
+
+from ..util import exc
+from ..util.sqla_compat import _columns_for_constraint
+from ..util.sqla_compat import _copy
+from ..util.sqla_compat import _copy_expression
+from ..util.sqla_compat import _ensure_scope_for_ddl
+from ..util.sqla_compat import _fk_is_self_referential
+from ..util.sqla_compat import _idx_table_bound_expressions
+from ..util.sqla_compat import _is_type_bound
+from ..util.sqla_compat import _remove_column_from_collection
+from ..util.sqla_compat import _resolve_for_variant
+from ..util.sqla_compat import constraint_name_defined
+from ..util.sqla_compat import constraint_name_string
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy.engine import Dialect
+ from sqlalchemy.sql.elements import ColumnClause
+ from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.functions import Function
+ from sqlalchemy.sql.schema import Constraint
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..ddl.impl import DefaultImpl
+
+
+class BatchOperationsImpl:
+ def __init__(
+ self,
+ operations,
+ table_name,
+ schema,
+ recreate,
+ copy_from,
+ table_args,
+ table_kwargs,
+ reflect_args,
+ reflect_kwargs,
+ naming_convention,
+ partial_reordering,
+ ):
+ self.operations = operations
+ self.table_name = table_name
+ self.schema = schema
+ if recreate not in ("auto", "always", "never"):
+ raise ValueError(
+ "recreate may be one of 'auto', 'always', or 'never'."
+ )
+ self.recreate = recreate
+ self.copy_from = copy_from
+ self.table_args = table_args
+ self.table_kwargs = dict(table_kwargs)
+ self.reflect_args = reflect_args
+ self.reflect_kwargs = dict(reflect_kwargs)
+ self.reflect_kwargs.setdefault(
+ "listeners", list(self.reflect_kwargs.get("listeners", ()))
+ )
+ self.reflect_kwargs["listeners"].append(
+ ("column_reflect", operations.impl.autogen_column_reflect)
+ )
+ self.naming_convention = naming_convention
+ self.partial_reordering = partial_reordering
+ self.batch = []
+
+ @property
+ def dialect(self) -> Dialect:
+ return self.operations.impl.dialect
+
+ @property
+ def impl(self) -> DefaultImpl:
+ return self.operations.impl
+
+ def _should_recreate(self) -> bool:
+ if self.recreate == "auto":
+ return self.operations.impl.requires_recreate_in_batch(self)
+ elif self.recreate == "always":
+ return True
+ else:
+ return False
+
+ def flush(self) -> None:
+ should_recreate = self._should_recreate()
+
+ with _ensure_scope_for_ddl(self.impl.connection):
+ if not should_recreate:
+ for opname, arg, kw in self.batch:
+ fn = getattr(self.operations.impl, opname)
+ fn(*arg, **kw)
+ else:
+ if self.naming_convention:
+ m1 = MetaData(naming_convention=self.naming_convention)
+ else:
+ m1 = MetaData()
+
+ if self.copy_from is not None:
+ existing_table = self.copy_from
+ reflected = False
+ else:
+ if self.operations.migration_context.as_sql:
+ raise exc.CommandError(
+ f"This operation cannot proceed in --sql mode; "
+ f"batch mode with dialect "
+ f"{self.operations.migration_context.dialect.name} " # noqa: E501
+ f"requires a live database connection with which "
+ f'to reflect the table "{self.table_name}". '
+ f"To generate a batch SQL migration script using "
+ "table "
+ '"move and copy", a complete Table object '
+ f'should be passed to the "copy_from" argument '
+ "of the batch_alter_table() method so that table "
+ "reflection can be skipped."
+ )
+
+ existing_table = Table(
+ self.table_name,
+ m1,
+ schema=self.schema,
+ autoload_with=self.operations.get_bind(),
+ *self.reflect_args,
+ **self.reflect_kwargs,
+ )
+ reflected = True
+
+ batch_impl = ApplyBatchImpl(
+ self.impl,
+ existing_table,
+ self.table_args,
+ self.table_kwargs,
+ reflected,
+ partial_reordering=self.partial_reordering,
+ )
+ for opname, arg, kw in self.batch:
+ fn = getattr(batch_impl, opname)
+ fn(*arg, **kw)
+
+ batch_impl._create(self.impl)
+
+ def alter_column(self, *arg, **kw) -> None:
+ self.batch.append(("alter_column", arg, kw))
+
+ def add_column(self, *arg, **kw) -> None:
+ if (
+ "insert_before" in kw or "insert_after" in kw
+ ) and not self._should_recreate():
+ raise exc.CommandError(
+ "Can't specify insert_before or insert_after when using "
+ "ALTER; please specify recreate='always'"
+ )
+ self.batch.append(("add_column", arg, kw))
+
+ def drop_column(self, *arg, **kw) -> None:
+ self.batch.append(("drop_column", arg, kw))
+
+ def add_constraint(self, const: Constraint) -> None:
+ self.batch.append(("add_constraint", (const,), {}))
+
+ def drop_constraint(self, const: Constraint) -> None:
+ self.batch.append(("drop_constraint", (const,), {}))
+
+ def rename_table(self, *arg, **kw):
+ self.batch.append(("rename_table", arg, kw))
+
+ def create_index(self, idx: Index, **kw: Any) -> None:
+ self.batch.append(("create_index", (idx,), kw))
+
+ def drop_index(self, idx: Index, **kw: Any) -> None:
+ self.batch.append(("drop_index", (idx,), kw))
+
+ def create_table_comment(self, table):
+ self.batch.append(("create_table_comment", (table,), {}))
+
+ def drop_table_comment(self, table):
+ self.batch.append(("drop_table_comment", (table,), {}))
+
+ def create_table(self, table):
+ raise NotImplementedError("Can't create table in batch mode")
+
+ def drop_table(self, table):
+ raise NotImplementedError("Can't drop table in batch mode")
+
+ def create_column_comment(self, column):
+ self.batch.append(("create_column_comment", (column,), {}))
+
+
+class ApplyBatchImpl:
+ def __init__(
+ self,
+ impl: DefaultImpl,
+ table: Table,
+ table_args: tuple,
+ table_kwargs: Dict[str, Any],
+ reflected: bool,
+ partial_reordering: tuple = (),
+ ) -> None:
+ self.impl = impl
+ self.table = table # this is a Table object
+ self.table_args = table_args
+ self.table_kwargs = table_kwargs
+ self.temp_table_name = self._calc_temp_name(table.name)
+ self.new_table: Optional[Table] = None
+
+ self.partial_reordering = partial_reordering # tuple of tuples
+ self.add_col_ordering: Tuple[
+ Tuple[str, str], ...
+ ] = () # tuple of tuples
+
+ self.column_transfers = OrderedDict(
+ (c.name, {"expr": c}) for c in self.table.c
+ )
+ self.existing_ordering = list(self.column_transfers)
+
+ self.reflected = reflected
+ self._grab_table_elements()
+
+ @classmethod
+ def _calc_temp_name(cls, tablename: Union[quoted_name, str]) -> str:
+ return ("_alembic_tmp_%s" % tablename)[0:50]
+
+ def _grab_table_elements(self) -> None:
+ schema = self.table.schema
+ self.columns: Dict[str, Column[Any]] = OrderedDict()
+ for c in self.table.c:
+ c_copy = _copy(c, schema=schema)
+ c_copy.unique = c_copy.index = False
+ # ensure that the type object was copied,
+ # as we may need to modify it in-place
+ if isinstance(c.type, SchemaEventTarget):
+ assert c_copy.type is not c.type
+ self.columns[c.name] = c_copy
+ self.named_constraints: Dict[str, Constraint] = {}
+ self.unnamed_constraints = []
+ self.col_named_constraints = {}
+ self.indexes: Dict[str, Index] = {}
+ self.new_indexes: Dict[str, Index] = {}
+
+ for const in self.table.constraints:
+ if _is_type_bound(const):
+ continue
+ elif (
+ self.reflected
+ and isinstance(const, CheckConstraint)
+ and not const.name
+ ):
+ # TODO: we are skipping unnamed reflected CheckConstraint
+ # because
+ # we have no way to determine _is_type_bound() for these.
+ pass
+ elif constraint_name_string(const.name):
+ self.named_constraints[const.name] = const
+ else:
+ self.unnamed_constraints.append(const)
+
+ if not self.reflected:
+ for col in self.table.c:
+ for const in col.constraints:
+ if const.name:
+ self.col_named_constraints[const.name] = (col, const)
+
+ for idx in self.table.indexes:
+ self.indexes[idx.name] = idx # type: ignore[index]
+
+ for k in self.table.kwargs:
+ self.table_kwargs.setdefault(k, self.table.kwargs[k])
+
+ def _adjust_self_columns_for_partial_reordering(self) -> None:
+ pairs = set()
+
+ col_by_idx = list(self.columns)
+
+ if self.partial_reordering:
+ for tuple_ in self.partial_reordering:
+ for index, elem in enumerate(tuple_):
+ if index > 0:
+ pairs.add((tuple_[index - 1], elem))
+ else:
+ for index, elem in enumerate(self.existing_ordering):
+ if index > 0:
+ pairs.add((col_by_idx[index - 1], elem))
+
+ pairs.update(self.add_col_ordering)
+
+ # this can happen if some columns were dropped and not removed
+ # from existing_ordering. this should be prevented already, but
+ # conservatively making sure this didn't happen
+ pairs_list = [p for p in pairs if p[0] != p[1]]
+
+ sorted_ = list(
+ topological.sort(pairs_list, col_by_idx, deterministic_order=True)
+ )
+ self.columns = OrderedDict((k, self.columns[k]) for k in sorted_)
+ self.column_transfers = OrderedDict(
+ (k, self.column_transfers[k]) for k in sorted_
+ )
+
+ def _transfer_elements_to_new_table(self) -> None:
+ assert self.new_table is None, "Can only create new table once"
+
+ m = MetaData()
+ schema = self.table.schema
+
+ if self.partial_reordering or self.add_col_ordering:
+ self._adjust_self_columns_for_partial_reordering()
+
+ self.new_table = new_table = Table(
+ self.temp_table_name,
+ m,
+ *(list(self.columns.values()) + list(self.table_args)),
+ schema=schema,
+ **self.table_kwargs,
+ )
+
+ for const in (
+ list(self.named_constraints.values()) + self.unnamed_constraints
+ ):
+ const_columns = {c.key for c in _columns_for_constraint(const)}
+
+ if not const_columns.issubset(self.column_transfers):
+ continue
+
+ const_copy: Constraint
+ if isinstance(const, ForeignKeyConstraint):
+ if _fk_is_self_referential(const):
+ # for self-referential constraint, refer to the
+ # *original* table name, and not _alembic_batch_temp.
+ # This is consistent with how we're handling
+ # FK constraints from other tables; we assume SQLite
+ # no foreign keys just keeps the names unchanged, so
+ # when we rename back, they match again.
+ const_copy = _copy(
+ const, schema=schema, target_table=self.table
+ )
+ else:
+ # "target_table" for ForeignKeyConstraint.copy() is
+ # only used if the FK is detected as being
+ # self-referential, which we are handling above.
+ const_copy = _copy(const, schema=schema)
+ else:
+ const_copy = _copy(
+ const, schema=schema, target_table=new_table
+ )
+ if isinstance(const, ForeignKeyConstraint):
+ self._setup_referent(m, const)
+ new_table.append_constraint(const_copy)
+
+ def _gather_indexes_from_both_tables(self) -> List[Index]:
+ assert self.new_table is not None
+ idx: List[Index] = []
+
+ for idx_existing in self.indexes.values():
+ # this is a lift-and-move from Table.to_metadata
+
+ if idx_existing._column_flag:
+ continue
+
+ idx_copy = Index(
+ idx_existing.name,
+ unique=idx_existing.unique,
+ *[
+ _copy_expression(expr, self.new_table)
+ for expr in _idx_table_bound_expressions(idx_existing)
+ ],
+ _table=self.new_table,
+ **idx_existing.kwargs,
+ )
+ idx.append(idx_copy)
+
+ for index in self.new_indexes.values():
+ idx.append(
+ Index(
+ index.name,
+ unique=index.unique,
+ *[self.new_table.c[col] for col in index.columns.keys()],
+ **index.kwargs,
+ )
+ )
+ return idx
+
+ def _setup_referent(
+ self, metadata: MetaData, constraint: ForeignKeyConstraint
+ ) -> None:
+ spec = constraint.elements[0]._get_colspec()
+ parts = spec.split(".")
+ tname = parts[-2]
+ if len(parts) == 3:
+ referent_schema = parts[0]
+ else:
+ referent_schema = None
+
+ if tname != self.temp_table_name:
+ key = sql_schema._get_table_key(tname, referent_schema)
+
+ def colspec(elem: Any):
+ return elem._get_colspec()
+
+ if key in metadata.tables:
+ t = metadata.tables[key]
+ for elem in constraint.elements:
+ colname = colspec(elem).split(".")[-1]
+ if colname not in t.c:
+ t.append_column(Column(colname, sqltypes.NULLTYPE))
+ else:
+ Table(
+ tname,
+ metadata,
+ *[
+ Column(n, sqltypes.NULLTYPE)
+ for n in [
+ colspec(elem).split(".")[-1]
+ for elem in constraint.elements
+ ]
+ ],
+ schema=referent_schema,
+ )
+
+ def _create(self, op_impl: DefaultImpl) -> None:
+ self._transfer_elements_to_new_table()
+
+ op_impl.prep_table_for_batch(self, self.table)
+ assert self.new_table is not None
+ op_impl.create_table(self.new_table)
+
+ try:
+ op_impl._exec(
+ self.new_table.insert()
+ .inline()
+ .from_select(
+ list(
+ k
+ for k, transfer in self.column_transfers.items()
+ if "expr" in transfer
+ ),
+ select(
+ *[
+ transfer["expr"]
+ for transfer in self.column_transfers.values()
+ if "expr" in transfer
+ ]
+ ),
+ )
+ )
+ op_impl.drop_table(self.table)
+ except:
+ op_impl.drop_table(self.new_table)
+ raise
+ else:
+ op_impl.rename_table(
+ self.temp_table_name, self.table.name, schema=self.table.schema
+ )
+ self.new_table.name = self.table.name
+ try:
+ for idx in self._gather_indexes_from_both_tables():
+ op_impl.create_index(idx)
+ finally:
+ self.new_table.name = self.temp_table_name
+
+ def alter_column(
+ self,
+ table_name: str,
+ column_name: str,
+ nullable: Optional[bool] = None,
+ server_default: Optional[Union[Function[Any], str, bool]] = False,
+ name: Optional[str] = None,
+ type_: Optional[TypeEngine] = None,
+ autoincrement: Optional[Union[bool, Literal["auto"]]] = None,
+ comment: Union[str, Literal[False]] = False,
+ **kw,
+ ) -> None:
+ existing = self.columns[column_name]
+ existing_transfer: Dict[str, Any] = self.column_transfers[column_name]
+ if name is not None and name != column_name:
+ # note that we don't change '.key' - we keep referring
+ # to the renamed column by its old key in _create(). neat!
+ existing.name = name
+ existing_transfer["name"] = name
+
+ existing_type = kw.get("existing_type", None)
+ if existing_type:
+ resolved_existing_type = _resolve_for_variant(
+ kw["existing_type"], self.impl.dialect
+ )
+
+ # pop named constraints for Boolean/Enum for rename
+ if (
+ isinstance(resolved_existing_type, SchemaEventTarget)
+ and resolved_existing_type.name # type:ignore[attr-defined] # noqa E501
+ ):
+ self.named_constraints.pop(
+ resolved_existing_type.name, # type:ignore[attr-defined] # noqa E501
+ None,
+ )
+
+ if type_ is not None:
+ type_ = sqltypes.to_instance(type_)
+ # old type is being discarded so turn off eventing
+ # rules. Alternatively we can
+ # erase the events set up by this type, but this is simpler.
+ # we also ignore the drop_constraint that will come here from
+ # Operations.implementation_for(alter_column)
+
+ if isinstance(existing.type, SchemaEventTarget):
+ existing.type._create_events = ( # type:ignore[attr-defined]
+ existing.type.create_constraint # type:ignore[attr-defined] # noqa
+ ) = False
+
+ self.impl.cast_for_batch_migrate(
+ existing, existing_transfer, type_
+ )
+
+ existing.type = type_
+
+ # we *dont* however set events for the new type, because
+ # alter_column is invoked from
+ # Operations.implementation_for(alter_column) which already
+ # will emit an add_constraint()
+
+ if nullable is not None:
+ existing.nullable = nullable
+ if server_default is not False:
+ if server_default is None:
+ existing.server_default = None
+ else:
+ sql_schema.DefaultClause(
+ server_default # type: ignore[arg-type]
+ )._set_parent(existing)
+ if autoincrement is not None:
+ existing.autoincrement = bool(autoincrement)
+
+ if comment is not False:
+ existing.comment = comment
+
+ def _setup_dependencies_for_add_column(
+ self,
+ colname: str,
+ insert_before: Optional[str],
+ insert_after: Optional[str],
+ ) -> None:
+ index_cols = self.existing_ordering
+ col_indexes = {name: i for i, name in enumerate(index_cols)}
+
+ if not self.partial_reordering:
+ if insert_after:
+ if not insert_before:
+ if insert_after in col_indexes:
+ # insert after an existing column
+ idx = col_indexes[insert_after] + 1
+ if idx < len(index_cols):
+ insert_before = index_cols[idx]
+ else:
+ # insert after a column that is also new
+ insert_before = dict(self.add_col_ordering)[
+ insert_after
+ ]
+ if insert_before:
+ if not insert_after:
+ if insert_before in col_indexes:
+ # insert before an existing column
+ idx = col_indexes[insert_before] - 1
+ if idx >= 0:
+ insert_after = index_cols[idx]
+ else:
+ # insert before a column that is also new
+ insert_after = {
+ b: a for a, b in self.add_col_ordering
+ }[insert_before]
+
+ if insert_before:
+ self.add_col_ordering += ((colname, insert_before),)
+ if insert_after:
+ self.add_col_ordering += ((insert_after, colname),)
+
+ if (
+ not self.partial_reordering
+ and not insert_before
+ and not insert_after
+ and col_indexes
+ ):
+ self.add_col_ordering += ((index_cols[-1], colname),)
+
+ def add_column(
+ self,
+ table_name: str,
+ column: Column[Any],
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ **kw,
+ ) -> None:
+ self._setup_dependencies_for_add_column(
+ column.name, insert_before, insert_after
+ )
+ # we copy the column because operations.add_column()
+ # gives us a Column that is part of a Table already.
+ self.columns[column.name] = _copy(column, schema=self.table.schema)
+ self.column_transfers[column.name] = {}
+
+ def drop_column(
+ self,
+ table_name: str,
+ column: Union[ColumnClause[Any], Column[Any]],
+ **kw,
+ ) -> None:
+ if column.name in self.table.primary_key.columns:
+ _remove_column_from_collection(
+ self.table.primary_key.columns, column
+ )
+ del self.columns[column.name]
+ del self.column_transfers[column.name]
+ self.existing_ordering.remove(column.name)
+
+ # pop named constraints for Boolean/Enum for rename
+ if (
+ "existing_type" in kw
+ and isinstance(kw["existing_type"], SchemaEventTarget)
+ and kw["existing_type"].name # type:ignore[attr-defined]
+ ):
+ self.named_constraints.pop(
+ kw["existing_type"].name, None # type:ignore[attr-defined]
+ )
+
+ def create_column_comment(self, column):
+ """the batch table creation function will issue create_column_comment
+ on the real "impl" as part of the create table process.
+
+ That is, the Column object will have the comment on it already,
+ so when it is received by add_column() it will be a normal part of
+ the CREATE TABLE and doesn't need an extra step here.
+
+ """
+
+ def create_table_comment(self, table):
+ """the batch table creation function will issue create_table_comment
+ on the real "impl" as part of the create table process.
+
+ """
+
+ def drop_table_comment(self, table):
+ """the batch table creation function will issue drop_table_comment
+ on the real "impl" as part of the create table process.
+
+ """
+
+ def add_constraint(self, const: Constraint) -> None:
+ if not constraint_name_defined(const.name):
+ raise ValueError("Constraint must have a name")
+ if isinstance(const, sql_schema.PrimaryKeyConstraint):
+ if self.table.primary_key in self.unnamed_constraints:
+ self.unnamed_constraints.remove(self.table.primary_key)
+
+ if constraint_name_string(const.name):
+ self.named_constraints[const.name] = const
+ else:
+ self.unnamed_constraints.append(const)
+
+ def drop_constraint(self, const: Constraint) -> None:
+ if not const.name:
+ raise ValueError("Constraint must have a name")
+ try:
+ if const.name in self.col_named_constraints:
+ col, const = self.col_named_constraints.pop(const.name)
+
+ for col_const in list(self.columns[col.name].constraints):
+ if col_const.name == const.name:
+ self.columns[col.name].constraints.remove(col_const)
+ elif constraint_name_string(const.name):
+ const = self.named_constraints.pop(const.name)
+ elif const in self.unnamed_constraints:
+ self.unnamed_constraints.remove(const)
+
+ except KeyError:
+ if _is_type_bound(const):
+ # type-bound constraints are only included in the new
+ # table via their type object in any case, so ignore the
+ # drop_constraint() that comes here via the
+ # Operations.implementation_for(alter_column)
+ return
+ raise ValueError("No such constraint: '%s'" % const.name)
+ else:
+ if isinstance(const, PrimaryKeyConstraint):
+ for col in const.columns:
+ self.columns[col.name].primary_key = False
+
+ def create_index(self, idx: Index) -> None:
+ self.new_indexes[idx.name] = idx # type: ignore[index]
+
+ def drop_index(self, idx: Index) -> None:
+ try:
+ del self.indexes[idx.name] # type: ignore[arg-type]
+ except KeyError:
+ raise ValueError("No such index: '%s'" % idx.name)
+
+ def rename_table(self, *arg, **kw):
+ raise NotImplementedError("TODO")
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/ops.py b/.venv/lib/python3.12/site-packages/alembic/operations/ops.py
new file mode 100644
index 00000000..bb4d825b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/ops.py
@@ -0,0 +1,2799 @@
+from __future__ import annotations
+
+from abc import abstractmethod
+import re
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import FrozenSet
+from typing import Iterator
+from typing import List
+from typing import MutableMapping
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy.types import NULLTYPE
+
+from . import schemaobj
+from .base import BatchOperations
+from .base import Operations
+from .. import util
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy.sql import Executable
+ from sqlalchemy.sql.elements import ColumnElement
+ from sqlalchemy.sql.elements import conv
+ from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.schema import CheckConstraint
+ from sqlalchemy.sql.schema import Column
+ from sqlalchemy.sql.schema import Computed
+ from sqlalchemy.sql.schema import Constraint
+ from sqlalchemy.sql.schema import ForeignKeyConstraint
+ from sqlalchemy.sql.schema import Identity
+ from sqlalchemy.sql.schema import Index
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import PrimaryKeyConstraint
+ from sqlalchemy.sql.schema import SchemaItem
+ from sqlalchemy.sql.schema import Table
+ from sqlalchemy.sql.schema import UniqueConstraint
+ from sqlalchemy.sql.selectable import TableClause
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..autogenerate.rewriter import Rewriter
+ from ..runtime.migration import MigrationContext
+ from ..script.revision import _RevIdType
+
+_T = TypeVar("_T", bound=Any)
+_AC = TypeVar("_AC", bound="AddConstraintOp")
+
+
+class MigrateOperation:
+ """base class for migration command and organization objects.
+
+ This system is part of the operation extensibility API.
+
+ .. seealso::
+
+ :ref:`operation_objects`
+
+ :ref:`operation_plugins`
+
+ :ref:`customizing_revision`
+
+ """
+
+ @util.memoized_property
+ def info(self) -> Dict[Any, Any]:
+ """A dictionary that may be used to store arbitrary information
+ along with this :class:`.MigrateOperation` object.
+
+ """
+ return {}
+
+ _mutations: FrozenSet[Rewriter] = frozenset()
+
+ def reverse(self) -> MigrateOperation:
+ raise NotImplementedError
+
+ def to_diff_tuple(self) -> Tuple[Any, ...]:
+ raise NotImplementedError
+
+
+class AddConstraintOp(MigrateOperation):
+ """Represent an add constraint operation."""
+
+ add_constraint_ops = util.Dispatcher()
+
+ @property
+ def constraint_type(self) -> str:
+ raise NotImplementedError()
+
+ @classmethod
+ def register_add_constraint(
+ cls, type_: str
+ ) -> Callable[[Type[_AC]], Type[_AC]]:
+ def go(klass: Type[_AC]) -> Type[_AC]:
+ cls.add_constraint_ops.dispatch_for(type_)(klass.from_constraint)
+ return klass
+
+ return go
+
+ @classmethod
+ def from_constraint(cls, constraint: Constraint) -> AddConstraintOp:
+ return cls.add_constraint_ops.dispatch(constraint.__visit_name__)( # type: ignore[no-any-return] # noqa: E501
+ constraint
+ )
+
+ @abstractmethod
+ def to_constraint(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Constraint:
+ pass
+
+ def reverse(self) -> DropConstraintOp:
+ return DropConstraintOp.from_constraint(self.to_constraint())
+
+ def to_diff_tuple(self) -> Tuple[str, Constraint]:
+ return ("add_constraint", self.to_constraint())
+
+
+@Operations.register_operation("drop_constraint")
+@BatchOperations.register_operation("drop_constraint", "batch_drop_constraint")
+class DropConstraintOp(MigrateOperation):
+ """Represent a drop constraint operation."""
+
+ def __init__(
+ self,
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ type_: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ _reverse: Optional[AddConstraintOp] = None,
+ ) -> None:
+ self.constraint_name = constraint_name
+ self.table_name = table_name
+ self.constraint_type = type_
+ self.schema = schema
+ self._reverse = _reverse
+
+ def reverse(self) -> AddConstraintOp:
+ return AddConstraintOp.from_constraint(self.to_constraint())
+
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, SchemaItem]:
+ if self.constraint_type == "foreignkey":
+ return ("remove_fk", self.to_constraint())
+ else:
+ return ("remove_constraint", self.to_constraint())
+
+ @classmethod
+ def from_constraint(cls, constraint: Constraint) -> DropConstraintOp:
+ types = {
+ "unique_constraint": "unique",
+ "foreign_key_constraint": "foreignkey",
+ "primary_key_constraint": "primary",
+ "check_constraint": "check",
+ "column_check_constraint": "check",
+ "table_or_column_check_constraint": "check",
+ }
+
+ constraint_table = sqla_compat._table_for_constraint(constraint)
+ return cls(
+ sqla_compat.constraint_name_or_none(constraint.name),
+ constraint_table.name,
+ schema=constraint_table.schema,
+ type_=types.get(constraint.__visit_name__),
+ _reverse=AddConstraintOp.from_constraint(constraint),
+ )
+
+ def to_constraint(self) -> Constraint:
+ if self._reverse is not None:
+ constraint = self._reverse.to_constraint()
+ constraint.name = self.constraint_name
+ constraint_table = sqla_compat._table_for_constraint(constraint)
+ constraint_table.name = self.table_name
+ constraint_table.schema = self.schema
+
+ return constraint
+ else:
+ raise ValueError(
+ "constraint cannot be produced; "
+ "original constraint is not present"
+ )
+
+ @classmethod
+ def drop_constraint(
+ cls,
+ operations: Operations,
+ constraint_name: str,
+ table_name: str,
+ type_: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ r"""Drop a constraint of the given name, typically via DROP CONSTRAINT.
+
+ :param constraint_name: name of the constraint.
+ :param table_name: table name.
+ :param type\_: optional, required on MySQL. can be
+ 'foreignkey', 'primary', 'unique', or 'check'.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+
+ op = cls(constraint_name, table_name, type_=type_, schema=schema)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_drop_constraint(
+ cls,
+ operations: BatchOperations,
+ constraint_name: str,
+ type_: Optional[str] = None,
+ ) -> None:
+ """Issue a "drop constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``table_name`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_constraint`
+
+ """
+ op = cls(
+ constraint_name,
+ operations.impl.table_name,
+ type_=type_,
+ schema=operations.impl.schema,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_primary_key")
+@BatchOperations.register_operation(
+ "create_primary_key", "batch_create_primary_key"
+)
+@AddConstraintOp.register_add_constraint("primary_key_constraint")
+class CreatePrimaryKeyOp(AddConstraintOp):
+ """Represent a create primary key operation."""
+
+ constraint_type = "primarykey"
+
+ def __init__(
+ self,
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ columns: Sequence[str],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ self.constraint_name = constraint_name
+ self.table_name = table_name
+ self.columns = columns
+ self.schema = schema
+ self.kw = kw
+
+ @classmethod
+ def from_constraint(cls, constraint: Constraint) -> CreatePrimaryKeyOp:
+ constraint_table = sqla_compat._table_for_constraint(constraint)
+ pk_constraint = cast("PrimaryKeyConstraint", constraint)
+ return cls(
+ sqla_compat.constraint_name_or_none(pk_constraint.name),
+ constraint_table.name,
+ pk_constraint.columns.keys(),
+ schema=constraint_table.schema,
+ **pk_constraint.dialect_kwargs,
+ )
+
+ def to_constraint(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> PrimaryKeyConstraint:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ return schema_obj.primary_key_constraint(
+ self.constraint_name,
+ self.table_name,
+ self.columns,
+ schema=self.schema,
+ **self.kw,
+ )
+
+ @classmethod
+ def create_primary_key(
+ cls,
+ operations: Operations,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: List[str],
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue a "create primary key" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_primary_key("pk_my_table", "my_table", ["id", "version"])
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.PrimaryKeyConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param constraint_name: Name of the primary key constraint. The name
+ is necessary so that an ALTER statement can be emitted. For setups
+ that use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the target table.
+ :param columns: a list of string column names to be applied to the
+ primary key constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+ op = cls(constraint_name, table_name, columns, schema=schema)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_primary_key(
+ cls,
+ operations: BatchOperations,
+ constraint_name: Optional[str],
+ columns: List[str],
+ ) -> None:
+ """Issue a "create primary key" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``table_name`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_primary_key`
+
+ """
+ op = cls(
+ constraint_name,
+ operations.impl.table_name,
+ columns,
+ schema=operations.impl.schema,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_unique_constraint")
+@BatchOperations.register_operation(
+ "create_unique_constraint", "batch_create_unique_constraint"
+)
+@AddConstraintOp.register_add_constraint("unique_constraint")
+class CreateUniqueConstraintOp(AddConstraintOp):
+ """Represent a create unique constraint operation."""
+
+ constraint_type = "unique"
+
+ def __init__(
+ self,
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ columns: Sequence[str],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ self.constraint_name = constraint_name
+ self.table_name = table_name
+ self.columns = columns
+ self.schema = schema
+ self.kw = kw
+
+ @classmethod
+ def from_constraint(
+ cls, constraint: Constraint
+ ) -> CreateUniqueConstraintOp:
+ constraint_table = sqla_compat._table_for_constraint(constraint)
+
+ uq_constraint = cast("UniqueConstraint", constraint)
+
+ kw: Dict[str, Any] = {}
+ if uq_constraint.deferrable:
+ kw["deferrable"] = uq_constraint.deferrable
+ if uq_constraint.initially:
+ kw["initially"] = uq_constraint.initially
+ kw.update(uq_constraint.dialect_kwargs)
+ return cls(
+ sqla_compat.constraint_name_or_none(uq_constraint.name),
+ constraint_table.name,
+ [c.name for c in uq_constraint.columns],
+ schema=constraint_table.schema,
+ **kw,
+ )
+
+ def to_constraint(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> UniqueConstraint:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+ return schema_obj.unique_constraint(
+ self.constraint_name,
+ self.table_name,
+ self.columns,
+ schema=self.schema,
+ **self.kw,
+ )
+
+ @classmethod
+ def create_unique_constraint(
+ cls,
+ operations: Operations,
+ constraint_name: Optional[str],
+ table_name: str,
+ columns: Sequence[str],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> Any:
+ """Issue a "create unique constraint" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+ op.create_unique_constraint("uq_user_name", "user", ["name"])
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.UniqueConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param name: Name of the unique constraint. The name is necessary
+ so that an ALTER statement can be emitted. For setups that
+ use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the source table.
+ :param columns: a list of string column names in the
+ source table.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+
+ op = cls(constraint_name, table_name, columns, schema=schema, **kw)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_unique_constraint(
+ cls,
+ operations: BatchOperations,
+ constraint_name: str,
+ columns: Sequence[str],
+ **kw: Any,
+ ) -> Any:
+ """Issue a "create unique constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_unique_constraint`
+
+ """
+ kw["schema"] = operations.impl.schema
+ op = cls(constraint_name, operations.impl.table_name, columns, **kw)
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_foreign_key")
+@BatchOperations.register_operation(
+ "create_foreign_key", "batch_create_foreign_key"
+)
+@AddConstraintOp.register_add_constraint("foreign_key_constraint")
+class CreateForeignKeyOp(AddConstraintOp):
+ """Represent a create foreign key constraint operation."""
+
+ constraint_type = "foreignkey"
+
+ def __init__(
+ self,
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+ source_table: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ **kw: Any,
+ ) -> None:
+ self.constraint_name = constraint_name
+ self.source_table = source_table
+ self.referent_table = referent_table
+ self.local_cols = local_cols
+ self.remote_cols = remote_cols
+ self.kw = kw
+
+ def to_diff_tuple(self) -> Tuple[str, ForeignKeyConstraint]:
+ return ("add_fk", self.to_constraint())
+
+ @classmethod
+ def from_constraint(cls, constraint: Constraint) -> CreateForeignKeyOp:
+ fk_constraint = cast("ForeignKeyConstraint", constraint)
+ kw: Dict[str, Any] = {}
+ if fk_constraint.onupdate:
+ kw["onupdate"] = fk_constraint.onupdate
+ if fk_constraint.ondelete:
+ kw["ondelete"] = fk_constraint.ondelete
+ if fk_constraint.initially:
+ kw["initially"] = fk_constraint.initially
+ if fk_constraint.deferrable:
+ kw["deferrable"] = fk_constraint.deferrable
+ if fk_constraint.use_alter:
+ kw["use_alter"] = fk_constraint.use_alter
+ if fk_constraint.match:
+ kw["match"] = fk_constraint.match
+
+ (
+ source_schema,
+ source_table,
+ source_columns,
+ target_schema,
+ target_table,
+ target_columns,
+ onupdate,
+ ondelete,
+ deferrable,
+ initially,
+ ) = sqla_compat._fk_spec(fk_constraint)
+
+ kw["source_schema"] = source_schema
+ kw["referent_schema"] = target_schema
+ kw.update(fk_constraint.dialect_kwargs)
+ return cls(
+ sqla_compat.constraint_name_or_none(fk_constraint.name),
+ source_table,
+ target_table,
+ source_columns,
+ target_columns,
+ **kw,
+ )
+
+ def to_constraint(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> ForeignKeyConstraint:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+ return schema_obj.foreign_key_constraint(
+ self.constraint_name,
+ self.source_table,
+ self.referent_table,
+ self.local_cols,
+ self.remote_cols,
+ **self.kw,
+ )
+
+ @classmethod
+ def create_foreign_key(
+ cls,
+ operations: Operations,
+ constraint_name: Optional[str],
+ source_table: str,
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ *,
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ source_schema: Optional[str] = None,
+ referent_schema: Optional[str] = None,
+ **dialect_kw: Any,
+ ) -> None:
+ """Issue a "create foreign key" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_foreign_key(
+ "fk_user_address",
+ "address",
+ "user",
+ ["user_id"],
+ ["id"],
+ )
+
+ This internally generates a :class:`~sqlalchemy.schema.Table` object
+ containing the necessary columns, then generates a new
+ :class:`~sqlalchemy.schema.ForeignKeyConstraint`
+ object which it then associates with the
+ :class:`~sqlalchemy.schema.Table`.
+ Any event listeners associated with this action will be fired
+ off normally. The :class:`~sqlalchemy.schema.AddConstraint`
+ construct is ultimately used to generate the ALTER statement.
+
+ :param constraint_name: Name of the foreign key constraint. The name
+ is necessary so that an ALTER statement can be emitted. For setups
+ that use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param source_table: String name of the source table.
+ :param referent_table: String name of the destination table.
+ :param local_cols: a list of string column names in the
+ source table.
+ :param remote_cols: a list of string column names in the
+ remote table.
+ :param onupdate: Optional string. If set, emit ON UPDATE <value> when
+ issuing DDL for this constraint. Typical values include CASCADE,
+ DELETE and RESTRICT.
+ :param ondelete: Optional string. If set, emit ON DELETE <value> when
+ issuing DDL for this constraint. Typical values include CASCADE,
+ DELETE and RESTRICT.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or NOT
+ DEFERRABLE when issuing DDL for this constraint.
+ :param source_schema: Optional schema name of the source table.
+ :param referent_schema: Optional schema name of the destination table.
+
+ """
+
+ op = cls(
+ constraint_name,
+ source_table,
+ referent_table,
+ local_cols,
+ remote_cols,
+ onupdate=onupdate,
+ ondelete=ondelete,
+ deferrable=deferrable,
+ source_schema=source_schema,
+ referent_schema=referent_schema,
+ initially=initially,
+ match=match,
+ **dialect_kw,
+ )
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_foreign_key(
+ cls,
+ operations: BatchOperations,
+ constraint_name: Optional[str],
+ referent_table: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ *,
+ referent_schema: Optional[str] = None,
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ **dialect_kw: Any,
+ ) -> None:
+ """Issue a "create foreign key" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``source_schema``
+ arguments from the call.
+
+ e.g.::
+
+ with batch_alter_table("address") as batch_op:
+ batch_op.create_foreign_key(
+ "fk_user_address",
+ "user",
+ ["user_id"],
+ ["id"],
+ )
+
+ .. seealso::
+
+ :meth:`.Operations.create_foreign_key`
+
+ """
+ op = cls(
+ constraint_name,
+ operations.impl.table_name,
+ referent_table,
+ local_cols,
+ remote_cols,
+ onupdate=onupdate,
+ ondelete=ondelete,
+ deferrable=deferrable,
+ source_schema=operations.impl.schema,
+ referent_schema=referent_schema,
+ initially=initially,
+ match=match,
+ **dialect_kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_check_constraint")
+@BatchOperations.register_operation(
+ "create_check_constraint", "batch_create_check_constraint"
+)
+@AddConstraintOp.register_add_constraint("check_constraint")
+@AddConstraintOp.register_add_constraint("table_or_column_check_constraint")
+@AddConstraintOp.register_add_constraint("column_check_constraint")
+class CreateCheckConstraintOp(AddConstraintOp):
+ """Represent a create check constraint operation."""
+
+ constraint_type = "check"
+
+ def __init__(
+ self,
+ constraint_name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ condition: Union[str, TextClause, ColumnElement[Any]],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ self.constraint_name = constraint_name
+ self.table_name = table_name
+ self.condition = condition
+ self.schema = schema
+ self.kw = kw
+
+ @classmethod
+ def from_constraint(
+ cls, constraint: Constraint
+ ) -> CreateCheckConstraintOp:
+ constraint_table = sqla_compat._table_for_constraint(constraint)
+
+ ck_constraint = cast("CheckConstraint", constraint)
+ return cls(
+ sqla_compat.constraint_name_or_none(ck_constraint.name),
+ constraint_table.name,
+ cast("ColumnElement[Any]", ck_constraint.sqltext),
+ schema=constraint_table.schema,
+ **ck_constraint.dialect_kwargs,
+ )
+
+ def to_constraint(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> CheckConstraint:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+ return schema_obj.check_constraint(
+ self.constraint_name,
+ self.table_name,
+ self.condition,
+ schema=self.schema,
+ **self.kw,
+ )
+
+ @classmethod
+ def create_check_constraint(
+ cls,
+ operations: Operations,
+ constraint_name: Optional[str],
+ table_name: str,
+ condition: Union[str, ColumnElement[bool], TextClause],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue a "create check constraint" instruction using the
+ current migration context.
+
+ e.g.::
+
+ from alembic import op
+ from sqlalchemy.sql import column, func
+
+ op.create_check_constraint(
+ "ck_user_name_len",
+ "user",
+ func.len(column("name")) > 5,
+ )
+
+ CHECK constraints are usually against a SQL expression, so ad-hoc
+ table metadata is usually needed. The function will convert the given
+ arguments into a :class:`sqlalchemy.schema.CheckConstraint` bound
+ to an anonymous table in order to emit the CREATE statement.
+
+ :param name: Name of the check constraint. The name is necessary
+ so that an ALTER statement can be emitted. For setups that
+ use an automated naming scheme such as that described at
+ :ref:`sqla:constraint_naming_conventions`,
+ ``name`` here can be ``None``, as the event listener will
+ apply the name to the constraint object when it is associated
+ with the table.
+ :param table_name: String name of the source table.
+ :param condition: SQL expression that's the condition of the
+ constraint. Can be a string or SQLAlchemy expression language
+ structure.
+ :param deferrable: optional bool. If set, emit DEFERRABLE or
+ NOT DEFERRABLE when issuing DDL for this constraint.
+ :param initially: optional string. If set, emit INITIALLY <value>
+ when issuing DDL for this constraint.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+ op = cls(constraint_name, table_name, condition, schema=schema, **kw)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_check_constraint(
+ cls,
+ operations: BatchOperations,
+ constraint_name: str,
+ condition: Union[str, ColumnElement[bool], TextClause],
+ **kw: Any,
+ ) -> None:
+ """Issue a "create check constraint" instruction using the
+ current batch migration context.
+
+ The batch form of this call omits the ``source`` and ``schema``
+ arguments from the call.
+
+ .. seealso::
+
+ :meth:`.Operations.create_check_constraint`
+
+ """
+ op = cls(
+ constraint_name,
+ operations.impl.table_name,
+ condition,
+ schema=operations.impl.schema,
+ **kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_index")
+@BatchOperations.register_operation("create_index", "batch_create_index")
+class CreateIndexOp(MigrateOperation):
+ """Represent a create index operation."""
+
+ def __init__(
+ self,
+ index_name: Optional[str],
+ table_name: str,
+ columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+ *,
+ schema: Optional[str] = None,
+ unique: bool = False,
+ if_not_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ self.index_name = index_name
+ self.table_name = table_name
+ self.columns = columns
+ self.schema = schema
+ self.unique = unique
+ self.if_not_exists = if_not_exists
+ self.kw = kw
+
+ def reverse(self) -> DropIndexOp:
+ return DropIndexOp.from_index(self.to_index())
+
+ def to_diff_tuple(self) -> Tuple[str, Index]:
+ return ("add_index", self.to_index())
+
+ @classmethod
+ def from_index(cls, index: Index) -> CreateIndexOp:
+ assert index.table is not None
+ return cls(
+ index.name,
+ index.table.name,
+ index.expressions,
+ schema=index.table.schema,
+ unique=index.unique,
+ **index.kwargs,
+ )
+
+ def to_index(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Index:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ idx = schema_obj.index(
+ self.index_name,
+ self.table_name,
+ self.columns,
+ schema=self.schema,
+ unique=self.unique,
+ **self.kw,
+ )
+ return idx
+
+ @classmethod
+ def create_index(
+ cls,
+ operations: Operations,
+ index_name: Optional[str],
+ table_name: str,
+ columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+ *,
+ schema: Optional[str] = None,
+ unique: bool = False,
+ if_not_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "create index" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+
+ op.create_index("ik_test", "t1", ["foo", "bar"])
+
+ Functional indexes can be produced by using the
+ :func:`sqlalchemy.sql.expression.text` construct::
+
+ from alembic import op
+ from sqlalchemy import text
+
+ op.create_index("ik_test", "t1", [text("lower(foo)")])
+
+ :param index_name: name of the index.
+ :param table_name: name of the owning table.
+ :param columns: a list consisting of string column names and/or
+ :func:`~sqlalchemy.sql.expression.text` constructs.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param unique: If True, create a unique index.
+
+ :param quote: Force quoting of this column's name on or off,
+ corresponding to ``True`` or ``False``. When left at its default
+ of ``None``, the column identifier will be quoted according to
+ whether the name is case sensitive (identifiers with at least one
+ upper case character are treated as case sensitive), or if it's a
+ reserved word. This flag is only needed to force quoting of a
+ reserved word which is not known by the SQLAlchemy dialect.
+
+ :param if_not_exists: If True, adds IF NOT EXISTS operator when
+ creating the new index.
+
+ .. versionadded:: 1.12.0
+
+ :param \**kw: Additional keyword arguments not mentioned above are
+ dialect specific, and passed in the form
+ ``<dialectname>_<argname>``.
+ See the documentation regarding an individual dialect at
+ :ref:`dialect_toplevel` for detail on documented arguments.
+
+ """
+ op = cls(
+ index_name,
+ table_name,
+ columns,
+ schema=schema,
+ unique=unique,
+ if_not_exists=if_not_exists,
+ **kw,
+ )
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_index(
+ cls,
+ operations: BatchOperations,
+ index_name: str,
+ columns: List[str],
+ **kw: Any,
+ ) -> None:
+ """Issue a "create index" instruction using the
+ current batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.create_index`
+
+ """
+
+ op = cls(
+ index_name,
+ operations.impl.table_name,
+ columns,
+ schema=operations.impl.schema,
+ **kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("drop_index")
+@BatchOperations.register_operation("drop_index", "batch_drop_index")
+class DropIndexOp(MigrateOperation):
+ """Represent a drop index operation."""
+
+ def __init__(
+ self,
+ index_name: Union[quoted_name, str, conv],
+ table_name: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ _reverse: Optional[CreateIndexOp] = None,
+ **kw: Any,
+ ) -> None:
+ self.index_name = index_name
+ self.table_name = table_name
+ self.schema = schema
+ self.if_exists = if_exists
+ self._reverse = _reverse
+ self.kw = kw
+
+ def to_diff_tuple(self) -> Tuple[str, Index]:
+ return ("remove_index", self.to_index())
+
+ def reverse(self) -> CreateIndexOp:
+ return CreateIndexOp.from_index(self.to_index())
+
+ @classmethod
+ def from_index(cls, index: Index) -> DropIndexOp:
+ assert index.table is not None
+ return cls(
+ index.name, # type: ignore[arg-type]
+ table_name=index.table.name,
+ schema=index.table.schema,
+ _reverse=CreateIndexOp.from_index(index),
+ unique=index.unique,
+ **index.kwargs,
+ )
+
+ def to_index(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Index:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ # need a dummy column name here since SQLAlchemy
+ # 0.7.6 and further raises on Index with no columns
+ return schema_obj.index(
+ self.index_name,
+ self.table_name,
+ self._reverse.columns if self._reverse else ["x"],
+ schema=self.schema,
+ **self.kw,
+ )
+
+ @classmethod
+ def drop_index(
+ cls,
+ operations: Operations,
+ index_name: str,
+ table_name: Optional[str] = None,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "drop index" instruction using the current
+ migration context.
+
+ e.g.::
+
+ drop_index("accounts")
+
+ :param index_name: name of the index.
+ :param table_name: name of the owning table. Some
+ backends such as Microsoft SQL Server require this.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ :param if_exists: If True, adds IF EXISTS operator when
+ dropping the index.
+
+ .. versionadded:: 1.12.0
+
+ :param \**kw: Additional keyword arguments not mentioned above are
+ dialect specific, and passed in the form
+ ``<dialectname>_<argname>``.
+ See the documentation regarding an individual dialect at
+ :ref:`dialect_toplevel` for detail on documented arguments.
+
+ """
+ op = cls(
+ index_name,
+ table_name=table_name,
+ schema=schema,
+ if_exists=if_exists,
+ **kw,
+ )
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_drop_index(
+ cls, operations: BatchOperations, index_name: str, **kw: Any
+ ) -> None:
+ """Issue a "drop index" instruction using the
+ current batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_index`
+
+ """
+
+ op = cls(
+ index_name,
+ table_name=operations.impl.table_name,
+ schema=operations.impl.schema,
+ **kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_table")
+class CreateTableOp(MigrateOperation):
+ """Represent a create table operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ columns: Sequence[SchemaItem],
+ *,
+ schema: Optional[str] = None,
+ if_not_exists: Optional[bool] = None,
+ _namespace_metadata: Optional[MetaData] = None,
+ _constraints_included: bool = False,
+ **kw: Any,
+ ) -> None:
+ self.table_name = table_name
+ self.columns = columns
+ self.schema = schema
+ self.if_not_exists = if_not_exists
+ self.info = kw.pop("info", {})
+ self.comment = kw.pop("comment", None)
+ self.prefixes = kw.pop("prefixes", None)
+ self.kw = kw
+ self._namespace_metadata = _namespace_metadata
+ self._constraints_included = _constraints_included
+
+ def reverse(self) -> DropTableOp:
+ return DropTableOp.from_table(
+ self.to_table(), _namespace_metadata=self._namespace_metadata
+ )
+
+ def to_diff_tuple(self) -> Tuple[str, Table]:
+ return ("add_table", self.to_table())
+
+ @classmethod
+ def from_table(
+ cls, table: Table, *, _namespace_metadata: Optional[MetaData] = None
+ ) -> CreateTableOp:
+ if _namespace_metadata is None:
+ _namespace_metadata = table.metadata
+
+ return cls(
+ table.name,
+ list(table.c) + list(table.constraints),
+ schema=table.schema,
+ _namespace_metadata=_namespace_metadata,
+ # given a Table() object, this Table will contain full Index()
+ # and UniqueConstraint objects already constructed in response to
+ # each unique=True / index=True flag on a Column. Carry this
+ # state along so that when we re-convert back into a Table, we
+ # skip unique=True/index=True so that these constraints are
+ # not doubled up. see #844 #848
+ _constraints_included=True,
+ comment=table.comment,
+ info=dict(table.info),
+ prefixes=list(table._prefixes),
+ **table.kwargs,
+ )
+
+ def to_table(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Table:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ return schema_obj.table(
+ self.table_name,
+ *self.columns,
+ schema=self.schema,
+ prefixes=list(self.prefixes) if self.prefixes else [],
+ comment=self.comment,
+ info=self.info.copy() if self.info else {},
+ _constraints_included=self._constraints_included,
+ **self.kw,
+ )
+
+ @classmethod
+ def create_table(
+ cls,
+ operations: Operations,
+ table_name: str,
+ *columns: SchemaItem,
+ if_not_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> Table:
+ r"""Issue a "create table" instruction using the current migration
+ context.
+
+ This directive receives an argument list similar to that of the
+ traditional :class:`sqlalchemy.schema.Table` construct, but without the
+ metadata::
+
+ from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+ from alembic import op
+
+ op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("name", VARCHAR(50), nullable=False),
+ Column("description", NVARCHAR(200)),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ Note that :meth:`.create_table` accepts
+ :class:`~sqlalchemy.schema.Column`
+ constructs directly from the SQLAlchemy library. In particular,
+ default values to be created on the database side are
+ specified using the ``server_default`` parameter, and not
+ ``default`` which only specifies Python-side defaults::
+
+ from alembic import op
+ from sqlalchemy import Column, TIMESTAMP, func
+
+ # specify "DEFAULT NOW" along with the "timestamp" column
+ op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ The function also returns a newly created
+ :class:`~sqlalchemy.schema.Table` object, corresponding to the table
+ specification given, which is suitable for
+ immediate SQL operations, in particular
+ :meth:`.Operations.bulk_insert`::
+
+ from sqlalchemy import INTEGER, VARCHAR, NVARCHAR, Column
+ from alembic import op
+
+ account_table = op.create_table(
+ "account",
+ Column("id", INTEGER, primary_key=True),
+ Column("name", VARCHAR(50), nullable=False),
+ Column("description", NVARCHAR(200)),
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ op.bulk_insert(
+ account_table,
+ [
+ {"name": "A1", "description": "account 1"},
+ {"name": "A2", "description": "account 2"},
+ ],
+ )
+
+ :param table_name: Name of the table
+ :param \*columns: collection of :class:`~sqlalchemy.schema.Column`
+ objects within
+ the table, as well as optional :class:`~sqlalchemy.schema.Constraint`
+ objects
+ and :class:`~.sqlalchemy.schema.Index` objects.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param if_not_exists: If True, adds IF NOT EXISTS operator when
+ creating the new table.
+
+ .. versionadded:: 1.13.3
+ :param \**kw: Other keyword arguments are passed to the underlying
+ :class:`sqlalchemy.schema.Table` object created for the command.
+
+ :return: the :class:`~sqlalchemy.schema.Table` object corresponding
+ to the parameters given.
+
+ """
+ op = cls(table_name, columns, if_not_exists=if_not_exists, **kw)
+ return operations.invoke(op)
+
+
+@Operations.register_operation("drop_table")
+class DropTableOp(MigrateOperation):
+ """Represent a drop table operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ table_kw: Optional[MutableMapping[Any, Any]] = None,
+ _reverse: Optional[CreateTableOp] = None,
+ ) -> None:
+ self.table_name = table_name
+ self.schema = schema
+ self.if_exists = if_exists
+ self.table_kw = table_kw or {}
+ self.comment = self.table_kw.pop("comment", None)
+ self.info = self.table_kw.pop("info", None)
+ self.prefixes = self.table_kw.pop("prefixes", None)
+ self._reverse = _reverse
+
+ def to_diff_tuple(self) -> Tuple[str, Table]:
+ return ("remove_table", self.to_table())
+
+ def reverse(self) -> CreateTableOp:
+ return CreateTableOp.from_table(self.to_table())
+
+ @classmethod
+ def from_table(
+ cls, table: Table, *, _namespace_metadata: Optional[MetaData] = None
+ ) -> DropTableOp:
+ return cls(
+ table.name,
+ schema=table.schema,
+ table_kw={
+ "comment": table.comment,
+ "info": dict(table.info),
+ "prefixes": list(table._prefixes),
+ **table.kwargs,
+ },
+ _reverse=CreateTableOp.from_table(
+ table, _namespace_metadata=_namespace_metadata
+ ),
+ )
+
+ def to_table(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Table:
+ if self._reverse:
+ cols_and_constraints = self._reverse.columns
+ else:
+ cols_and_constraints = []
+
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+ t = schema_obj.table(
+ self.table_name,
+ *cols_and_constraints,
+ comment=self.comment,
+ info=self.info.copy() if self.info else {},
+ prefixes=list(self.prefixes) if self.prefixes else [],
+ schema=self.schema,
+ _constraints_included=(
+ self._reverse._constraints_included if self._reverse else False
+ ),
+ **self.table_kw,
+ )
+ return t
+
+ @classmethod
+ def drop_table(
+ cls,
+ operations: Operations,
+ table_name: str,
+ *,
+ schema: Optional[str] = None,
+ if_exists: Optional[bool] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue a "drop table" instruction using the current
+ migration context.
+
+
+ e.g.::
+
+ drop_table("accounts")
+
+ :param table_name: Name of the table
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param if_exists: If True, adds IF EXISTS operator when
+ dropping the table.
+
+ .. versionadded:: 1.13.3
+ :param \**kw: Other keyword arguments are passed to the underlying
+ :class:`sqlalchemy.schema.Table` object created for the command.
+
+ """
+ op = cls(table_name, schema=schema, if_exists=if_exists, table_kw=kw)
+ operations.invoke(op)
+
+
+class AlterTableOp(MigrateOperation):
+ """Represent an alter table operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ self.table_name = table_name
+ self.schema = schema
+
+
+@Operations.register_operation("rename_table")
+class RenameTableOp(AlterTableOp):
+ """Represent a rename table operation."""
+
+ def __init__(
+ self,
+ old_table_name: str,
+ new_table_name: str,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ super().__init__(old_table_name, schema=schema)
+ self.new_table_name = new_table_name
+
+ @classmethod
+ def rename_table(
+ cls,
+ operations: Operations,
+ old_table_name: str,
+ new_table_name: str,
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Emit an ALTER TABLE to rename a table.
+
+ :param old_table_name: old name.
+ :param new_table_name: new name.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+ op = cls(old_table_name, new_table_name, schema=schema)
+ return operations.invoke(op)
+
+
+@Operations.register_operation("create_table_comment")
+@BatchOperations.register_operation(
+ "create_table_comment", "batch_create_table_comment"
+)
+class CreateTableCommentOp(AlterTableOp):
+ """Represent a COMMENT ON `table` operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ comment: Optional[str],
+ *,
+ schema: Optional[str] = None,
+ existing_comment: Optional[str] = None,
+ ) -> None:
+ self.table_name = table_name
+ self.comment = comment
+ self.existing_comment = existing_comment
+ self.schema = schema
+
+ @classmethod
+ def create_table_comment(
+ cls,
+ operations: Operations,
+ table_name: str,
+ comment: Optional[str],
+ *,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Emit a COMMENT ON operation to set the comment for a table.
+
+ :param table_name: string name of the target table.
+ :param comment: string value of the comment being registered against
+ the specified table.
+ :param existing_comment: String value of a comment
+ already registered on the specified table, used within autogenerate
+ so that the operation is reversible, but not required for direct
+ use.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_table_comment`
+
+ :paramref:`.Operations.alter_column.comment`
+
+ """
+
+ op = cls(
+ table_name,
+ comment,
+ existing_comment=existing_comment,
+ schema=schema,
+ )
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_create_table_comment(
+ cls,
+ operations: BatchOperations,
+ comment: Optional[str],
+ *,
+ existing_comment: Optional[str] = None,
+ ) -> None:
+ """Emit a COMMENT ON operation to set the comment for a table
+ using the current batch migration context.
+
+ :param comment: string value of the comment being registered against
+ the specified table.
+ :param existing_comment: String value of a comment
+ already registered on the specified table, used within autogenerate
+ so that the operation is reversible, but not required for direct
+ use.
+
+ """
+
+ op = cls(
+ operations.impl.table_name,
+ comment,
+ existing_comment=existing_comment,
+ schema=operations.impl.schema,
+ )
+ return operations.invoke(op)
+
+ def reverse(self) -> Union[CreateTableCommentOp, DropTableCommentOp]:
+ """Reverses the COMMENT ON operation against a table."""
+ if self.existing_comment is None:
+ return DropTableCommentOp(
+ self.table_name,
+ existing_comment=self.comment,
+ schema=self.schema,
+ )
+ else:
+ return CreateTableCommentOp(
+ self.table_name,
+ self.existing_comment,
+ existing_comment=self.comment,
+ schema=self.schema,
+ )
+
+ def to_table(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Table:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ return schema_obj.table(
+ self.table_name, schema=self.schema, comment=self.comment
+ )
+
+ def to_diff_tuple(self) -> Tuple[Any, ...]:
+ return ("add_table_comment", self.to_table(), self.existing_comment)
+
+
+@Operations.register_operation("drop_table_comment")
+@BatchOperations.register_operation(
+ "drop_table_comment", "batch_drop_table_comment"
+)
+class DropTableCommentOp(AlterTableOp):
+ """Represent an operation to remove the comment from a table."""
+
+ def __init__(
+ self,
+ table_name: str,
+ *,
+ schema: Optional[str] = None,
+ existing_comment: Optional[str] = None,
+ ) -> None:
+ self.table_name = table_name
+ self.existing_comment = existing_comment
+ self.schema = schema
+
+ @classmethod
+ def drop_table_comment(
+ cls,
+ operations: Operations,
+ table_name: str,
+ *,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue a "drop table comment" operation to
+ remove an existing comment set on a table.
+
+ :param table_name: string name of the target table.
+ :param existing_comment: An optional string value of a comment already
+ registered on the specified table.
+
+ .. seealso::
+
+ :meth:`.Operations.create_table_comment`
+
+ :paramref:`.Operations.alter_column.comment`
+
+ """
+
+ op = cls(table_name, existing_comment=existing_comment, schema=schema)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_drop_table_comment(
+ cls,
+ operations: BatchOperations,
+ *,
+ existing_comment: Optional[str] = None,
+ ) -> None:
+ """Issue a "drop table comment" operation to
+ remove an existing comment set on a table using the current
+ batch operations context.
+
+ :param existing_comment: An optional string value of a comment already
+ registered on the specified table.
+
+ """
+
+ op = cls(
+ operations.impl.table_name,
+ existing_comment=existing_comment,
+ schema=operations.impl.schema,
+ )
+ return operations.invoke(op)
+
+ def reverse(self) -> CreateTableCommentOp:
+ """Reverses the COMMENT ON operation against a table."""
+ return CreateTableCommentOp(
+ self.table_name, self.existing_comment, schema=self.schema
+ )
+
+ def to_table(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Table:
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+
+ return schema_obj.table(self.table_name, schema=self.schema)
+
+ def to_diff_tuple(self) -> Tuple[Any, ...]:
+ return ("remove_table_comment", self.to_table())
+
+
+@Operations.register_operation("alter_column")
+@BatchOperations.register_operation("alter_column", "batch_alter_column")
+class AlterColumnOp(AlterTableOp):
+ """Represent an alter column operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ column_name: str,
+ *,
+ schema: Optional[str] = None,
+ existing_type: Optional[Any] = None,
+ existing_server_default: Any = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ modify_nullable: Optional[bool] = None,
+ modify_comment: Optional[Union[str, Literal[False]]] = False,
+ modify_server_default: Any = False,
+ modify_name: Optional[str] = None,
+ modify_type: Optional[Any] = None,
+ **kw: Any,
+ ) -> None:
+ super().__init__(table_name, schema=schema)
+ self.column_name = column_name
+ self.existing_type = existing_type
+ self.existing_server_default = existing_server_default
+ self.existing_nullable = existing_nullable
+ self.existing_comment = existing_comment
+ self.modify_nullable = modify_nullable
+ self.modify_comment = modify_comment
+ self.modify_server_default = modify_server_default
+ self.modify_name = modify_name
+ self.modify_type = modify_type
+ self.kw = kw
+
+ def to_diff_tuple(self) -> Any:
+ col_diff = []
+ schema, tname, cname = self.schema, self.table_name, self.column_name
+
+ if self.modify_type is not None:
+ col_diff.append(
+ (
+ "modify_type",
+ schema,
+ tname,
+ cname,
+ {
+ "existing_nullable": self.existing_nullable,
+ "existing_server_default": (
+ self.existing_server_default
+ ),
+ "existing_comment": self.existing_comment,
+ },
+ self.existing_type,
+ self.modify_type,
+ )
+ )
+
+ if self.modify_nullable is not None:
+ col_diff.append(
+ (
+ "modify_nullable",
+ schema,
+ tname,
+ cname,
+ {
+ "existing_type": self.existing_type,
+ "existing_server_default": (
+ self.existing_server_default
+ ),
+ "existing_comment": self.existing_comment,
+ },
+ self.existing_nullable,
+ self.modify_nullable,
+ )
+ )
+
+ if self.modify_server_default is not False:
+ col_diff.append(
+ (
+ "modify_default",
+ schema,
+ tname,
+ cname,
+ {
+ "existing_nullable": self.existing_nullable,
+ "existing_type": self.existing_type,
+ "existing_comment": self.existing_comment,
+ },
+ self.existing_server_default,
+ self.modify_server_default,
+ )
+ )
+
+ if self.modify_comment is not False:
+ col_diff.append(
+ (
+ "modify_comment",
+ schema,
+ tname,
+ cname,
+ {
+ "existing_nullable": self.existing_nullable,
+ "existing_type": self.existing_type,
+ "existing_server_default": (
+ self.existing_server_default
+ ),
+ },
+ self.existing_comment,
+ self.modify_comment,
+ )
+ )
+
+ return col_diff
+
+ def has_changes(self) -> bool:
+ hc1 = (
+ self.modify_nullable is not None
+ or self.modify_server_default is not False
+ or self.modify_type is not None
+ or self.modify_comment is not False
+ )
+ if hc1:
+ return True
+ for kw in self.kw:
+ if kw.startswith("modify_"):
+ return True
+ else:
+ return False
+
+ def reverse(self) -> AlterColumnOp:
+ kw = self.kw.copy()
+ kw["existing_type"] = self.existing_type
+ kw["existing_nullable"] = self.existing_nullable
+ kw["existing_server_default"] = self.existing_server_default
+ kw["existing_comment"] = self.existing_comment
+ if self.modify_type is not None:
+ kw["modify_type"] = self.modify_type
+ if self.modify_nullable is not None:
+ kw["modify_nullable"] = self.modify_nullable
+ if self.modify_server_default is not False:
+ kw["modify_server_default"] = self.modify_server_default
+ if self.modify_comment is not False:
+ kw["modify_comment"] = self.modify_comment
+
+ # TODO: make this a little simpler
+ all_keys = {
+ m.group(1)
+ for m in [re.match(r"^(?:existing_|modify_)(.+)$", k) for k in kw]
+ if m
+ }
+
+ for k in all_keys:
+ if "modify_%s" % k in kw:
+ swap = kw["existing_%s" % k]
+ kw["existing_%s" % k] = kw["modify_%s" % k]
+ kw["modify_%s" % k] = swap
+
+ return self.__class__(
+ self.table_name, self.column_name, schema=self.schema, **kw
+ )
+
+ @classmethod
+ def alter_column(
+ cls,
+ operations: Operations,
+ table_name: str,
+ column_name: str,
+ *,
+ nullable: Optional[bool] = None,
+ comment: Optional[Union[str, Literal[False]]] = False,
+ server_default: Any = False,
+ new_column_name: Optional[str] = None,
+ type_: Optional[Union[TypeEngine[Any], Type[TypeEngine[Any]]]] = None,
+ existing_type: Optional[
+ Union[TypeEngine[Any], Type[TypeEngine[Any]]]
+ ] = None,
+ existing_server_default: Optional[
+ Union[str, bool, Identity, Computed]
+ ] = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ r"""Issue an "alter column" instruction using the
+ current migration context.
+
+ Generally, only that aspect of the column which
+ is being changed, i.e. name, type, nullability,
+ default, needs to be specified. Multiple changes
+ can also be specified at once and the backend should
+ "do the right thing", emitting each change either
+ separately or together as the backend allows.
+
+ MySQL has special requirements here, since MySQL
+ cannot ALTER a column without a full specification.
+ When producing MySQL-compatible migration files,
+ it is recommended that the ``existing_type``,
+ ``existing_server_default``, and ``existing_nullable``
+ parameters be present, if not being altered.
+
+ Type changes which are against the SQLAlchemy
+ "schema" types :class:`~sqlalchemy.types.Boolean`
+ and :class:`~sqlalchemy.types.Enum` may also
+ add or drop constraints which accompany those
+ types on backends that don't support them natively.
+ The ``existing_type`` argument is
+ used in this case to identify and remove a previous
+ constraint that was bound to the type object.
+
+ :param table_name: string name of the target table.
+ :param column_name: string name of the target column,
+ as it exists before the operation begins.
+ :param nullable: Optional; specify ``True`` or ``False``
+ to alter the column's nullability.
+ :param server_default: Optional; specify a string
+ SQL expression, :func:`~sqlalchemy.sql.expression.text`,
+ or :class:`~sqlalchemy.schema.DefaultClause` to indicate
+ an alteration to the column's default value.
+ Set to ``None`` to have the default removed.
+ :param comment: optional string text of a new comment to add to the
+ column.
+ :param new_column_name: Optional; specify a string name here to
+ indicate the new name within a column rename operation.
+ :param type\_: Optional; a :class:`~sqlalchemy.types.TypeEngine`
+ type object to specify a change to the column's type.
+ For SQLAlchemy types that also indicate a constraint (i.e.
+ :class:`~sqlalchemy.types.Boolean`, :class:`~sqlalchemy.types.Enum`),
+ the constraint is also generated.
+ :param autoincrement: set the ``AUTO_INCREMENT`` flag of the column;
+ currently understood by the MySQL dialect.
+ :param existing_type: Optional; a
+ :class:`~sqlalchemy.types.TypeEngine`
+ type object to specify the previous type. This
+ is required for all MySQL column alter operations that
+ don't otherwise specify a new type, as well as for
+ when nullability is being changed on a SQL Server
+ column. It is also used if the type is a so-called
+ SQLAlchemy "schema" type which may define a constraint (i.e.
+ :class:`~sqlalchemy.types.Boolean`,
+ :class:`~sqlalchemy.types.Enum`),
+ so that the constraint can be dropped.
+ :param existing_server_default: Optional; The existing
+ default value of the column. Required on MySQL if
+ an existing default is not being changed; else MySQL
+ removes the default.
+ :param existing_nullable: Optional; the existing nullability
+ of the column. Required on MySQL if the existing nullability
+ is not being changed; else MySQL sets this to NULL.
+ :param existing_autoincrement: Optional; the existing autoincrement
+ of the column. Used for MySQL's system of altering a column
+ that specifies ``AUTO_INCREMENT``.
+ :param existing_comment: string text of the existing comment on the
+ column to be maintained. Required on MySQL if the existing comment
+ on the column is not being changed.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param postgresql_using: String argument which will indicate a
+ SQL expression to render within the Postgresql-specific USING clause
+ within ALTER COLUMN. This string is taken directly as raw SQL which
+ must explicitly include any necessary quoting or escaping of tokens
+ within the expression.
+
+ """
+
+ alt = cls(
+ table_name,
+ column_name,
+ schema=schema,
+ existing_type=existing_type,
+ existing_server_default=existing_server_default,
+ existing_nullable=existing_nullable,
+ existing_comment=existing_comment,
+ modify_name=new_column_name,
+ modify_type=type_,
+ modify_server_default=server_default,
+ modify_nullable=nullable,
+ modify_comment=comment,
+ **kw,
+ )
+
+ return operations.invoke(alt)
+
+ @classmethod
+ def batch_alter_column(
+ cls,
+ operations: BatchOperations,
+ column_name: str,
+ *,
+ nullable: Optional[bool] = None,
+ comment: Optional[Union[str, Literal[False]]] = False,
+ server_default: Any = False,
+ new_column_name: Optional[str] = None,
+ type_: Optional[Union[TypeEngine[Any], Type[TypeEngine[Any]]]] = None,
+ existing_type: Optional[
+ Union[TypeEngine[Any], Type[TypeEngine[Any]]]
+ ] = None,
+ existing_server_default: Optional[
+ Union[str, bool, Identity, Computed]
+ ] = False,
+ existing_nullable: Optional[bool] = None,
+ existing_comment: Optional[str] = None,
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue an "alter column" instruction using the current
+ batch migration context.
+
+ Parameters are the same as that of :meth:`.Operations.alter_column`,
+ as well as the following option(s):
+
+ :param insert_before: String name of an existing column which this
+ column should be placed before, when creating the new table.
+
+ :param insert_after: String name of an existing column which this
+ column should be placed after, when creating the new table. If
+ both :paramref:`.BatchOperations.alter_column.insert_before`
+ and :paramref:`.BatchOperations.alter_column.insert_after` are
+ omitted, the column is inserted after the last existing column
+ in the table.
+
+ .. seealso::
+
+ :meth:`.Operations.alter_column`
+
+
+ """
+ alt = cls(
+ operations.impl.table_name,
+ column_name,
+ schema=operations.impl.schema,
+ existing_type=existing_type,
+ existing_server_default=existing_server_default,
+ existing_nullable=existing_nullable,
+ existing_comment=existing_comment,
+ modify_name=new_column_name,
+ modify_type=type_,
+ modify_server_default=server_default,
+ modify_nullable=nullable,
+ modify_comment=comment,
+ insert_before=insert_before,
+ insert_after=insert_after,
+ **kw,
+ )
+
+ return operations.invoke(alt)
+
+
+@Operations.register_operation("add_column")
+@BatchOperations.register_operation("add_column", "batch_add_column")
+class AddColumnOp(AlterTableOp):
+ """Represent an add column operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ column: Column[Any],
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ super().__init__(table_name, schema=schema)
+ self.column = column
+ self.kw = kw
+
+ def reverse(self) -> DropColumnOp:
+ return DropColumnOp.from_column_and_tablename(
+ self.schema, self.table_name, self.column
+ )
+
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, Optional[str], str, Column[Any]]:
+ return ("add_column", self.schema, self.table_name, self.column)
+
+ def to_column(self) -> Column[Any]:
+ return self.column
+
+ @classmethod
+ def from_column(cls, col: Column[Any]) -> AddColumnOp:
+ return cls(col.table.name, col, schema=col.table.schema)
+
+ @classmethod
+ def from_column_and_tablename(
+ cls,
+ schema: Optional[str],
+ tname: str,
+ col: Column[Any],
+ ) -> AddColumnOp:
+ return cls(tname, col, schema=schema)
+
+ @classmethod
+ def add_column(
+ cls,
+ operations: Operations,
+ table_name: str,
+ column: Column[Any],
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ """Issue an "add column" instruction using the current
+ migration context.
+
+ e.g.::
+
+ from alembic import op
+ from sqlalchemy import Column, String
+
+ op.add_column("organization", Column("name", String()))
+
+ The :meth:`.Operations.add_column` method typically corresponds
+ to the SQL command "ALTER TABLE... ADD COLUMN". Within the scope
+ of this command, the column's name, datatype, nullability,
+ and optional server-generated defaults may be indicated.
+
+ .. note::
+
+ With the exception of NOT NULL constraints or single-column FOREIGN
+ KEY constraints, other kinds of constraints such as PRIMARY KEY,
+ UNIQUE or CHECK constraints **cannot** be generated using this
+ method; for these constraints, refer to operations such as
+ :meth:`.Operations.create_primary_key` and
+ :meth:`.Operations.create_check_constraint`. In particular, the
+ following :class:`~sqlalchemy.schema.Column` parameters are
+ **ignored**:
+
+ * :paramref:`~sqlalchemy.schema.Column.primary_key` - SQL databases
+ typically do not support an ALTER operation that can add
+ individual columns one at a time to an existing primary key
+ constraint, therefore it's less ambiguous to use the
+ :meth:`.Operations.create_primary_key` method, which assumes no
+ existing primary key constraint is present.
+ * :paramref:`~sqlalchemy.schema.Column.unique` - use the
+ :meth:`.Operations.create_unique_constraint` method
+ * :paramref:`~sqlalchemy.schema.Column.index` - use the
+ :meth:`.Operations.create_index` method
+
+
+ The provided :class:`~sqlalchemy.schema.Column` object may include a
+ :class:`~sqlalchemy.schema.ForeignKey` constraint directive,
+ referencing a remote table name. For this specific type of constraint,
+ Alembic will automatically emit a second ALTER statement in order to
+ add the single-column FOREIGN KEY constraint separately::
+
+ from alembic import op
+ from sqlalchemy import Column, INTEGER, ForeignKey
+
+ op.add_column(
+ "organization",
+ Column("account_id", INTEGER, ForeignKey("accounts.id")),
+ )
+
+ The column argument passed to :meth:`.Operations.add_column` is a
+ :class:`~sqlalchemy.schema.Column` construct, used in the same way it's
+ used in SQLAlchemy. In particular, values or functions to be indicated
+ as producing the column's default value on the database side are
+ specified using the ``server_default`` parameter, and not ``default``
+ which only specifies Python-side defaults::
+
+ from alembic import op
+ from sqlalchemy import Column, TIMESTAMP, func
+
+ # specify "DEFAULT NOW" along with the column add
+ op.add_column(
+ "account",
+ Column("timestamp", TIMESTAMP, server_default=func.now()),
+ )
+
+ :param table_name: String name of the parent table.
+ :param column: a :class:`sqlalchemy.schema.Column` object
+ representing the new column.
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+
+ """
+
+ op = cls(table_name, column, schema=schema)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_add_column(
+ cls,
+ operations: BatchOperations,
+ column: Column[Any],
+ *,
+ insert_before: Optional[str] = None,
+ insert_after: Optional[str] = None,
+ ) -> None:
+ """Issue an "add column" instruction using the current
+ batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.add_column`
+
+ """
+
+ kw = {}
+ if insert_before:
+ kw["insert_before"] = insert_before
+ if insert_after:
+ kw["insert_after"] = insert_after
+
+ op = cls(
+ operations.impl.table_name,
+ column,
+ schema=operations.impl.schema,
+ **kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("drop_column")
+@BatchOperations.register_operation("drop_column", "batch_drop_column")
+class DropColumnOp(AlterTableOp):
+ """Represent a drop column operation."""
+
+ def __init__(
+ self,
+ table_name: str,
+ column_name: str,
+ *,
+ schema: Optional[str] = None,
+ _reverse: Optional[AddColumnOp] = None,
+ **kw: Any,
+ ) -> None:
+ super().__init__(table_name, schema=schema)
+ self.column_name = column_name
+ self.kw = kw
+ self._reverse = _reverse
+
+ def to_diff_tuple(
+ self,
+ ) -> Tuple[str, Optional[str], str, Column[Any]]:
+ return (
+ "remove_column",
+ self.schema,
+ self.table_name,
+ self.to_column(),
+ )
+
+ def reverse(self) -> AddColumnOp:
+ if self._reverse is None:
+ raise ValueError(
+ "operation is not reversible; "
+ "original column is not present"
+ )
+
+ return AddColumnOp.from_column_and_tablename(
+ self.schema, self.table_name, self._reverse.column
+ )
+
+ @classmethod
+ def from_column_and_tablename(
+ cls,
+ schema: Optional[str],
+ tname: str,
+ col: Column[Any],
+ ) -> DropColumnOp:
+ return cls(
+ tname,
+ col.name,
+ schema=schema,
+ _reverse=AddColumnOp.from_column_and_tablename(schema, tname, col),
+ )
+
+ def to_column(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> Column[Any]:
+ if self._reverse is not None:
+ return self._reverse.column
+ schema_obj = schemaobj.SchemaObjects(migration_context)
+ return schema_obj.column(self.column_name, NULLTYPE)
+
+ @classmethod
+ def drop_column(
+ cls,
+ operations: Operations,
+ table_name: str,
+ column_name: str,
+ *,
+ schema: Optional[str] = None,
+ **kw: Any,
+ ) -> None:
+ """Issue a "drop column" instruction using the current
+ migration context.
+
+ e.g.::
+
+ drop_column("organization", "account_id")
+
+ :param table_name: name of table
+ :param column_name: name of column
+ :param schema: Optional schema name to operate within. To control
+ quoting of the schema outside of the default behavior, use
+ the SQLAlchemy construct
+ :class:`~sqlalchemy.sql.elements.quoted_name`.
+ :param mssql_drop_check: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop the CHECK constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from sys.check_constraints,
+ then exec's a separate DROP CONSTRAINT for that constraint.
+ :param mssql_drop_default: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop the DEFAULT constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from sys.default_constraints,
+ then exec's a separate DROP CONSTRAINT for that default.
+ :param mssql_drop_foreign_key: Optional boolean. When ``True``, on
+ Microsoft SQL Server only, first
+ drop a single FOREIGN KEY constraint on the column using a
+ SQL-script-compatible
+ block that selects into a @variable from
+ sys.foreign_keys/sys.foreign_key_columns,
+ then exec's a separate DROP CONSTRAINT for that default. Only
+ works if the column has exactly one FK constraint which refers to
+ it, at the moment.
+
+ """
+
+ op = cls(table_name, column_name, schema=schema, **kw)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_drop_column(
+ cls, operations: BatchOperations, column_name: str, **kw: Any
+ ) -> None:
+ """Issue a "drop column" instruction using the current
+ batch migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.drop_column`
+
+ """
+ op = cls(
+ operations.impl.table_name,
+ column_name,
+ schema=operations.impl.schema,
+ **kw,
+ )
+ return operations.invoke(op)
+
+
+@Operations.register_operation("bulk_insert")
+class BulkInsertOp(MigrateOperation):
+ """Represent a bulk insert operation."""
+
+ def __init__(
+ self,
+ table: Union[Table, TableClause],
+ rows: List[Dict[str, Any]],
+ *,
+ multiinsert: bool = True,
+ ) -> None:
+ self.table = table
+ self.rows = rows
+ self.multiinsert = multiinsert
+
+ @classmethod
+ def bulk_insert(
+ cls,
+ operations: Operations,
+ table: Union[Table, TableClause],
+ rows: List[Dict[str, Any]],
+ *,
+ multiinsert: bool = True,
+ ) -> None:
+ """Issue a "bulk insert" operation using the current
+ migration context.
+
+ This provides a means of representing an INSERT of multiple rows
+ which works equally well in the context of executing on a live
+ connection as well as that of generating a SQL script. In the
+ case of a SQL script, the values are rendered inline into the
+ statement.
+
+ e.g.::
+
+ from alembic import op
+ from datetime import date
+ from sqlalchemy.sql import table, column
+ from sqlalchemy import String, Integer, Date
+
+ # Create an ad-hoc table to use for the insert statement.
+ accounts_table = table(
+ "account",
+ column("id", Integer),
+ column("name", String),
+ column("create_date", Date),
+ )
+
+ op.bulk_insert(
+ accounts_table,
+ [
+ {
+ "id": 1,
+ "name": "John Smith",
+ "create_date": date(2010, 10, 5),
+ },
+ {
+ "id": 2,
+ "name": "Ed Williams",
+ "create_date": date(2007, 5, 27),
+ },
+ {
+ "id": 3,
+ "name": "Wendy Jones",
+ "create_date": date(2008, 8, 15),
+ },
+ ],
+ )
+
+ When using --sql mode, some datatypes may not render inline
+ automatically, such as dates and other special types. When this
+ issue is present, :meth:`.Operations.inline_literal` may be used::
+
+ op.bulk_insert(
+ accounts_table,
+ [
+ {
+ "id": 1,
+ "name": "John Smith",
+ "create_date": op.inline_literal("2010-10-05"),
+ },
+ {
+ "id": 2,
+ "name": "Ed Williams",
+ "create_date": op.inline_literal("2007-05-27"),
+ },
+ {
+ "id": 3,
+ "name": "Wendy Jones",
+ "create_date": op.inline_literal("2008-08-15"),
+ },
+ ],
+ multiinsert=False,
+ )
+
+ When using :meth:`.Operations.inline_literal` in conjunction with
+ :meth:`.Operations.bulk_insert`, in order for the statement to work
+ in "online" (e.g. non --sql) mode, the
+ :paramref:`~.Operations.bulk_insert.multiinsert`
+ flag should be set to ``False``, which will have the effect of
+ individual INSERT statements being emitted to the database, each
+ with a distinct VALUES clause, so that the "inline" values can
+ still be rendered, rather than attempting to pass the values
+ as bound parameters.
+
+ :param table: a table object which represents the target of the INSERT.
+
+ :param rows: a list of dictionaries indicating rows.
+
+ :param multiinsert: when at its default of True and --sql mode is not
+ enabled, the INSERT statement will be executed using
+ "executemany()" style, where all elements in the list of
+ dictionaries are passed as bound parameters in a single
+ list. Setting this to False results in individual INSERT
+ statements being emitted per parameter set, and is needed
+ in those cases where non-literal values are present in the
+ parameter sets.
+
+ """
+
+ op = cls(table, rows, multiinsert=multiinsert)
+ operations.invoke(op)
+
+
+@Operations.register_operation("execute")
+@BatchOperations.register_operation("execute", "batch_execute")
+class ExecuteSQLOp(MigrateOperation):
+ """Represent an execute SQL operation."""
+
+ def __init__(
+ self,
+ sqltext: Union[Executable, str],
+ *,
+ execution_options: Optional[dict[str, Any]] = None,
+ ) -> None:
+ self.sqltext = sqltext
+ self.execution_options = execution_options
+
+ @classmethod
+ def execute(
+ cls,
+ operations: Operations,
+ sqltext: Union[Executable, str],
+ *,
+ execution_options: Optional[dict[str, Any]] = None,
+ ) -> None:
+ r"""Execute the given SQL using the current migration context.
+
+ The given SQL can be a plain string, e.g.::
+
+ op.execute("INSERT INTO table (foo) VALUES ('some value')")
+
+ Or it can be any kind of Core SQL Expression construct, such as
+ below where we use an update construct::
+
+ from sqlalchemy.sql import table, column
+ from sqlalchemy import String
+ from alembic import op
+
+ account = table("account", column("name", String))
+ op.execute(
+ account.update()
+ .where(account.c.name == op.inline_literal("account 1"))
+ .values({"name": op.inline_literal("account 2")})
+ )
+
+ Above, we made use of the SQLAlchemy
+ :func:`sqlalchemy.sql.expression.table` and
+ :func:`sqlalchemy.sql.expression.column` constructs to make a brief,
+ ad-hoc table construct just for our UPDATE statement. A full
+ :class:`~sqlalchemy.schema.Table` construct of course works perfectly
+ fine as well, though note it's a recommended practice to at least
+ ensure the definition of a table is self-contained within the migration
+ script, rather than imported from a module that may break compatibility
+ with older migrations.
+
+ In a SQL script context, the statement is emitted directly to the
+ output stream. There is *no* return result, however, as this
+ function is oriented towards generating a change script
+ that can run in "offline" mode. Additionally, parameterized
+ statements are discouraged here, as they *will not work* in offline
+ mode. Above, we use :meth:`.inline_literal` where parameters are
+ to be used.
+
+ For full interaction with a connected database where parameters can
+ also be used normally, use the "bind" available from the context::
+
+ from alembic import op
+
+ connection = op.get_bind()
+
+ connection.execute(
+ account.update()
+ .where(account.c.name == "account 1")
+ .values({"name": "account 2"})
+ )
+
+ Additionally, when passing the statement as a plain string, it is first
+ coerced into a :func:`sqlalchemy.sql.expression.text` construct
+ before being passed along. In the less likely case that the
+ literal SQL string contains a colon, it must be escaped with a
+ backslash, as::
+
+ op.execute(r"INSERT INTO table (foo) VALUES ('\:colon_value')")
+
+
+ :param sqltext: Any legal SQLAlchemy expression, including:
+
+ * a string
+ * a :func:`sqlalchemy.sql.expression.text` construct.
+ * a :func:`sqlalchemy.sql.expression.insert` construct.
+ * a :func:`sqlalchemy.sql.expression.update` construct.
+ * a :func:`sqlalchemy.sql.expression.delete` construct.
+ * Any "executable" described in SQLAlchemy Core documentation,
+ noting that no result set is returned.
+
+ .. note:: when passing a plain string, the statement is coerced into
+ a :func:`sqlalchemy.sql.expression.text` construct. This construct
+ considers symbols with colons, e.g. ``:foo`` to be bound parameters.
+ To avoid this, ensure that colon symbols are escaped, e.g.
+ ``\:foo``.
+
+ :param execution_options: Optional dictionary of
+ execution options, will be passed to
+ :meth:`sqlalchemy.engine.Connection.execution_options`.
+ """
+ op = cls(sqltext, execution_options=execution_options)
+ return operations.invoke(op)
+
+ @classmethod
+ def batch_execute(
+ cls,
+ operations: Operations,
+ sqltext: Union[Executable, str],
+ *,
+ execution_options: Optional[dict[str, Any]] = None,
+ ) -> None:
+ """Execute the given SQL using the current migration context.
+
+ .. seealso::
+
+ :meth:`.Operations.execute`
+
+ """
+ return cls.execute(
+ operations, sqltext, execution_options=execution_options
+ )
+
+ def to_diff_tuple(self) -> Tuple[str, Union[Executable, str]]:
+ return ("execute", self.sqltext)
+
+
+class OpContainer(MigrateOperation):
+ """Represent a sequence of operations operation."""
+
+ def __init__(self, ops: Sequence[MigrateOperation] = ()) -> None:
+ self.ops = list(ops)
+
+ def is_empty(self) -> bool:
+ return not self.ops
+
+ def as_diffs(self) -> Any:
+ return list(OpContainer._ops_as_diffs(self))
+
+ @classmethod
+ def _ops_as_diffs(
+ cls, migrations: OpContainer
+ ) -> Iterator[Tuple[Any, ...]]:
+ for op in migrations.ops:
+ if hasattr(op, "ops"):
+ yield from cls._ops_as_diffs(cast("OpContainer", op))
+ else:
+ yield op.to_diff_tuple()
+
+
+class ModifyTableOps(OpContainer):
+ """Contains a sequence of operations that all apply to a single Table."""
+
+ def __init__(
+ self,
+ table_name: str,
+ ops: Sequence[MigrateOperation],
+ *,
+ schema: Optional[str] = None,
+ ) -> None:
+ super().__init__(ops)
+ self.table_name = table_name
+ self.schema = schema
+
+ def reverse(self) -> ModifyTableOps:
+ return ModifyTableOps(
+ self.table_name,
+ ops=list(reversed([op.reverse() for op in self.ops])),
+ schema=self.schema,
+ )
+
+
+class UpgradeOps(OpContainer):
+ """contains a sequence of operations that would apply to the
+ 'upgrade' stream of a script.
+
+ .. seealso::
+
+ :ref:`customizing_revision`
+
+ """
+
+ def __init__(
+ self,
+ ops: Sequence[MigrateOperation] = (),
+ upgrade_token: str = "upgrades",
+ ) -> None:
+ super().__init__(ops=ops)
+ self.upgrade_token = upgrade_token
+
+ def reverse_into(self, downgrade_ops: DowngradeOps) -> DowngradeOps:
+ downgrade_ops.ops[:] = list(
+ reversed([op.reverse() for op in self.ops])
+ )
+ return downgrade_ops
+
+ def reverse(self) -> DowngradeOps:
+ return self.reverse_into(DowngradeOps(ops=[]))
+
+
+class DowngradeOps(OpContainer):
+ """contains a sequence of operations that would apply to the
+ 'downgrade' stream of a script.
+
+ .. seealso::
+
+ :ref:`customizing_revision`
+
+ """
+
+ def __init__(
+ self,
+ ops: Sequence[MigrateOperation] = (),
+ downgrade_token: str = "downgrades",
+ ) -> None:
+ super().__init__(ops=ops)
+ self.downgrade_token = downgrade_token
+
+ def reverse(self) -> UpgradeOps:
+ return UpgradeOps(
+ ops=list(reversed([op.reverse() for op in self.ops]))
+ )
+
+
+class MigrationScript(MigrateOperation):
+ """represents a migration script.
+
+ E.g. when autogenerate encounters this object, this corresponds to the
+ production of an actual script file.
+
+ A normal :class:`.MigrationScript` object would contain a single
+ :class:`.UpgradeOps` and a single :class:`.DowngradeOps` directive.
+ These are accessible via the ``.upgrade_ops`` and ``.downgrade_ops``
+ attributes.
+
+ In the case of an autogenerate operation that runs multiple times,
+ such as the multiple database example in the "multidb" template,
+ the ``.upgrade_ops`` and ``.downgrade_ops`` attributes are disabled,
+ and instead these objects should be accessed via the ``.upgrade_ops_list``
+ and ``.downgrade_ops_list`` list-based attributes. These latter
+ attributes are always available at the very least as single-element lists.
+
+ .. seealso::
+
+ :ref:`customizing_revision`
+
+ """
+
+ _needs_render: Optional[bool]
+ _upgrade_ops: List[UpgradeOps]
+ _downgrade_ops: List[DowngradeOps]
+
+ def __init__(
+ self,
+ rev_id: Optional[str],
+ upgrade_ops: UpgradeOps,
+ downgrade_ops: DowngradeOps,
+ *,
+ message: Optional[str] = None,
+ imports: Set[str] = set(),
+ head: Optional[str] = None,
+ splice: Optional[bool] = None,
+ branch_label: Optional[_RevIdType] = None,
+ version_path: Optional[str] = None,
+ depends_on: Optional[_RevIdType] = None,
+ ) -> None:
+ self.rev_id = rev_id
+ self.message = message
+ self.imports = imports
+ self.head = head
+ self.splice = splice
+ self.branch_label = branch_label
+ self.version_path = version_path
+ self.depends_on = depends_on
+ self.upgrade_ops = upgrade_ops
+ self.downgrade_ops = downgrade_ops
+
+ @property
+ def upgrade_ops(self) -> Optional[UpgradeOps]:
+ """An instance of :class:`.UpgradeOps`.
+
+ .. seealso::
+
+ :attr:`.MigrationScript.upgrade_ops_list`
+ """
+ if len(self._upgrade_ops) > 1:
+ raise ValueError(
+ "This MigrationScript instance has a multiple-entry "
+ "list for UpgradeOps; please use the "
+ "upgrade_ops_list attribute."
+ )
+ elif not self._upgrade_ops:
+ return None
+ else:
+ return self._upgrade_ops[0]
+
+ @upgrade_ops.setter
+ def upgrade_ops(
+ self, upgrade_ops: Union[UpgradeOps, List[UpgradeOps]]
+ ) -> None:
+ self._upgrade_ops = util.to_list(upgrade_ops)
+ for elem in self._upgrade_ops:
+ assert isinstance(elem, UpgradeOps)
+
+ @property
+ def downgrade_ops(self) -> Optional[DowngradeOps]:
+ """An instance of :class:`.DowngradeOps`.
+
+ .. seealso::
+
+ :attr:`.MigrationScript.downgrade_ops_list`
+ """
+ if len(self._downgrade_ops) > 1:
+ raise ValueError(
+ "This MigrationScript instance has a multiple-entry "
+ "list for DowngradeOps; please use the "
+ "downgrade_ops_list attribute."
+ )
+ elif not self._downgrade_ops:
+ return None
+ else:
+ return self._downgrade_ops[0]
+
+ @downgrade_ops.setter
+ def downgrade_ops(
+ self, downgrade_ops: Union[DowngradeOps, List[DowngradeOps]]
+ ) -> None:
+ self._downgrade_ops = util.to_list(downgrade_ops)
+ for elem in self._downgrade_ops:
+ assert isinstance(elem, DowngradeOps)
+
+ @property
+ def upgrade_ops_list(self) -> List[UpgradeOps]:
+ """A list of :class:`.UpgradeOps` instances.
+
+ This is used in place of the :attr:`.MigrationScript.upgrade_ops`
+ attribute when dealing with a revision operation that does
+ multiple autogenerate passes.
+
+ """
+ return self._upgrade_ops
+
+ @property
+ def downgrade_ops_list(self) -> List[DowngradeOps]:
+ """A list of :class:`.DowngradeOps` instances.
+
+ This is used in place of the :attr:`.MigrationScript.downgrade_ops`
+ attribute when dealing with a revision operation that does
+ multiple autogenerate passes.
+
+ """
+ return self._downgrade_ops
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py b/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py
new file mode 100644
index 00000000..59c1002f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/schemaobj.py
@@ -0,0 +1,290 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import schema as sa_schema
+from sqlalchemy.sql.schema import Column
+from sqlalchemy.sql.schema import Constraint
+from sqlalchemy.sql.schema import Index
+from sqlalchemy.types import Integer
+from sqlalchemy.types import NULLTYPE
+
+from .. import util
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+ from sqlalchemy.sql.elements import ColumnElement
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.schema import CheckConstraint
+ from sqlalchemy.sql.schema import ForeignKey
+ from sqlalchemy.sql.schema import ForeignKeyConstraint
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import PrimaryKeyConstraint
+ from sqlalchemy.sql.schema import Table
+ from sqlalchemy.sql.schema import UniqueConstraint
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from ..runtime.migration import MigrationContext
+
+
+class SchemaObjects:
+ def __init__(
+ self, migration_context: Optional[MigrationContext] = None
+ ) -> None:
+ self.migration_context = migration_context
+
+ def primary_key_constraint(
+ self,
+ name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ cols: Sequence[str],
+ schema: Optional[str] = None,
+ **dialect_kw,
+ ) -> PrimaryKeyConstraint:
+ m = self.metadata()
+ columns = [sa_schema.Column(n, NULLTYPE) for n in cols]
+ t = sa_schema.Table(table_name, m, *columns, schema=schema)
+ # SQLAlchemy primary key constraint name arg is wrongly typed on
+ # the SQLAlchemy side through 2.0.5 at least
+ p = sa_schema.PrimaryKeyConstraint(
+ *[t.c[n] for n in cols], name=name, **dialect_kw # type: ignore
+ )
+ return p
+
+ def foreign_key_constraint(
+ self,
+ name: Optional[sqla_compat._ConstraintNameDefined],
+ source: str,
+ referent: str,
+ local_cols: List[str],
+ remote_cols: List[str],
+ onupdate: Optional[str] = None,
+ ondelete: Optional[str] = None,
+ deferrable: Optional[bool] = None,
+ source_schema: Optional[str] = None,
+ referent_schema: Optional[str] = None,
+ initially: Optional[str] = None,
+ match: Optional[str] = None,
+ **dialect_kw,
+ ) -> ForeignKeyConstraint:
+ m = self.metadata()
+ if source == referent and source_schema == referent_schema:
+ t1_cols = local_cols + remote_cols
+ else:
+ t1_cols = local_cols
+ sa_schema.Table(
+ referent,
+ m,
+ *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
+ schema=referent_schema,
+ )
+
+ t1 = sa_schema.Table(
+ source,
+ m,
+ *[
+ sa_schema.Column(n, NULLTYPE)
+ for n in util.unique_list(t1_cols)
+ ],
+ schema=source_schema,
+ )
+
+ tname = (
+ "%s.%s" % (referent_schema, referent)
+ if referent_schema
+ else referent
+ )
+
+ dialect_kw["match"] = match
+
+ f = sa_schema.ForeignKeyConstraint(
+ local_cols,
+ ["%s.%s" % (tname, n) for n in remote_cols],
+ name=name,
+ onupdate=onupdate,
+ ondelete=ondelete,
+ deferrable=deferrable,
+ initially=initially,
+ **dialect_kw,
+ )
+ t1.append_constraint(f)
+
+ return f
+
+ def unique_constraint(
+ self,
+ name: Optional[sqla_compat._ConstraintNameDefined],
+ source: str,
+ local_cols: Sequence[str],
+ schema: Optional[str] = None,
+ **kw,
+ ) -> UniqueConstraint:
+ t = sa_schema.Table(
+ source,
+ self.metadata(),
+ *[sa_schema.Column(n, NULLTYPE) for n in local_cols],
+ schema=schema,
+ )
+ kw["name"] = name
+ uq = sa_schema.UniqueConstraint(*[t.c[n] for n in local_cols], **kw)
+ # TODO: need event tests to ensure the event
+ # is fired off here
+ t.append_constraint(uq)
+ return uq
+
+ def check_constraint(
+ self,
+ name: Optional[sqla_compat._ConstraintNameDefined],
+ source: str,
+ condition: Union[str, TextClause, ColumnElement[Any]],
+ schema: Optional[str] = None,
+ **kw,
+ ) -> Union[CheckConstraint]:
+ t = sa_schema.Table(
+ source,
+ self.metadata(),
+ sa_schema.Column("x", Integer),
+ schema=schema,
+ )
+ ck = sa_schema.CheckConstraint(condition, name=name, **kw)
+ t.append_constraint(ck)
+ return ck
+
+ def generic_constraint(
+ self,
+ name: Optional[sqla_compat._ConstraintNameDefined],
+ table_name: str,
+ type_: Optional[str],
+ schema: Optional[str] = None,
+ **kw,
+ ) -> Any:
+ t = self.table(table_name, schema=schema)
+ types: Dict[Optional[str], Any] = {
+ "foreignkey": lambda name: sa_schema.ForeignKeyConstraint(
+ [], [], name=name
+ ),
+ "primary": sa_schema.PrimaryKeyConstraint,
+ "unique": sa_schema.UniqueConstraint,
+ "check": lambda name: sa_schema.CheckConstraint("", name=name),
+ None: sa_schema.Constraint,
+ }
+ try:
+ const = types[type_]
+ except KeyError as ke:
+ raise TypeError(
+ "'type' can be one of %s"
+ % ", ".join(sorted(repr(x) for x in types))
+ ) from ke
+ else:
+ const = const(name=name)
+ t.append_constraint(const)
+ return const
+
+ def metadata(self) -> MetaData:
+ kw = {}
+ if (
+ self.migration_context is not None
+ and "target_metadata" in self.migration_context.opts
+ ):
+ mt = self.migration_context.opts["target_metadata"]
+ if hasattr(mt, "naming_convention"):
+ kw["naming_convention"] = mt.naming_convention
+ return sa_schema.MetaData(**kw)
+
+ def table(self, name: str, *columns, **kw) -> Table:
+ m = self.metadata()
+
+ cols = [
+ sqla_compat._copy(c) if c.table is not None else c
+ for c in columns
+ if isinstance(c, Column)
+ ]
+ # these flags have already added their UniqueConstraint /
+ # Index objects to the table, so flip them off here.
+ # SQLAlchemy tometadata() avoids this instead by preserving the
+ # flags and skipping the constraints that have _type_bound on them,
+ # but for a migration we'd rather list out the constraints
+ # explicitly.
+ _constraints_included = kw.pop("_constraints_included", False)
+ if _constraints_included:
+ for c in cols:
+ c.unique = c.index = False
+
+ t = sa_schema.Table(name, m, *cols, **kw)
+
+ constraints = [
+ (
+ sqla_compat._copy(elem, target_table=t)
+ if getattr(elem, "parent", None) is not t
+ and getattr(elem, "parent", None) is not None
+ else elem
+ )
+ for elem in columns
+ if isinstance(elem, (Constraint, Index))
+ ]
+
+ for const in constraints:
+ t.append_constraint(const)
+
+ for f in t.foreign_keys:
+ self._ensure_table_for_fk(m, f)
+ return t
+
+ def column(self, name: str, type_: TypeEngine, **kw) -> Column:
+ return sa_schema.Column(name, type_, **kw)
+
+ def index(
+ self,
+ name: Optional[str],
+ tablename: Optional[str],
+ columns: Sequence[Union[str, TextClause, ColumnElement[Any]]],
+ schema: Optional[str] = None,
+ **kw,
+ ) -> Index:
+ t = sa_schema.Table(
+ tablename or "no_table",
+ self.metadata(),
+ schema=schema,
+ )
+ kw["_table"] = t
+ idx = sa_schema.Index(
+ name,
+ *[util.sqla_compat._textual_index_column(t, n) for n in columns],
+ **kw,
+ )
+ return idx
+
+ def _parse_table_key(self, table_key: str) -> Tuple[Optional[str], str]:
+ if "." in table_key:
+ tokens = table_key.split(".")
+ sname: Optional[str] = ".".join(tokens[0:-1])
+ tname = tokens[-1]
+ else:
+ tname = table_key
+ sname = None
+ return (sname, tname)
+
+ def _ensure_table_for_fk(self, metadata: MetaData, fk: ForeignKey) -> None:
+ """create a placeholder Table object for the referent of a
+ ForeignKey.
+
+ """
+ if isinstance(fk._colspec, str):
+ table_key, cname = fk._colspec.rsplit(".", 1)
+ sname, tname = self._parse_table_key(table_key)
+ if table_key not in metadata.tables:
+ rel_t = sa_schema.Table(tname, metadata, schema=sname)
+ else:
+ rel_t = metadata.tables[table_key]
+ if cname not in rel_t.c:
+ rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
diff --git a/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py b/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py
new file mode 100644
index 00000000..528c0542
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/operations/toimpl.py
@@ -0,0 +1,225 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from typing import TYPE_CHECKING
+
+from sqlalchemy import schema as sa_schema
+
+from . import ops
+from .base import Operations
+from ..util.sqla_compat import _copy
+
+if TYPE_CHECKING:
+ from sqlalchemy.sql.schema import Table
+
+
+@Operations.implementation_for(ops.AlterColumnOp)
+def alter_column(
+ operations: "Operations", operation: "ops.AlterColumnOp"
+) -> None:
+ compiler = operations.impl.dialect.statement_compiler(
+ operations.impl.dialect, None
+ )
+
+ existing_type = operation.existing_type
+ existing_nullable = operation.existing_nullable
+ existing_server_default = operation.existing_server_default
+ type_ = operation.modify_type
+ column_name = operation.column_name
+ table_name = operation.table_name
+ schema = operation.schema
+ server_default = operation.modify_server_default
+ new_column_name = operation.modify_name
+ nullable = operation.modify_nullable
+ comment = operation.modify_comment
+ existing_comment = operation.existing_comment
+
+ def _count_constraint(constraint):
+ return not isinstance(constraint, sa_schema.PrimaryKeyConstraint) and (
+ not constraint._create_rule or constraint._create_rule(compiler)
+ )
+
+ if existing_type and type_:
+ t = operations.schema_obj.table(
+ table_name,
+ sa_schema.Column(column_name, existing_type),
+ schema=schema,
+ )
+ for constraint in t.constraints:
+ if _count_constraint(constraint):
+ operations.impl.drop_constraint(constraint)
+
+ operations.impl.alter_column(
+ table_name,
+ column_name,
+ nullable=nullable,
+ server_default=server_default,
+ name=new_column_name,
+ type_=type_,
+ schema=schema,
+ existing_type=existing_type,
+ existing_server_default=existing_server_default,
+ existing_nullable=existing_nullable,
+ comment=comment,
+ existing_comment=existing_comment,
+ **operation.kw,
+ )
+
+ if type_:
+ t = operations.schema_obj.table(
+ table_name,
+ operations.schema_obj.column(column_name, type_),
+ schema=schema,
+ )
+ for constraint in t.constraints:
+ if _count_constraint(constraint):
+ operations.impl.add_constraint(constraint)
+
+
+@Operations.implementation_for(ops.DropTableOp)
+def drop_table(operations: "Operations", operation: "ops.DropTableOp") -> None:
+ kw = {}
+ if operation.if_exists is not None:
+ kw["if_exists"] = operation.if_exists
+ operations.impl.drop_table(
+ operation.to_table(operations.migration_context), **kw
+ )
+
+
+@Operations.implementation_for(ops.DropColumnOp)
+def drop_column(
+ operations: "Operations", operation: "ops.DropColumnOp"
+) -> None:
+ column = operation.to_column(operations.migration_context)
+ operations.impl.drop_column(
+ operation.table_name, column, schema=operation.schema, **operation.kw
+ )
+
+
+@Operations.implementation_for(ops.CreateIndexOp)
+def create_index(
+ operations: "Operations", operation: "ops.CreateIndexOp"
+) -> None:
+ idx = operation.to_index(operations.migration_context)
+ kw = {}
+ if operation.if_not_exists is not None:
+ kw["if_not_exists"] = operation.if_not_exists
+ operations.impl.create_index(idx, **kw)
+
+
+@Operations.implementation_for(ops.DropIndexOp)
+def drop_index(operations: "Operations", operation: "ops.DropIndexOp") -> None:
+ kw = {}
+ if operation.if_exists is not None:
+ kw["if_exists"] = operation.if_exists
+
+ operations.impl.drop_index(
+ operation.to_index(operations.migration_context),
+ **kw,
+ )
+
+
+@Operations.implementation_for(ops.CreateTableOp)
+def create_table(
+ operations: "Operations", operation: "ops.CreateTableOp"
+) -> "Table":
+ kw = {}
+ if operation.if_not_exists is not None:
+ kw["if_not_exists"] = operation.if_not_exists
+ table = operation.to_table(operations.migration_context)
+ operations.impl.create_table(table, **kw)
+ return table
+
+
+@Operations.implementation_for(ops.RenameTableOp)
+def rename_table(
+ operations: "Operations", operation: "ops.RenameTableOp"
+) -> None:
+ operations.impl.rename_table(
+ operation.table_name, operation.new_table_name, schema=operation.schema
+ )
+
+
+@Operations.implementation_for(ops.CreateTableCommentOp)
+def create_table_comment(
+ operations: "Operations", operation: "ops.CreateTableCommentOp"
+) -> None:
+ table = operation.to_table(operations.migration_context)
+ operations.impl.create_table_comment(table)
+
+
+@Operations.implementation_for(ops.DropTableCommentOp)
+def drop_table_comment(
+ operations: "Operations", operation: "ops.DropTableCommentOp"
+) -> None:
+ table = operation.to_table(operations.migration_context)
+ operations.impl.drop_table_comment(table)
+
+
+@Operations.implementation_for(ops.AddColumnOp)
+def add_column(operations: "Operations", operation: "ops.AddColumnOp") -> None:
+ table_name = operation.table_name
+ column = operation.column
+ schema = operation.schema
+ kw = operation.kw
+
+ if column.table is not None:
+ column = _copy(column)
+
+ t = operations.schema_obj.table(table_name, column, schema=schema)
+ operations.impl.add_column(table_name, column, schema=schema, **kw)
+
+ for constraint in t.constraints:
+ if not isinstance(constraint, sa_schema.PrimaryKeyConstraint):
+ operations.impl.add_constraint(constraint)
+ for index in t.indexes:
+ operations.impl.create_index(index)
+
+ with_comment = (
+ operations.impl.dialect.supports_comments
+ and not operations.impl.dialect.inline_comments
+ )
+ comment = column.comment
+ if comment and with_comment:
+ operations.impl.create_column_comment(column)
+
+
+@Operations.implementation_for(ops.AddConstraintOp)
+def create_constraint(
+ operations: "Operations", operation: "ops.AddConstraintOp"
+) -> None:
+ operations.impl.add_constraint(
+ operation.to_constraint(operations.migration_context)
+ )
+
+
+@Operations.implementation_for(ops.DropConstraintOp)
+def drop_constraint(
+ operations: "Operations", operation: "ops.DropConstraintOp"
+) -> None:
+ operations.impl.drop_constraint(
+ operations.schema_obj.generic_constraint(
+ operation.constraint_name,
+ operation.table_name,
+ operation.constraint_type,
+ schema=operation.schema,
+ )
+ )
+
+
+@Operations.implementation_for(ops.BulkInsertOp)
+def bulk_insert(
+ operations: "Operations", operation: "ops.BulkInsertOp"
+) -> None:
+ operations.impl.bulk_insert( # type: ignore[union-attr]
+ operation.table, operation.rows, multiinsert=operation.multiinsert
+ )
+
+
+@Operations.implementation_for(ops.ExecuteSQLOp)
+def execute_sql(
+ operations: "Operations", operation: "ops.ExecuteSQLOp"
+) -> None:
+ operations.migration_context.impl.execute(
+ operation.sqltext, execution_options=operation.execution_options
+ )