Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/autogenerate')
-rw-r--r--  .venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py |   10
-rw-r--r--  .venv/lib/python3.12/site-packages/alembic/autogenerate/api.py      |  650
-rw-r--r--  .venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py  | 1317
-rw-r--r--  .venv/lib/python3.12/site-packages/alembic/autogenerate/render.py   | 1125
-rw-r--r--  .venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py |  240
5 files changed, 3342 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py
new file mode 100644
index 00000000..445ddb25
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py
@@ -0,0 +1,10 @@
+from .api import _render_migration_diffs as _render_migration_diffs
+from .api import compare_metadata as compare_metadata
+from .api import produce_migrations as produce_migrations
+from .api import render_python_code as render_python_code
+from .api import RevisionContext as RevisionContext
+from .compare import _produce_net_changes as _produce_net_changes
+from .compare import comparators as comparators
+from .render import render_op_text as render_op_text
+from .render import renderers as renderers
+from .rewriter import Rewriter as Rewriter
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
new file mode 100644
index 00000000..811462e8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
@@ -0,0 +1,650 @@
+from __future__ import annotations
+
+import contextlib
+from typing import Any
+from typing import Dict
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import inspect
+
+from . import compare
+from . import render
+from .. import util
+from ..operations import ops
+from ..util import sqla_compat
+
+"""Provide the 'autogenerate' feature which can produce migration operations
+automatically."""
+
+if TYPE_CHECKING:
+    from sqlalchemy.engine import Connection
+    from sqlalchemy.engine import Dialect
+    from sqlalchemy.engine import Inspector
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import SchemaItem
+    from sqlalchemy.sql.schema import Table
+
+    from ..config import Config
+    from ..operations.ops import DowngradeOps
+    from ..operations.ops import MigrationScript
+    from ..operations.ops import UpgradeOps
+    from ..runtime.environment import NameFilterParentNames
+    from ..runtime.environment import NameFilterType
+    from ..runtime.environment import ProcessRevisionDirectiveFn
+    from ..runtime.environment import RenderItemFn
+    from ..runtime.migration import MigrationContext
+    from ..script.base import Script
+    from ..script.base import ScriptDirectory
+    from ..script.revision import _GetRevArg
+
+
+def compare_metadata(context: MigrationContext, metadata: MetaData) -> Any:
+    """Compare a database schema to that given in a
+    :class:`~sqlalchemy.schema.MetaData` instance.
+
+    The database connection is presented in the context
+    of a :class:`.MigrationContext` object, which
+    provides database connectivity as well as optional
+    comparison functions to use for datatypes and
+    server defaults - see the "autogenerate" arguments
+    at :meth:`.EnvironmentContext.configure`
+    for details on these.
+
+    The return format is a list of "diff" directives,
+    each representing individual differences::
+
+        from alembic.migration import MigrationContext
+        from alembic.autogenerate import compare_metadata
+        from sqlalchemy import (
+            create_engine,
+            MetaData,
+            Column,
+            Integer,
+            String,
+            Table,
+            text,
+        )
+        import pprint
+
+        engine = create_engine("sqlite://")
+
+        with engine.begin() as conn:
+            conn.execute(
+                text(
+                    '''
+                        create table foo (
+                            id integer not null primary key,
+                            old_data varchar,
+                            x integer
+                        )
+                    '''
+                )
+            )
+            conn.execute(text("create table bar (data varchar)"))
+
+        metadata = MetaData()
+        Table(
+            "foo",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("data", Integer),
+            Column("x", Integer, nullable=False),
+        )
+        Table("bat", metadata, Column("info", String))
+
+        mc = MigrationContext.configure(engine.connect())
+
+        diff = compare_metadata(mc, metadata)
+        pprint.pprint(diff, indent=2, width=20)
+
+    Output::
+
+        [
+            (
+                "add_table",
+                Table(
+                    "bat",
+                    MetaData(),
+                    Column("info", String(), table=<bat>),
+                    schema=None,
+                ),
+            ),
+            (
+                "remove_table",
+                Table(
+                    "bar",
+                    MetaData(),
+                    Column("data", VARCHAR(), table=<bar>),
+                    schema=None,
+                ),
+            ),
+            (
+                "add_column",
+                None,
+                "foo",
+                Column("data", Integer(), table=<foo>),
+            ),
+            [
+                (
+                    "modify_nullable",
+                    None,
+                    "foo",
+                    "x",
+                    {
+                        "existing_comment": None,
+                        "existing_server_default": False,
+                        "existing_type": INTEGER(),
+                    },
+                    True,
+                    False,
+                )
+            ],
+            (
+                "remove_column",
+                None,
+                "foo",
+                Column("old_data", VARCHAR(), table=<foo>),
+            ),
+        ]
+
+    :param context: a :class:`.MigrationContext`
+     instance.
+    :param metadata: a :class:`~sqlalchemy.schema.MetaData`
+     instance.
+
+    .. seealso::
+
+        :func:`.produce_migrations` - produces a :class:`.MigrationScript`
+        structure based on metadata comparison.
+
+    """
+
+    migration_script = produce_migrations(context, metadata)
+    assert migration_script.upgrade_ops is not None
+    return migration_script.upgrade_ops.as_diffs()
+
+
+def produce_migrations(
+    context: MigrationContext, metadata: MetaData
+) -> MigrationScript:
+    """Produce a :class:`.MigrationScript` structure based on schema
+    comparison.
+
+    This function does essentially what :func:`.compare_metadata` does,
+    but then runs the resulting list of diffs to produce the full
+    :class:`.MigrationScript` object.   For an example of what this looks like,
+    see the example in :ref:`customizing_revision`.
+
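+    For example, an illustrative sketch (assuming ``mc`` and ``metadata``
+    are set up as in the :func:`.compare_metadata` example)::
+
+        migration_script = produce_migrations(mc, metadata)
+        for op in migration_script.upgrade_ops.ops:
+            print(type(op).__name__)
+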
+    .. seealso::
+
+        :func:`.compare_metadata` - returns more fundamental "diff"
+        data from comparing a schema.
+
+    """
+
+    autogen_context = AutogenContext(context, metadata=metadata)
+
+    migration_script = ops.MigrationScript(
+        rev_id=None,
+        upgrade_ops=ops.UpgradeOps([]),
+        downgrade_ops=ops.DowngradeOps([]),
+    )
+
+    compare._populate_migration_script(autogen_context, migration_script)
+
+    return migration_script
+
+
+def render_python_code(
+    up_or_down_op: Union[UpgradeOps, DowngradeOps],
+    sqlalchemy_module_prefix: str = "sa.",
+    alembic_module_prefix: str = "op.",
+    render_as_batch: bool = False,
+    imports: Sequence[str] = (),
+    render_item: Optional[RenderItemFn] = None,
+    migration_context: Optional[MigrationContext] = None,
+    user_module_prefix: Optional[str] = None,
+) -> str:
+    """Render Python code given an :class:`.UpgradeOps` or
+    :class:`.DowngradeOps` object.
+
+    This is a convenience function that can be used to test the
+    autogenerate output of a user-defined :class:`.MigrationScript` structure.
+
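+    For example, a minimal sketch (assuming ``migration_script`` was
+    produced by :func:`.produce_migrations`)::
+
+        print(render_python_code(migration_script.upgrade_ops))
+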
+    :param up_or_down_op: :class:`.UpgradeOps` or :class:`.DowngradeOps` object
+    :param sqlalchemy_module_prefix: module prefix for SQLAlchemy objects
+    :param alembic_module_prefix: module prefix for Alembic constructs
+    :param render_as_batch: use "batch operations" style for rendering
+    :param imports: sequence of import symbols to add
+    :param render_item: callable to render items
+    :param migration_context: optional :class:`.MigrationContext`
+    :param user_module_prefix: optional string prefix for user-defined types
+
+     .. versionadded:: 1.11.0
+
+    """
+    opts = {
+        "sqlalchemy_module_prefix": sqlalchemy_module_prefix,
+        "alembic_module_prefix": alembic_module_prefix,
+        "render_item": render_item,
+        "render_as_batch": render_as_batch,
+        "user_module_prefix": user_module_prefix,
+    }
+
+    if migration_context is None:
+        from ..runtime.migration import MigrationContext
+        from sqlalchemy.engine.default import DefaultDialect
+
+        migration_context = MigrationContext.configure(
+            dialect=DefaultDialect()
+        )
+
+    autogen_context = AutogenContext(migration_context, opts=opts)
+    autogen_context.imports = set(imports)
+    return render._indent(
+        render._render_cmd_body(up_or_down_op, autogen_context)
+    )
+
+
+def _render_migration_diffs(
+    context: MigrationContext, template_args: Dict[Any, Any]
+) -> None:
+    """legacy, used by test_autogen_composition at the moment"""
+
+    autogen_context = AutogenContext(context)
+
+    upgrade_ops = ops.UpgradeOps([])
+    compare._produce_net_changes(autogen_context, upgrade_ops)
+
+    migration_script = ops.MigrationScript(
+        rev_id=None,
+        upgrade_ops=upgrade_ops,
+        downgrade_ops=upgrade_ops.reverse(),
+    )
+
+    render._render_python_into_templatevars(
+        autogen_context, migration_script, template_args
+    )
+
+
+class AutogenContext:
+    """Maintains configuration and state that's specific to an
+    autogenerate operation."""
+
+    metadata: Union[MetaData, Sequence[MetaData], None] = None
+    """The :class:`~sqlalchemy.schema.MetaData` object
+    representing the destination.
+
+    This object is the one that is passed within ``env.py``
+    to the :paramref:`.EnvironmentContext.configure.target_metadata`
+    parameter.  It represents the structure of :class:`.Table` and other
+    objects as stated in the current database model, and represents the
+    destination structure for the database being examined.
+
+    While the :class:`~sqlalchemy.schema.MetaData` object is primarily
+    known as a collection of :class:`~sqlalchemy.schema.Table` objects,
+    it also has an :attr:`~sqlalchemy.schema.MetaData.info` dictionary
+    that may be used by end-user schemes to store additional schema-level
+    objects that are to be compared in custom autogeneration schemes.
+
+    """
+
+    connection: Optional[Connection] = None
+    """The :class:`~sqlalchemy.engine.base.Connection` object currently
+    connected to the database backend being compared.
+
+    This is obtained from the :attr:`.MigrationContext.bind` and is
+    ultimately set up in the ``env.py`` script.
+
+    """
+
+    dialect: Optional[Dialect] = None
+    """The :class:`~sqlalchemy.engine.Dialect` object currently in use.
+
+    This is normally obtained from the
+    :attr:`~sqlalchemy.engine.base.Connection.dialect` attribute.
+
+    """
+
+    imports: Set[str] = None  # type: ignore[assignment]
+    """A ``set()`` which contains string Python import directives.
+
+    The directives are to be rendered into the ``${imports}`` section
+    of a script template.  The set is normally empty and can be modified
+    within hooks such as the
+    :paramref:`.EnvironmentContext.configure.render_item` hook.
+
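+    For example, a sketch of a ``render_item`` hook that adds an import
+    directive for a hypothetical custom type module::
+
+        def render_item(type_, obj, autogen_context):
+            if type_ == "type" and isinstance(obj, MySpecialType):
+                # add an import for this type
+                autogen_context.imports.add("from mymodel import types")
+                return "types.%r" % obj
+            # default rendering for other objects
+            return False
+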
+    .. seealso::
+
+        :ref:`autogen_render_types`
+
+    """
+
+    migration_context: MigrationContext = None  # type: ignore[assignment]
+    """The :class:`.MigrationContext` established by the ``env.py`` script."""
+
+    def __init__(
+        self,
+        migration_context: MigrationContext,
+        metadata: Union[MetaData, Sequence[MetaData], None] = None,
+        opts: Optional[Dict[str, Any]] = None,
+        autogenerate: bool = True,
+    ) -> None:
+        if (
+            autogenerate
+            and migration_context is not None
+            and migration_context.as_sql
+        ):
+            raise util.CommandError(
+                "autogenerate can't use as_sql=True as it prevents querying "
+                "the database for schema information"
+            )
+
+        if opts is None:
+            opts = migration_context.opts
+
+        self.metadata = metadata = (
+            opts.get("target_metadata", None) if metadata is None else metadata
+        )
+
+        if (
+            autogenerate
+            and metadata is None
+            and migration_context is not None
+            and migration_context.script is not None
+        ):
+            raise util.CommandError(
+                "Can't proceed with --autogenerate option; environment "
+                "script %s does not provide "
+                "a MetaData object or sequence of objects to the context."
+                % (migration_context.script.env_py_location)
+            )
+
+        include_object = opts.get("include_object", None)
+        include_name = opts.get("include_name", None)
+
+        object_filters = []
+        name_filters = []
+        if include_object:
+            object_filters.append(include_object)
+        if include_name:
+            name_filters.append(include_name)
+
+        self._object_filters = object_filters
+        self._name_filters = name_filters
+
+        self.migration_context = migration_context
+        if self.migration_context is not None:
+            self.connection = self.migration_context.bind
+            self.dialect = self.migration_context.dialect
+
+        self.imports = set()
+        self.opts: Dict[str, Any] = opts
+        self._has_batch: bool = False
+
+    @util.memoized_property
+    def inspector(self) -> Inspector:
+        if self.connection is None:
+            raise TypeError(
+                "can't return inspector as this "
+                "AutogenContext has no database connection"
+            )
+        return inspect(self.connection)
+
+    @contextlib.contextmanager
+    def _within_batch(self) -> Iterator[None]:
+        self._has_batch = True
+        yield
+        self._has_batch = False
+
+    def run_name_filters(
+        self,
+        name: Optional[str],
+        type_: NameFilterType,
+        parent_names: NameFilterParentNames,
+    ) -> bool:
+        """Run the context's name filters and return True if the targets
+        should be part of the autogenerate operation.
+
+        This method should be run for every kind of name encountered within the
+        reflection side of an autogenerate operation, giving the environment
+        the chance to filter what names should be reflected as database
+        objects.  The filters here are produced directly via the
+        :paramref:`.EnvironmentContext.configure.include_name` parameter.
+
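+        For example, a sketch of an ``include_name`` hook as consumed
+        by this method::
+
+            def include_name(name, type_, parent_names):
+                if type_ == "schema":
+                    # include objects from the default schema only
+                    return name is None
+                else:
+                    return True
+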
+        """
+        if "schema_name" in parent_names:
+            if type_ == "table":
+                table_name = name
+            else:
+                table_name = parent_names.get("table_name", None)
+            if table_name:
+                schema_name = parent_names["schema_name"]
+                if schema_name:
+                    parent_names["schema_qualified_table_name"] = "%s.%s" % (
+                        schema_name,
+                        table_name,
+                    )
+                else:
+                    parent_names["schema_qualified_table_name"] = table_name
+
+        for fn in self._name_filters:
+            if not fn(name, type_, parent_names):
+                return False
+        else:
+            return True
+
+    def run_object_filters(
+        self,
+        object_: SchemaItem,
+        name: sqla_compat._ConstraintName,
+        type_: NameFilterType,
+        reflected: bool,
+        compare_to: Optional[SchemaItem],
+    ) -> bool:
+        """Run the context's object filters and return True if the targets
+        should be part of the autogenerate operation.
+
+        This method should be run for every kind of object encountered within
+        an autogenerate operation, giving the environment the chance
+        to filter what objects should be included in the comparison.
+        The filters here are produced directly via the
+        :paramref:`.EnvironmentContext.configure.include_object` parameter.
+
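+        For example, a sketch of an ``include_object`` hook as consumed
+        by this method, skipping tables that exist only in the database::
+
+            def include_object(object_, name, type_, reflected, compare_to):
+                if type_ == "table" and reflected and compare_to is None:
+                    return False
+                else:
+                    return True
+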
+        """
+        for fn in self._object_filters:
+            if not fn(object_, name, type_, reflected, compare_to):
+                return False
+        else:
+            return True
+
+    run_filters = run_object_filters
+
+    @util.memoized_property
+    def sorted_tables(self) -> List[Table]:
+        """Return an aggregate of the :attr:`.MetaData.sorted_tables`
+        collection(s).
+
+        For a sequence of :class:`.MetaData` objects, this
+        concatenates the :attr:`.MetaData.sorted_tables` collection
+    for each individual :class:`.MetaData` in the order of the
+        sequence.  It does **not** collate the sorted tables collections.
+
+        """
+        result = []
+        for m in util.to_list(self.metadata):
+            result.extend(m.sorted_tables)
+        return result
+
+    @util.memoized_property
+    def table_key_to_table(self) -> Dict[str, Table]:
+        """Return an aggregate  of the :attr:`.MetaData.tables` dictionaries.
+
+        The :attr:`.MetaData.tables` collection is a dictionary of table key
+        to :class:`.Table`; this method aggregates the dictionary across
+        multiple :class:`.MetaData` objects into one dictionary.
+
+        Duplicate table keys are **not** supported; if two :class:`.MetaData`
+        objects contain the same table key, an exception is raised.
+
+        """
+        result: Dict[str, Table] = {}
+        for m in util.to_list(self.metadata):
+            intersect = set(result).intersection(set(m.tables))
+            if intersect:
+                raise ValueError(
+                    "Duplicate table keys across multiple "
+                    "MetaData objects: %s"
+                    % (", ".join('"%s"' % key for key in sorted(intersect)))
+                )
+
+            result.update(m.tables)
+        return result
+
+
+class RevisionContext:
+    """Maintains configuration and state that's specific to a revision
+    file generation operation."""
+
+    generated_revisions: List[MigrationScript]
+    process_revision_directives: Optional[ProcessRevisionDirectiveFn]
+
+    def __init__(
+        self,
+        config: Config,
+        script_directory: ScriptDirectory,
+        command_args: Dict[str, Any],
+        process_revision_directives: Optional[
+            ProcessRevisionDirectiveFn
+        ] = None,
+    ) -> None:
+        self.config = config
+        self.script_directory = script_directory
+        self.command_args = command_args
+        self.process_revision_directives = process_revision_directives
+        self.template_args = {
+            "config": config  # Let templates use config for
+            # e.g. multiple databases
+        }
+        self.generated_revisions = [self._default_revision()]
+
+    def _to_script(
+        self, migration_script: MigrationScript
+    ) -> Optional[Script]:
+        template_args: Dict[str, Any] = self.template_args.copy()
+
+        if getattr(migration_script, "_needs_render", False):
+            autogen_context = self._last_autogen_context
+
+            # clear out existing imports if we are doing multiple
+            # renders
+            autogen_context.imports = set()
+            if migration_script.imports:
+                autogen_context.imports.update(migration_script.imports)
+            render._render_python_into_templatevars(
+                autogen_context, migration_script, template_args
+            )
+
+        assert migration_script.rev_id is not None
+        return self.script_directory.generate_revision(
+            migration_script.rev_id,
+            migration_script.message,
+            refresh=True,
+            head=migration_script.head,
+            splice=migration_script.splice,
+            branch_labels=migration_script.branch_label,
+            version_path=migration_script.version_path,
+            depends_on=migration_script.depends_on,
+            **template_args,
+        )
+
+    def run_autogenerate(
+        self, rev: _GetRevArg, migration_context: MigrationContext
+    ) -> None:
+        self._run_environment(rev, migration_context, True)
+
+    def run_no_autogenerate(
+        self, rev: _GetRevArg, migration_context: MigrationContext
+    ) -> None:
+        self._run_environment(rev, migration_context, False)
+
+    def _run_environment(
+        self,
+        rev: _GetRevArg,
+        migration_context: MigrationContext,
+        autogenerate: bool,
+    ) -> None:
+        if autogenerate:
+            if self.command_args["sql"]:
+                raise util.CommandError(
+                    "Using --sql with --autogenerate does not make any sense"
+                )
+            if set(self.script_directory.get_revisions(rev)) != set(
+                self.script_directory.get_revisions("heads")
+            ):
+                raise util.CommandError("Target database is not up to date.")
+
+        upgrade_token = migration_context.opts["upgrade_token"]
+        downgrade_token = migration_context.opts["downgrade_token"]
+
+        migration_script = self.generated_revisions[-1]
+        if not getattr(migration_script, "_needs_render", False):
+            migration_script.upgrade_ops_list[-1].upgrade_token = upgrade_token
+            migration_script.downgrade_ops_list[-1].downgrade_token = (
+                downgrade_token
+            )
+            migration_script._needs_render = True
+        else:
+            migration_script._upgrade_ops.append(
+                ops.UpgradeOps([], upgrade_token=upgrade_token)
+            )
+            migration_script._downgrade_ops.append(
+                ops.DowngradeOps([], downgrade_token=downgrade_token)
+            )
+
+        autogen_context = AutogenContext(
+            migration_context, autogenerate=autogenerate
+        )
+        self._last_autogen_context: AutogenContext = autogen_context
+
+        if autogenerate:
+            compare._populate_migration_script(
+                autogen_context, migration_script
+            )
+
+        if self.process_revision_directives:
+            self.process_revision_directives(
+                migration_context, rev, self.generated_revisions
+            )
+
+        hook = migration_context.opts["process_revision_directives"]
+        if hook:
+            hook(migration_context, rev, self.generated_revisions)
+
+        for migration_script in self.generated_revisions:
+            migration_script._needs_render = True
+
+    def _default_revision(self) -> MigrationScript:
+        command_args: Dict[str, Any] = self.command_args
+        op = ops.MigrationScript(
+            rev_id=command_args["rev_id"] or util.rev_id(),
+            message=command_args["message"],
+            upgrade_ops=ops.UpgradeOps([]),
+            downgrade_ops=ops.DowngradeOps([]),
+            head=command_args["head"],
+            splice=command_args["splice"],
+            branch_label=command_args["branch_label"],
+            version_path=command_args["version_path"],
+            depends_on=command_args["depends_on"],
+        )
+        return op
+
+    def generate_scripts(self) -> Iterator[Optional[Script]]:
+        for generated_revision in self.generated_revisions:
+            yield self._to_script(generated_revision)
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py
new file mode 100644
index 00000000..8d6d8f1b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py
@@ -0,0 +1,1317 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+import contextlib
+import logging
+import re
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import Iterator
+from typing import Mapping
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy import event
+from sqlalchemy import inspect
+from sqlalchemy import schema as sa_schema
+from sqlalchemy import text
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql import expression
+from sqlalchemy.sql.schema import ForeignKeyConstraint
+from sqlalchemy.sql.schema import Index
+from sqlalchemy.sql.schema import UniqueConstraint
+from sqlalchemy.util import OrderedSet
+
+from .. import util
+from ..ddl._autogen import is_index_sig
+from ..ddl._autogen import is_uq_sig
+from ..operations import ops
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy.engine.reflection import Inspector
+    from sqlalchemy.sql.elements import quoted_name
+    from sqlalchemy.sql.elements import TextClause
+    from sqlalchemy.sql.schema import Column
+    from sqlalchemy.sql.schema import Table
+
+    from alembic.autogenerate.api import AutogenContext
+    from alembic.ddl.impl import DefaultImpl
+    from alembic.operations.ops import AlterColumnOp
+    from alembic.operations.ops import MigrationScript
+    from alembic.operations.ops import ModifyTableOps
+    from alembic.operations.ops import UpgradeOps
+    from ..ddl._autogen import _constraint_sig
+
+
+log = logging.getLogger(__name__)
+
+
+def _populate_migration_script(
+    autogen_context: AutogenContext, migration_script: MigrationScript
+) -> None:
+    upgrade_ops = migration_script.upgrade_ops_list[-1]
+    downgrade_ops = migration_script.downgrade_ops_list[-1]
+
+    _produce_net_changes(autogen_context, upgrade_ops)
+    upgrade_ops.reverse_into(downgrade_ops)
+
+
+comparators = util.Dispatcher(uselist=True)
+
+
+def _produce_net_changes(
+    autogen_context: AutogenContext, upgrade_ops: UpgradeOps
+) -> None:
+    connection = autogen_context.connection
+    assert connection is not None
+    include_schemas = autogen_context.opts.get("include_schemas", False)
+
+    inspector: Inspector = inspect(connection)
+
+    default_schema = connection.dialect.default_schema_name
+    schemas: Set[Optional[str]]
+    if include_schemas:
+        schemas = set(inspector.get_schema_names())
+        # discard the dialect-internal "information_schema"
+        schemas.discard("information_schema")
+        # replace the "default" schema with None
+        schemas.discard(default_schema)
+        schemas.add(None)
+    else:
+        schemas = {None}
+
+    schemas = {
+        s for s in schemas if autogen_context.run_name_filters(s, "schema", {})
+    }
+
+    assert autogen_context.dialect is not None
+    comparators.dispatch("schema", autogen_context.dialect.name)(
+        autogen_context, upgrade_ops, schemas
+    )
+
+
+@comparators.dispatch_for("schema")
+def _autogen_for_tables(
+    autogen_context: AutogenContext,
+    upgrade_ops: UpgradeOps,
+    schemas: Union[Set[None], Set[Optional[str]]],
+) -> None:
+    inspector = autogen_context.inspector
+
+    conn_table_names: Set[Tuple[Optional[str], str]] = set()
+
+    version_table_schema = (
+        autogen_context.migration_context.version_table_schema
+    )
+    version_table = autogen_context.migration_context.version_table
+
+    for schema_name in schemas:
+        tables = set(inspector.get_table_names(schema=schema_name))
+        if schema_name == version_table_schema:
+            tables = tables.difference(
+                [autogen_context.migration_context.version_table]
+            )
+
+        conn_table_names.update(
+            (schema_name, tname)
+            for tname in tables
+            if autogen_context.run_name_filters(
+                tname, "table", {"schema_name": schema_name}
+            )
+        )
+
+    metadata_table_names = OrderedSet(
+        [(table.schema, table.name) for table in autogen_context.sorted_tables]
+    ).difference([(version_table_schema, version_table)])
+
+    _compare_tables(
+        conn_table_names,
+        metadata_table_names,
+        inspector,
+        upgrade_ops,
+        autogen_context,
+    )
+
+
+def _compare_tables(
+    conn_table_names: set,
+    metadata_table_names: set,
+    inspector: Inspector,
+    upgrade_ops: UpgradeOps,
+    autogen_context: AutogenContext,
+) -> None:
+    default_schema = inspector.bind.dialect.default_schema_name
+
+    # tables coming from the connection will not have "schema"
+    # set if it matches default_schema_name; so we need a list
+    # of table names from local metadata that also have "None" if schema
+    # == default_schema_name.  Most setups will be like this anyway but
+    # some are not (see #170)
+    metadata_table_names_no_dflt_schema = OrderedSet(
+        [
+            (schema if schema != default_schema else None, tname)
+            for schema, tname in metadata_table_names
+        ]
+    )
+
+    # to adjust for the MetaData collection storing the tables either
+    # as "schemaname.tablename" or just "tablename", create a new lookup
+    # which will match the "non-default-schema" keys to the Table object.
+    tname_to_table = {
+        no_dflt_schema: autogen_context.table_key_to_table[
+            sa_schema._get_table_key(tname, schema)
+        ]
+        for no_dflt_schema, (schema, tname) in zip(
+            metadata_table_names_no_dflt_schema, metadata_table_names
+        )
+    }
+    metadata_table_names = metadata_table_names_no_dflt_schema
+
+    for s, tname in metadata_table_names.difference(conn_table_names):
+        name = "%s.%s" % (s, tname) if s else tname
+        metadata_table = tname_to_table[(s, tname)]
+        if autogen_context.run_object_filters(
+            metadata_table, tname, "table", False, None
+        ):
+            upgrade_ops.ops.append(
+                ops.CreateTableOp.from_table(metadata_table)
+            )
+            log.info("Detected added table %r", name)
+            modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+
+            comparators.dispatch("table")(
+                autogen_context,
+                modify_table_ops,
+                s,
+                tname,
+                None,
+                metadata_table,
+            )
+            if not modify_table_ops.is_empty():
+                upgrade_ops.ops.append(modify_table_ops)
+
+    removal_metadata = sa_schema.MetaData()
+    for s, tname in conn_table_names.difference(metadata_table_names):
+        name = sa_schema._get_table_key(tname, s)
+        exists = name in removal_metadata.tables
+        t = sa_schema.Table(tname, removal_metadata, schema=s)
+
+        if not exists:
+            event.listen(
+                t,
+                "column_reflect",
+                # fmt: off
+                autogen_context.migration_context.impl.
+                _compat_autogen_column_reflect(inspector),
+                # fmt: on
+            )
+            inspector.reflect_table(t, include_columns=None)
+        if autogen_context.run_object_filters(t, tname, "table", True, None):
+            modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+
+            comparators.dispatch("table")(
+                autogen_context, modify_table_ops, s, tname, t, None
+            )
+            if not modify_table_ops.is_empty():
+                upgrade_ops.ops.append(modify_table_ops)
+
+            upgrade_ops.ops.append(ops.DropTableOp.from_table(t))
+            log.info("Detected removed table %r", name)
+
+    existing_tables = conn_table_names.intersection(metadata_table_names)
+
+    existing_metadata = sa_schema.MetaData()
+    conn_column_info = {}
+    for s, tname in existing_tables:
+        name = sa_schema._get_table_key(tname, s)
+        exists = name in existing_metadata.tables
+        t = sa_schema.Table(tname, existing_metadata, schema=s)
+        if not exists:
+            event.listen(
+                t,
+                "column_reflect",
+                # fmt: off
+                autogen_context.migration_context.impl.
+                _compat_autogen_column_reflect(inspector),
+                # fmt: on
+            )
+            inspector.reflect_table(t, include_columns=None)
+        conn_column_info[(s, tname)] = t
+
+    for s, tname in sorted(existing_tables, key=lambda x: (x[0] or "", x[1])):
+        s = s or None
+        name = "%s.%s" % (s, tname) if s else tname
+        metadata_table = tname_to_table[(s, tname)]
+        conn_table = existing_metadata.tables[name]
+
+        if autogen_context.run_object_filters(
+            metadata_table, tname, "table", False, conn_table
+        ):
+            modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+            with _compare_columns(
+                s,
+                tname,
+                conn_table,
+                metadata_table,
+                modify_table_ops,
+                autogen_context,
+                inspector,
+            ):
+                comparators.dispatch("table")(
+                    autogen_context,
+                    modify_table_ops,
+                    s,
+                    tname,
+                    conn_table,
+                    metadata_table,
+                )
+
+            if not modify_table_ops.is_empty():
+                upgrade_ops.ops.append(modify_table_ops)
+
+
+_IndexColumnSortingOps: Mapping[str, Any] = util.immutabledict(
+    {
+        "asc": expression.asc,
+        "desc": expression.desc,
+        "nulls_first": expression.nullsfirst,
+        "nulls_last": expression.nullslast,
+        "nullsfirst": expression.nullsfirst,  # 1_3 name
+        "nullslast": expression.nullslast,  # 1_3 name
+    }
+)
+
+
+def _make_index(
+    impl: DefaultImpl, params: Dict[str, Any], conn_table: Table
+) -> Optional[Index]:
+    exprs: list[Union[Column[Any], TextClause]] = []
+    sorting = params.get("column_sorting")
+
+    for num, col_name in enumerate(params["column_names"]):
+        item: Union[Column[Any], TextClause]
+        if col_name is None:
+            assert "expressions" in params
+            name = params["expressions"][num]
+            item = text(name)
+        else:
+            name = col_name
+            item = conn_table.c[col_name]
+        if sorting and name in sorting:
+            for operator in sorting[name]:
+                if operator in _IndexColumnSortingOps:
+                    item = _IndexColumnSortingOps[operator](item)
+        exprs.append(item)
+    ix = sa_schema.Index(
+        params["name"],
+        *exprs,
+        unique=params["unique"],
+        _table=conn_table,
+        **impl.adjust_reflected_dialect_options(params, "index"),
+    )
+    if "duplicates_constraint" in params:
+        ix.info["duplicates_constraint"] = params["duplicates_constraint"]
+    return ix
+
+
+def _make_unique_constraint(
+    impl: DefaultImpl, params: Dict[str, Any], conn_table: Table
+) -> UniqueConstraint:
+    uq = sa_schema.UniqueConstraint(
+        *[conn_table.c[cname] for cname in params["column_names"]],
+        name=params["name"],
+        **impl.adjust_reflected_dialect_options(params, "unique_constraint"),
+    )
+    if "duplicates_index" in params:
+        uq.info["duplicates_index"] = params["duplicates_index"]
+
+    return uq
+
+
+def _make_foreign_key(
+    params: Dict[str, Any], conn_table: Table
+) -> ForeignKeyConstraint:
+    tname = params["referred_table"]
+    if params["referred_schema"]:
+        tname = "%s.%s" % (params["referred_schema"], tname)
+
+    options = params.get("options", {})
+
+    const = sa_schema.ForeignKeyConstraint(
+        [conn_table.c[cname] for cname in params["constrained_columns"]],
+        ["%s.%s" % (tname, n) for n in params["referred_columns"]],
+        onupdate=options.get("onupdate"),
+        ondelete=options.get("ondelete"),
+        deferrable=options.get("deferrable"),
+        initially=options.get("initially"),
+        name=params["name"],
+    )
+    # needed by SQLAlchemy 0.7
+    conn_table.append_constraint(const)
+    return const
+
+
+@contextlib.contextmanager
+def _compare_columns(
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    conn_table: Table,
+    metadata_table: Table,
+    modify_table_ops: ModifyTableOps,
+    autogen_context: AutogenContext,
+    inspector: Inspector,
+) -> Iterator[None]:
+    name = "%s.%s" % (schema, tname) if schema else tname
+    metadata_col_names = OrderedSet(
+        c.name for c in metadata_table.c if not c.system
+    )
+    metadata_cols_by_name = {
+        c.name: c for c in metadata_table.c if not c.system
+    }
+
+    conn_col_names = {
+        c.name: c
+        for c in conn_table.c
+        if autogen_context.run_name_filters(
+            c.name, "column", {"table_name": tname, "schema_name": schema}
+        )
+    }
+
+    for cname in metadata_col_names.difference(conn_col_names):
+        if autogen_context.run_object_filters(
+            metadata_cols_by_name[cname], cname, "column", False, None
+        ):
+            modify_table_ops.ops.append(
+                ops.AddColumnOp.from_column_and_tablename(
+                    schema, tname, metadata_cols_by_name[cname]
+                )
+            )
+            log.info("Detected added column '%s.%s'", name, cname)
+
+    for colname in metadata_col_names.intersection(conn_col_names):
+        metadata_col = metadata_cols_by_name[colname]
+        conn_col = conn_table.c[colname]
+        if not autogen_context.run_object_filters(
+            metadata_col, colname, "column", False, conn_col
+        ):
+            continue
+        alter_column_op = ops.AlterColumnOp(tname, colname, schema=schema)
+
+        comparators.dispatch("column")(
+            autogen_context,
+            alter_column_op,
+            schema,
+            tname,
+            colname,
+            conn_col,
+            metadata_col,
+        )
+
+        if alter_column_op.has_changes():
+            modify_table_ops.ops.append(alter_column_op)
+
+    yield
+
+    for cname in set(conn_col_names).difference(metadata_col_names):
+        if autogen_context.run_object_filters(
+            conn_table.c[cname], cname, "column", True, None
+        ):
+            modify_table_ops.ops.append(
+                ops.DropColumnOp.from_column_and_tablename(
+                    schema, tname, conn_table.c[cname]
+                )
+            )
+            log.info("Detected removed column '%s.%s'", name, cname)
+
+
+_C = TypeVar("_C", bound=Union[UniqueConstraint, ForeignKeyConstraint, Index])
+
+
+@comparators.dispatch_for("table")
+def _compare_indexes_and_uniques(
+    autogen_context: AutogenContext,
+    modify_ops: ModifyTableOps,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    conn_table: Optional[Table],
+    metadata_table: Optional[Table],
+) -> None:
+    inspector = autogen_context.inspector
+    is_create_table = conn_table is None
+    is_drop_table = metadata_table is None
+    impl = autogen_context.migration_context.impl
+
+    # 1a. get raw indexes and unique constraints from metadata ...
+    if metadata_table is not None:
+        metadata_unique_constraints = {
+            uq
+            for uq in metadata_table.constraints
+            if isinstance(uq, sa_schema.UniqueConstraint)
+        }
+        metadata_indexes = set(metadata_table.indexes)
+    else:
+        metadata_unique_constraints = set()
+        metadata_indexes = set()
+
+    conn_uniques = conn_indexes = frozenset()  # type:ignore[var-annotated]
+
+    supports_unique_constraints = False
+
+    unique_constraints_duplicate_unique_indexes = False
+
+    if conn_table is not None:
+        # 1b. ... and from connection, if the table exists
+        try:
+            conn_uniques = inspector.get_unique_constraints(  # type:ignore[assignment] # noqa
+                tname, schema=schema
+            )
+            supports_unique_constraints = True
+        except NotImplementedError:
+            pass
+        except TypeError:
+            # number of arguments is off for the base
+            # method in SQLAlchemy due to the cache decorator
+            # not being present
+            pass
+        else:
+            conn_uniques = [  # type:ignore[assignment]
+                uq
+                for uq in conn_uniques
+                if autogen_context.run_name_filters(
+                    uq["name"],
+                    "unique_constraint",
+                    {"table_name": tname, "schema_name": schema},
+                )
+            ]
+            for uq in conn_uniques:
+                if uq.get("duplicates_index"):
+                    unique_constraints_duplicate_unique_indexes = True
+        try:
+            conn_indexes = inspector.get_indexes(  # type:ignore[assignment]
+                tname, schema=schema
+            )
+        except NotImplementedError:
+            pass
+        else:
+            conn_indexes = [  # type:ignore[assignment]
+                ix
+                for ix in conn_indexes
+                if autogen_context.run_name_filters(
+                    ix["name"],
+                    "index",
+                    {"table_name": tname, "schema_name": schema},
+                )
+            ]
+
+        # 2. convert conn-level objects from raw inspector records
+        # into schema objects
+        if is_drop_table:
+            # for DROP TABLE uniques are inline, don't need them
+            conn_uniques = set()  # type:ignore[assignment]
+        else:
+            conn_uniques = {  # type:ignore[assignment]
+                _make_unique_constraint(impl, uq_def, conn_table)
+                for uq_def in conn_uniques
+            }
+
+        conn_indexes = {  # type:ignore[assignment]
+            index
+            for index in (
+                _make_index(impl, ix, conn_table) for ix in conn_indexes
+            )
+            if index is not None
+        }
+
+    # 2a. if the dialect dupes unique indexes as unique constraints
+    # (mysql and oracle), correct for that
+
+    if unique_constraints_duplicate_unique_indexes:
+        _correct_for_uq_duplicates_uix(
+            conn_uniques,
+            conn_indexes,
+            metadata_unique_constraints,
+            metadata_indexes,
+            autogen_context.dialect,
+            impl,
+        )
+
+    # 3. give the dialect a chance to omit indexes and constraints that
+    # we know are either added implicitly by the DB or that the DB
+    # can't accurately report on
+    impl.correct_for_autogen_constraints(
+        conn_uniques,  # type: ignore[arg-type]
+        conn_indexes,  # type: ignore[arg-type]
+        metadata_unique_constraints,
+        metadata_indexes,
+    )
+
+    # 4. organize the constraints into "signature" collections, the
+    # _constraint_sig() objects provide a consistent facade over both
+    # Index and UniqueConstraint so we can easily work with them
+    # interchangeably
+    metadata_unique_constraints_sig = {
+        impl._create_metadata_constraint_sig(uq)
+        for uq in metadata_unique_constraints
+    }
+
+    metadata_indexes_sig = {
+        impl._create_metadata_constraint_sig(ix) for ix in metadata_indexes
+    }
+
+    conn_unique_constraints = {
+        impl._create_reflected_constraint_sig(uq) for uq in conn_uniques
+    }
+
+    conn_indexes_sig = {
+        impl._create_reflected_constraint_sig(ix) for ix in conn_indexes
+    }
+
+    # 5. index things by name, for those objects that have names
+    metadata_names = {
+        cast(str, c.md_name_to_sql_name(autogen_context)): c
+        for c in metadata_unique_constraints_sig.union(metadata_indexes_sig)
+        if c.is_named
+    }
+
+    conn_uniques_by_name: Dict[sqla_compat._ConstraintName, _constraint_sig]
+    conn_indexes_by_name: Dict[sqla_compat._ConstraintName, _constraint_sig]
+
+    conn_uniques_by_name = {c.name: c for c in conn_unique_constraints}
+    conn_indexes_by_name = {c.name: c for c in conn_indexes_sig}
+    conn_names = {
+        c.name: c
+        for c in conn_unique_constraints.union(conn_indexes_sig)
+        if sqla_compat.constraint_name_string(c.name)
+    }
+
+    doubled_constraints = {
+        name: (conn_uniques_by_name[name], conn_indexes_by_name[name])
+        for name in set(conn_uniques_by_name).intersection(
+            conn_indexes_by_name
+        )
+    }
+
+    # 6. index things by "column signature", to help with unnamed unique
+    # constraints.
+    conn_uniques_by_sig = {uq.unnamed: uq for uq in conn_unique_constraints}
+    metadata_uniques_by_sig = {
+        uq.unnamed: uq for uq in metadata_unique_constraints_sig
+    }
+    unnamed_metadata_uniques = {
+        uq.unnamed: uq
+        for uq in metadata_unique_constraints_sig
+        if not sqla_compat._constraint_is_named(
+            uq.const, autogen_context.dialect
+        )
+    }
+
+    # assumptions:
+    # 1. a unique constraint or an index from the connection *always*
+    #    has a name.
+    # 2. an index on the metadata side *always* has a name.
+    # 3. a unique constraint on the metadata side *might* have a name.
+    # 4. The backend may double up indexes as unique constraints and
+    #    vice versa (e.g. MySQL, PostgreSQL)
+
+    def obj_added(obj: _constraint_sig):
+        if is_index_sig(obj):
+            if autogen_context.run_object_filters(
+                obj.const, obj.name, "index", False, None
+            ):
+                modify_ops.ops.append(ops.CreateIndexOp.from_index(obj.const))
+                log.info(
+                    "Detected added index '%r' on '%s'",
+                    obj.name,
+                    obj.column_names,
+                )
+        elif is_uq_sig(obj):
+            if not supports_unique_constraints:
+                # can't report unique indexes as added if we don't
+                # detect them
+                return
+            if is_create_table or is_drop_table:
+                # unique constraints are created inline with table defs
+                return
+            if autogen_context.run_object_filters(
+                obj.const, obj.name, "unique_constraint", False, None
+            ):
+                modify_ops.ops.append(
+                    ops.AddConstraintOp.from_constraint(obj.const)
+                )
+                log.info(
+                    "Detected added unique constraint %r on '%s'",
+                    obj.name,
+                    obj.column_names,
+                )
+        else:
+            assert False
+
+    def obj_removed(obj: _constraint_sig):
+        if is_index_sig(obj):
+            if obj.is_unique and not supports_unique_constraints:
+                # many databases double up unique constraints
+                # as unique indexes.  without that list we can't
+                # be sure what we're doing here
+                return
+
+            if autogen_context.run_object_filters(
+                obj.const, obj.name, "index", True, None
+            ):
+                modify_ops.ops.append(ops.DropIndexOp.from_index(obj.const))
+                log.info("Detected removed index %r on %r", obj.name, tname)
+        elif is_uq_sig(obj):
+            if is_create_table or is_drop_table:
+                # if the whole table is being dropped, we don't need to
+                # consider unique constraint separately
+                return
+            if autogen_context.run_object_filters(
+                obj.const, obj.name, "unique_constraint", True, None
+            ):
+                modify_ops.ops.append(
+                    ops.DropConstraintOp.from_constraint(obj.const)
+                )
+                log.info(
+                    "Detected removed unique constraint %r on %r",
+                    obj.name,
+                    tname,
+                )
+        else:
+            assert False
+
+    def obj_changed(
+        old: _constraint_sig,
+        new: _constraint_sig,
+        msg: str,
+    ):
+        if is_index_sig(old):
+            assert is_index_sig(new)
+
+            if autogen_context.run_object_filters(
+                new.const, new.name, "index", False, old.const
+            ):
+                log.info(
+                    "Detected changed index %r on %r: %s", old.name, tname, msg
+                )
+                modify_ops.ops.append(ops.DropIndexOp.from_index(old.const))
+                modify_ops.ops.append(ops.CreateIndexOp.from_index(new.const))
+        elif is_uq_sig(old):
+            assert is_uq_sig(new)
+
+            if autogen_context.run_object_filters(
+                new.const, new.name, "unique_constraint", False, old.const
+            ):
+                log.info(
+                    "Detected changed unique constraint %r on %r: %s",
+                    old.name,
+                    tname,
+                    msg,
+                )
+                modify_ops.ops.append(
+                    ops.DropConstraintOp.from_constraint(old.const)
+                )
+                modify_ops.ops.append(
+                    ops.AddConstraintOp.from_constraint(new.const)
+                )
+        else:
+            assert False
+
+    for removed_name in sorted(set(conn_names).difference(metadata_names)):
+        conn_obj = conn_names[removed_name]
+        if (
+            is_uq_sig(conn_obj)
+            and conn_obj.unnamed in unnamed_metadata_uniques
+        ):
+            continue
+        elif removed_name in doubled_constraints:
+            conn_uq, conn_idx = doubled_constraints[removed_name]
+            if (
+                all(
+                    conn_idx.unnamed != meta_idx.unnamed
+                    for meta_idx in metadata_indexes_sig
+                )
+                and conn_uq.unnamed not in metadata_uniques_by_sig
+            ):
+                obj_removed(conn_uq)
+                obj_removed(conn_idx)
+        else:
+            obj_removed(conn_obj)
+
+    for existing_name in sorted(set(metadata_names).intersection(conn_names)):
+        metadata_obj = metadata_names[existing_name]
+
+        if existing_name in doubled_constraints:
+            conn_uq, conn_idx = doubled_constraints[existing_name]
+            if is_index_sig(metadata_obj):
+                conn_obj = conn_idx
+            else:
+                conn_obj = conn_uq
+        else:
+            conn_obj = conn_names[existing_name]
+
+        if type(conn_obj) != type(metadata_obj):
+            obj_removed(conn_obj)
+            obj_added(metadata_obj)
+        else:
+            comparison = metadata_obj.compare_to_reflected(conn_obj)
+
+            if comparison.is_different:
+                # constraints are different
+                obj_changed(conn_obj, metadata_obj, comparison.message)
+            elif comparison.is_skip:
+                # constraints cannot be compared; skip them
+                thing = (
+                    "index" if is_index_sig(conn_obj) else "unique constraint"
+                )
+                log.info(
+                    "Cannot compare %s %r, assuming equal and skipping. %s",
+                    thing,
+                    conn_obj.name,
+                    comparison.message,
+                )
+            else:
+                # constraints are equal
+                assert comparison.is_equal
+
+    for added_name in sorted(set(metadata_names).difference(conn_names)):
+        obj = metadata_names[added_name]
+        obj_added(obj)
+
+    for uq_sig in unnamed_metadata_uniques:
+        if uq_sig not in conn_uniques_by_sig:
+            obj_added(unnamed_metadata_uniques[uq_sig])
+
+
+def _correct_for_uq_duplicates_uix(
+    conn_unique_constraints,
+    conn_indexes,
+    metadata_unique_constraints,
+    metadata_indexes,
+    dialect,
+    impl,
+):
+    # dedupe unique indexes vs. constraints, since MySQL / Oracle
+    # don't really have unique constraints as a separate construct.
+    # but look in the metadata and try to maintain constructs
+    # that already seem to be defined one way or the other
+    # on that side.  This logic was formerly local to MySQL dialect,
+    # generalized to Oracle and others. See #276
+
+    # resolve final rendered name for unique constraints defined in the
+    # metadata.   this includes truncation of long names.  naming convention
+    # names currently should already be set as cons.name, however leave this
+    # to the sqla_compat to decide.
+    metadata_cons_names = [
+        (sqla_compat._get_constraint_final_name(cons, dialect), cons)
+        for cons in metadata_unique_constraints
+    ]
+
+    metadata_uq_names = {
+        name for name, cons in metadata_cons_names if name is not None
+    }
+
+    unnamed_metadata_uqs = {
+        impl._create_metadata_constraint_sig(cons).unnamed
+        for name, cons in metadata_cons_names
+        if name is None
+    }
+
+    metadata_ix_names = {
+        sqla_compat._get_constraint_final_name(cons, dialect)
+        for cons in metadata_indexes
+        if cons.unique
+    }
+
+    # for reflection side, names are in their final database form
+    # already since they're from the database
+    conn_ix_names = {cons.name: cons for cons in conn_indexes if cons.unique}
+
+    uqs_dupe_indexes = {
+        cons.name: cons
+        for cons in conn_unique_constraints
+        if cons.info["duplicates_index"]
+    }
+
+    for overlap in uqs_dupe_indexes:
+        if overlap not in metadata_uq_names:
+            if (
+                impl._create_reflected_constraint_sig(
+                    uqs_dupe_indexes[overlap]
+                ).unnamed
+                not in unnamed_metadata_uqs
+            ):
+                conn_unique_constraints.discard(uqs_dupe_indexes[overlap])
+        elif overlap not in metadata_ix_names:
+            conn_indexes.discard(conn_ix_names[overlap])
+
+
+@comparators.dispatch_for("column")
+def _compare_nullable(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    cname: Union[quoted_name, str],
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> None:
+    metadata_col_nullable = metadata_col.nullable
+    conn_col_nullable = conn_col.nullable
+    alter_column_op.existing_nullable = conn_col_nullable
+
+    if conn_col_nullable is not metadata_col_nullable:
+        if (
+            sqla_compat._server_default_is_computed(
+                metadata_col.server_default, conn_col.server_default
+            )
+            and sqla_compat._nullability_might_be_unset(metadata_col)
+            or (
+                sqla_compat._server_default_is_identity(
+                    metadata_col.server_default, conn_col.server_default
+                )
+            )
+        ):
+            log.info(
+                "Ignoring nullable change on identity column '%s.%s'",
+                tname,
+                cname,
+            )
+        else:
+            alter_column_op.modify_nullable = metadata_col_nullable
+            log.info(
+                "Detected %s on column '%s.%s'",
+                "NULL" if metadata_col_nullable else "NOT NULL",
+                tname,
+                cname,
+            )
+
+
+@comparators.dispatch_for("column")
+def _setup_autoincrement(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    cname: quoted_name,
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> None:
+    if metadata_col.table._autoincrement_column is metadata_col:
+        alter_column_op.kw["autoincrement"] = True
+    elif metadata_col.autoincrement is True:
+        alter_column_op.kw["autoincrement"] = True
+    elif metadata_col.autoincrement is False:
+        alter_column_op.kw["autoincrement"] = False
+
+
+@comparators.dispatch_for("column")
+def _compare_type(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    cname: Union[quoted_name, str],
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> None:
+    conn_type = conn_col.type
+    alter_column_op.existing_type = conn_type
+    metadata_type = metadata_col.type
+    if conn_type._type_affinity is sqltypes.NullType:
+        log.info(
+            "Couldn't determine database type " "for column '%s.%s'",
+            tname,
+            cname,
+        )
+        return
+    if metadata_type._type_affinity is sqltypes.NullType:
+        log.info(
+            "Column '%s.%s' has no type within " "the model; can't compare",
+            tname,
+            cname,
+        )
+        return
+
+    isdiff = autogen_context.migration_context._compare_type(
+        conn_col, metadata_col
+    )
+
+    if isdiff:
+        alter_column_op.modify_type = metadata_type
+        log.info(
+            "Detected type change from %r to %r on '%s.%s'",
+            conn_type,
+            metadata_type,
+            tname,
+            cname,
+        )
+
+
+def _render_server_default_for_compare(
+    metadata_default: Optional[Any], autogen_context: AutogenContext
+) -> Optional[str]:
+    if isinstance(metadata_default, sa_schema.DefaultClause):
+        if isinstance(metadata_default.arg, str):
+            metadata_default = metadata_default.arg
+        else:
+            metadata_default = str(
+                metadata_default.arg.compile(
+                    dialect=autogen_context.dialect,
+                    compile_kwargs={"literal_binds": True},
+                )
+            )
+    if isinstance(metadata_default, str):
+        return metadata_default
+    else:
+        return None
+
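+# e.g. a metadata column with server_default=sa.text("now()") renders
+# here as the plain string "now()", which the dialect's server-default
+# comparison hook can then match against the reflected default.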
+
+def _normalize_computed_default(sqltext: str) -> str:
+    """we want to warn if a computed sql expression has changed.  however
+    we don't want false positives and the warning is not that critical.
+    so filter out most forms of variability from the SQL text.
+
+    """
+
+    return re.sub(r"[ \(\)'\"`\[\]\t\r\n]", "", sqltext).lower()
+
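+# e.g. (illustrative): _normalize_computed_default("(a + B)") and
+# _normalize_computed_default("a+b") both yield "a+b", so reflected
+# computed defaults differing only in whitespace, quoting or case do not
+# trigger the warning below.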
+
+def _compare_computed_default(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: str,
+    cname: str,
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> None:
+    rendered_metadata_default = str(
+        cast(sa_schema.Computed, metadata_col.server_default).sqltext.compile(
+            dialect=autogen_context.dialect,
+            compile_kwargs={"literal_binds": True},
+        )
+    )
+
+    # since we cannot change computed columns, we do only a crude
+    # comparison here, eliminating syntactical differences so that we can
+    # at least emit a warning when the expression has genuinely changed.
+
+    rendered_metadata_default = _normalize_computed_default(
+        rendered_metadata_default
+    )
+
+    if isinstance(conn_col.server_default, sa_schema.Computed):
+        rendered_conn_default = _normalize_computed_default(
+            str(
+                conn_col.server_default.sqltext.compile(
+                    dialect=autogen_context.dialect,
+                    compile_kwargs={"literal_binds": True},
+                )
+            )
+        )
+    else:
+        rendered_conn_default = ""
+
+    if rendered_metadata_default != rendered_conn_default:
+        _warn_computed_not_supported(tname, cname)
+
+
+def _warn_computed_not_supported(tname: str, cname: str) -> None:
+    util.warn("Computed default on %s.%s cannot be modified" % (tname, cname))
+
+
+def _compare_identity_default(
+    autogen_context,
+    alter_column_op,
+    schema,
+    tname,
+    cname,
+    conn_col,
+    metadata_col,
+):
+    impl = autogen_context.migration_context.impl
+    diff, ignored_attr, is_alter = impl._compare_identity_default(
+        metadata_col.server_default, conn_col.server_default
+    )
+
+    return diff, is_alter
+
+
+@comparators.dispatch_for("column")
+def _compare_server_default(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    cname: Union[quoted_name, str],
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> Optional[bool]:
+    metadata_default = metadata_col.server_default
+    conn_col_default = conn_col.server_default
+    if conn_col_default is None and metadata_default is None:
+        return False
+
+    if sqla_compat._server_default_is_computed(metadata_default):
+        return _compare_computed_default(  # type:ignore[func-returns-value]
+            autogen_context,
+            alter_column_op,
+            schema,
+            tname,
+            cname,
+            conn_col,
+            metadata_col,
+        )
+    if sqla_compat._server_default_is_computed(conn_col_default):
+        _warn_computed_not_supported(tname, cname)
+        return False
+
+    if sqla_compat._server_default_is_identity(
+        metadata_default, conn_col_default
+    ):
+        alter_column_op.existing_server_default = conn_col_default
+        diff, is_alter = _compare_identity_default(
+            autogen_context,
+            alter_column_op,
+            schema,
+            tname,
+            cname,
+            conn_col,
+            metadata_col,
+        )
+        if is_alter:
+            alter_column_op.modify_server_default = metadata_default
+            if diff:
+                log.info(
+                    "Detected server default on column '%s.%s': "
+                    "identity options attributes %s",
+                    tname,
+                    cname,
+                    sorted(diff),
+                )
+    else:
+        rendered_metadata_default = _render_server_default_for_compare(
+            metadata_default, autogen_context
+        )
+
+        rendered_conn_default = (
+            cast(Any, conn_col_default).arg.text if conn_col_default else None
+        )
+
+        alter_column_op.existing_server_default = conn_col_default
+
+        is_diff = autogen_context.migration_context._compare_server_default(
+            conn_col,
+            metadata_col,
+            rendered_metadata_default,
+            rendered_conn_default,
+        )
+        if is_diff:
+            alter_column_op.modify_server_default = metadata_default
+            log.info("Detected server default on column '%s.%s'", tname, cname)
+
+    return None
+
+
+@comparators.dispatch_for("column")
+def _compare_column_comment(
+    autogen_context: AutogenContext,
+    alter_column_op: AlterColumnOp,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    cname: quoted_name,
+    conn_col: Column[Any],
+    metadata_col: Column[Any],
+) -> Optional[Literal[False]]:
+    assert autogen_context.dialect is not None
+    if not autogen_context.dialect.supports_comments:
+        return None
+
+    metadata_comment = metadata_col.comment
+    conn_col_comment = conn_col.comment
+    if conn_col_comment is None and metadata_comment is None:
+        return False
+
+    alter_column_op.existing_comment = conn_col_comment
+
+    if conn_col_comment != metadata_comment:
+        alter_column_op.modify_comment = metadata_comment
+        log.info("Detected column comment '%s.%s'", tname, cname)
+
+    return None
+
+
+@comparators.dispatch_for("table")
+def _compare_foreign_keys(
+    autogen_context: AutogenContext,
+    modify_table_ops: ModifyTableOps,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    conn_table: Table,
+    metadata_table: Table,
+) -> None:
+    # if we're doing CREATE TABLE, all FKs are created
+    # inline within the table def
+    if conn_table is None or metadata_table is None:
+        return
+
+    inspector = autogen_context.inspector
+    metadata_fks = {
+        fk
+        for fk in metadata_table.constraints
+        if isinstance(fk, sa_schema.ForeignKeyConstraint)
+    }
+
+    conn_fks_list = [
+        fk
+        for fk in inspector.get_foreign_keys(tname, schema=schema)
+        if autogen_context.run_name_filters(
+            fk["name"],
+            "foreign_key_constraint",
+            {"table_name": tname, "schema_name": schema},
+        )
+    ]
+
+    conn_fks = {
+        _make_foreign_key(const, conn_table)  # type: ignore[arg-type]
+        for const in conn_fks_list
+    }
+
+    impl = autogen_context.migration_context.impl
+
+    # give the dialect a chance to correct the FKs to match more
+    # closely
+    autogen_context.migration_context.impl.correct_for_autogen_foreignkeys(
+        conn_fks, metadata_fks
+    )
+
+    metadata_fks_sig = {
+        impl._create_metadata_constraint_sig(fk) for fk in metadata_fks
+    }
+
+    conn_fks_sig = {
+        impl._create_reflected_constraint_sig(fk) for fk in conn_fks
+    }
+
+    # check if reflected FKs include options, indicating the backend
+    # can reflect FK options
+    if conn_fks_list and "options" in conn_fks_list[0]:
+        conn_fks_by_sig = {c.unnamed: c for c in conn_fks_sig}
+        metadata_fks_by_sig = {c.unnamed: c for c in metadata_fks_sig}
+    else:
+        # otherwise compare by sig without options added
+        conn_fks_by_sig = {c.unnamed_no_options: c for c in conn_fks_sig}
+        metadata_fks_by_sig = {
+            c.unnamed_no_options: c for c in metadata_fks_sig
+        }
+
+    metadata_fks_by_name = {
+        c.name: c for c in metadata_fks_sig if c.name is not None
+    }
+    conn_fks_by_name = {c.name: c for c in conn_fks_sig if c.name is not None}
+
+    def _add_fk(obj, compare_to):
+        if autogen_context.run_object_filters(
+            obj.const, obj.name, "foreign_key_constraint", False, compare_to
+        ):
+            modify_table_ops.ops.append(
+                ops.CreateForeignKeyOp.from_constraint(obj.const)
+            )
+
+            log.info(
+                "Detected added foreign key (%s)(%s) on table %s%s",
+                ", ".join(obj.source_columns),
+                ", ".join(obj.target_columns),
+                "%s." % obj.source_schema if obj.source_schema else "",
+                obj.source_table,
+            )
+
+    def _remove_fk(obj, compare_to):
+        if autogen_context.run_object_filters(
+            obj.const, obj.name, "foreign_key_constraint", True, compare_to
+        ):
+            modify_table_ops.ops.append(
+                ops.DropConstraintOp.from_constraint(obj.const)
+            )
+            log.info(
+                "Detected removed foreign key (%s)(%s) on table %s%s",
+                ", ".join(obj.source_columns),
+                ", ".join(obj.target_columns),
+                "%s." % obj.source_schema if obj.source_schema else "",
+                obj.source_table,
+            )
+
+    # so far it appears we don't need to do this by name at all.
+    # SQLite doesn't preserve constraint names anyway
+
+    for removed_sig in set(conn_fks_by_sig).difference(metadata_fks_by_sig):
+        const = conn_fks_by_sig[removed_sig]
+        compare_to = (
+            metadata_fks_by_name[const.name].const
+            if const.name in metadata_fks_by_name
+            else None
+        )
+        _remove_fk(const, compare_to)
+
+    for added_sig in set(metadata_fks_by_sig).difference(conn_fks_by_sig):
+        const = metadata_fks_by_sig[added_sig]
+        compare_to = (
+            conn_fks_by_name[const.name].const
+            if const.name in conn_fks_by_name
+            else None
+        )
+        _add_fk(const, compare_to)
+
+
+@comparators.dispatch_for("table")
+def _compare_table_comment(
+    autogen_context: AutogenContext,
+    modify_table_ops: ModifyTableOps,
+    schema: Optional[str],
+    tname: Union[quoted_name, str],
+    conn_table: Optional[Table],
+    metadata_table: Optional[Table],
+) -> None:
+    assert autogen_context.dialect is not None
+    if not autogen_context.dialect.supports_comments:
+        return
+
+    # if we're doing CREATE TABLE, comments will be created inline
+    # with the create_table op.
+    if conn_table is None or metadata_table is None:
+        return
+
+    if conn_table.comment is None and metadata_table.comment is None:
+        return
+
+    if metadata_table.comment is None and conn_table.comment is not None:
+        modify_table_ops.ops.append(
+            ops.DropTableCommentOp(
+                tname, existing_comment=conn_table.comment, schema=schema
+            )
+        )
+    elif metadata_table.comment != conn_table.comment:
+        modify_table_ops.ops.append(
+            ops.CreateTableCommentOp(
+                tname,
+                metadata_table.comment,
+                existing_comment=conn_table.comment,
+                schema=schema,
+            )
+        )
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py
new file mode 100644
index 00000000..50c51fa9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py
@@ -0,0 +1,1125 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from io import StringIO
+import re
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from mako.pygen import PythonPrinter
+from sqlalchemy import schema as sa_schema
+from sqlalchemy import sql
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.elements import conv
+from sqlalchemy.sql.elements import Label
+from sqlalchemy.sql.elements import quoted_name
+
+from .. import util
+from ..operations import ops
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy import Computed
+    from sqlalchemy import Identity
+    from sqlalchemy.sql.base import DialectKWArgs
+    from sqlalchemy.sql.elements import ColumnElement
+    from sqlalchemy.sql.elements import TextClause
+    from sqlalchemy.sql.schema import CheckConstraint
+    from sqlalchemy.sql.schema import Column
+    from sqlalchemy.sql.schema import Constraint
+    from sqlalchemy.sql.schema import FetchedValue
+    from sqlalchemy.sql.schema import ForeignKey
+    from sqlalchemy.sql.schema import ForeignKeyConstraint
+    from sqlalchemy.sql.schema import Index
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import PrimaryKeyConstraint
+    from sqlalchemy.sql.schema import UniqueConstraint
+    from sqlalchemy.sql.sqltypes import ARRAY
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from alembic.autogenerate.api import AutogenContext
+    from alembic.config import Config
+    from alembic.operations.ops import MigrationScript
+    from alembic.operations.ops import ModifyTableOps
+
+
+MAX_PYTHON_ARGS = 255
+
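+# calls with more than 255 explicit arguments were a SyntaxError on
+# CPython prior to 3.7, so _add_table() below falls back to a single
+# "*[...]" splat argument once a rendered table exceeds this count.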
+
+def _render_gen_name(
+    autogen_context: AutogenContext,
+    name: sqla_compat._ConstraintName,
+) -> Optional[Union[quoted_name, str, _f_name]]:
+    if isinstance(name, conv):
+        return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
+    else:
+        return sqla_compat.constraint_name_or_none(name)
+
+
+def _indent(text: str) -> str:
+    text = re.compile(r"^", re.M).sub("    ", text).strip()
+    text = re.compile(r" +$", re.M).sub("", text)
+    return text
+
+
+def _render_python_into_templatevars(
+    autogen_context: AutogenContext,
+    migration_script: MigrationScript,
+    template_args: Dict[str, Union[str, Config]],
+) -> None:
+    imports = autogen_context.imports
+
+    for upgrade_ops, downgrade_ops in zip(
+        migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
+    ):
+        template_args[upgrade_ops.upgrade_token] = _indent(
+            _render_cmd_body(upgrade_ops, autogen_context)
+        )
+        template_args[downgrade_ops.downgrade_token] = _indent(
+            _render_cmd_body(downgrade_ops, autogen_context)
+        )
+    template_args["imports"] = "\n".join(sorted(imports))
+
+
+default_renderers = renderers = util.Dispatcher()
+
+
+def _render_cmd_body(
+    op_container: ops.OpContainer,
+    autogen_context: AutogenContext,
+) -> str:
+    buf = StringIO()
+    printer = PythonPrinter(buf)
+
+    printer.writeline(
+        "# ### commands auto generated by Alembic - please adjust! ###"
+    )
+
+    has_lines = False
+    for op in op_container.ops:
+        lines = render_op(autogen_context, op)
+        has_lines = has_lines or bool(lines)
+
+        for line in lines:
+            printer.writeline(line)
+
+    if not has_lines:
+        printer.writeline("pass")
+
+    printer.writeline("# ### end Alembic commands ###")
+
+    return buf.getvalue()
+
+
+def render_op(
+    autogen_context: AutogenContext, op: ops.MigrateOperation
+) -> List[str]:
+    renderer = renderers.dispatch(op)
+    lines = util.to_list(renderer(autogen_context, op))
+    return lines
+
+
+def render_op_text(
+    autogen_context: AutogenContext, op: ops.MigrateOperation
+) -> str:
+    return "\n".join(render_op(autogen_context, op))
+
+
+@renderers.dispatch_for(ops.ModifyTableOps)
+def _render_modify_table(
+    autogen_context: AutogenContext, op: ModifyTableOps
+) -> List[str]:
+    opts = autogen_context.opts
+    render_as_batch = opts.get("render_as_batch", False)
+
+    if op.ops:
+        lines = []
+        if render_as_batch:
+            with autogen_context._within_batch():
+                lines.append(
+                    "with op.batch_alter_table(%r, schema=%r) as batch_op:"
+                    % (op.table_name, op.schema)
+                )
+                for t_op in op.ops:
+                    t_lines = render_op(autogen_context, t_op)
+                    lines.extend(t_lines)
+                lines.append("")
+        else:
+            for t_op in op.ops:
+                t_lines = render_op(autogen_context, t_op)
+                lines.extend(t_lines)
+
+        return lines
+    else:
+        return []
+
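+# with the "render_as_batch" option enabled, the child ops render under a
+# batch context manager, e.g. (roughly):
+#
+#     with op.batch_alter_table('account', schema=None) as batch_op:
+#         batch_op.drop_column('name')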
+
+@renderers.dispatch_for(ops.CreateTableCommentOp)
+def _render_create_table_comment(
+    autogen_context: AutogenContext, op: ops.CreateTableCommentOp
+) -> str:
+    if autogen_context._has_batch:
+        templ = (
+            "{prefix}create_table_comment(\n"
+            "{indent}{comment},\n"
+            "{indent}existing_comment={existing}\n"
+            ")"
+        )
+    else:
+        templ = (
+            "{prefix}create_table_comment(\n"
+            "{indent}'{tname}',\n"
+            "{indent}{comment},\n"
+            "{indent}existing_comment={existing},\n"
+            "{indent}schema={schema}\n"
+            ")"
+        )
+    return templ.format(
+        prefix=_alembic_autogenerate_prefix(autogen_context),
+        tname=op.table_name,
+        comment="%r" % op.comment if op.comment is not None else None,
+        existing=(
+            "%r" % op.existing_comment
+            if op.existing_comment is not None
+            else None
+        ),
+        schema="'%s'" % op.schema if op.schema is not None else None,
+        indent="    ",
+    )
+
+
+@renderers.dispatch_for(ops.DropTableCommentOp)
+def _render_drop_table_comment(
+    autogen_context: AutogenContext, op: ops.DropTableCommentOp
+) -> str:
+    if autogen_context._has_batch:
+        templ = (
+            "{prefix}drop_table_comment(\n"
+            "{indent}existing_comment={existing}\n"
+            ")"
+        )
+    else:
+        templ = (
+            "{prefix}drop_table_comment(\n"
+            "{indent}'{tname}',\n"
+            "{indent}existing_comment={existing},\n"
+            "{indent}schema={schema}\n"
+            ")"
+        )
+    return templ.format(
+        prefix=_alembic_autogenerate_prefix(autogen_context),
+        tname=op.table_name,
+        existing=(
+            "%r" % op.existing_comment
+            if op.existing_comment is not None
+            else None
+        ),
+        schema="'%s'" % op.schema if op.schema is not None else None,
+        indent="    ",
+    )
+
+
+@renderers.dispatch_for(ops.CreateTableOp)
+def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
+    table = op.to_table()
+
+    args = [
+        col
+        for col in [
+            _render_column(col, autogen_context) for col in table.columns
+        ]
+        if col
+    ] + sorted(
+        [
+            rcons
+            for rcons in [
+                _render_constraint(
+                    cons, autogen_context, op._namespace_metadata
+                )
+                for cons in table.constraints
+            ]
+            if rcons is not None
+        ]
+    )
+
+    if len(args) > MAX_PYTHON_ARGS:
+        args_str = "*[" + ",\n".join(args) + "]"
+    else:
+        args_str = ",\n".join(args)
+
+    text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
+        "tablename": _ident(op.table_name),
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "args": args_str,
+    }
+    if op.schema:
+        text += ",\nschema=%r" % _ident(op.schema)
+
+    comment = table.comment
+    if comment:
+        text += ",\ncomment=%r" % _ident(comment)
+
+    info = table.info
+    if info:
+        text += f",\ninfo={info!r}"
+
+    for k in sorted(op.kw):
+        text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
+
+    if table._prefixes:
+        prefixes = ", ".join("'%s'" % p for p in table._prefixes)
+        text += ",\nprefixes=[%s]" % prefixes
+
+    if op.if_not_exists is not None:
+        text += ",\nif_not_exists=%r" % bool(op.if_not_exists)
+
+    text += "\n)"
+    return text
+
+
+@renderers.dispatch_for(ops.DropTableOp)
+def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
+    text = "%(prefix)sdrop_table(%(tname)r" % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "tname": _ident(op.table_name),
+    }
+    if op.schema:
+        text += ", schema=%r" % _ident(op.schema)
+
+    if op.if_exists is not None:
+        text += ", if_exists=%r" % bool(op.if_exists)
+
+    text += ")"
+    return text
+
+
+def _render_dialect_kwargs_items(
+    autogen_context: AutogenContext, item: DialectKWArgs
+) -> list[str]:
+    return [
+        f"{key}={_render_potential_expr(val, autogen_context)}"
+        for key, val in item.dialect_kwargs.items()
+    ]
+
+
+@renderers.dispatch_for(ops.CreateIndexOp)
+def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
+    index = op.to_index()
+
+    has_batch = autogen_context._has_batch
+
+    if has_batch:
+        tmpl = (
+            "%(prefix)screate_index(%(name)r, [%(columns)s], "
+            "unique=%(unique)r%(kwargs)s)"
+        )
+    else:
+        tmpl = (
+            "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
+            "unique=%(unique)r%(schema)s%(kwargs)s)"
+        )
+
+    assert index.table is not None
+
+    opts = _render_dialect_kwargs_items(autogen_context, index)
+    if op.if_not_exists is not None:
+        opts.append("if_not_exists=%r" % bool(op.if_not_exists))
+    text = tmpl % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "name": _render_gen_name(autogen_context, index.name),
+        "table": _ident(index.table.name),
+        "columns": ", ".join(
+            _get_index_rendered_expressions(index, autogen_context)
+        ),
+        "unique": index.unique or False,
+        "schema": (
+            (", schema=%r" % _ident(index.table.schema))
+            if index.table.schema
+            else ""
+        ),
+        "kwargs": ", " + ", ".join(opts) if opts else "",
+    }
+    return text
+
+
+@renderers.dispatch_for(ops.DropIndexOp)
+def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
+    index = op.to_index()
+
+    has_batch = autogen_context._has_batch
+
+    if has_batch:
+        tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
+    else:
+        tmpl = (
+            "%(prefix)sdrop_index(%(name)r, "
+            "table_name=%(table_name)r%(schema)s%(kwargs)s)"
+        )
+    opts = _render_dialect_kwargs_items(autogen_context, index)
+    if op.if_exists is not None:
+        opts.append("if_exists=%r" % bool(op.if_exists))
+    text = tmpl % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "name": _render_gen_name(autogen_context, op.index_name),
+        "table_name": _ident(op.table_name),
+        "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
+        "kwargs": ", " + ", ".join(opts) if opts else "",
+    }
+    return text
+
+
+@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
+def _add_unique_constraint(
+    autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
+) -> List[str]:
+    return [_uq_constraint(op.to_constraint(), autogen_context, True)]
+
+
+@renderers.dispatch_for(ops.CreateForeignKeyOp)
+def _add_fk_constraint(
+    autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
+) -> str:
+    args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
+    if not autogen_context._has_batch:
+        args.append(repr(_ident(op.source_table)))
+
+    args.extend(
+        [
+            repr(_ident(op.referent_table)),
+            repr([_ident(col) for col in op.local_cols]),
+            repr([_ident(col) for col in op.remote_cols]),
+        ]
+    )
+    kwargs = [
+        "referent_schema",
+        "onupdate",
+        "ondelete",
+        "initially",
+        "deferrable",
+        "use_alter",
+        "match",
+    ]
+    if not autogen_context._has_batch:
+        kwargs.insert(0, "source_schema")
+
+    for k in kwargs:
+        if k in op.kw:
+            value = op.kw[k]
+            if value is not None:
+                args.append("%s=%r" % (k, value))
+
+    return "%(prefix)screate_foreign_key(%(args)s)" % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "args": ", ".join(args),
+    }
+
+
+@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
+def _add_pk_constraint(constraint, autogen_context):
+    raise NotImplementedError()
+
+
+@renderers.dispatch_for(ops.CreateCheckConstraintOp)
+def _add_check_constraint(constraint, autogen_context):
+    raise NotImplementedError()
+
+
+@renderers.dispatch_for(ops.DropConstraintOp)
+def _drop_constraint(
+    autogen_context: AutogenContext, op: ops.DropConstraintOp
+) -> str:
+    prefix = _alembic_autogenerate_prefix(autogen_context)
+    name = _render_gen_name(autogen_context, op.constraint_name)
+    schema = _ident(op.schema) if op.schema else None
+    type_ = _ident(op.constraint_type) if op.constraint_type else None
+
+    params_strs = []
+    params_strs.append(repr(name))
+    if not autogen_context._has_batch:
+        params_strs.append(repr(_ident(op.table_name)))
+        if schema is not None:
+            params_strs.append(f"schema={schema!r}")
+    if type_ is not None:
+        params_strs.append(f"type_={type_!r}")
+
+    return f"{prefix}drop_constraint({', '.join(params_strs)})"
+
+
+@renderers.dispatch_for(ops.AddColumnOp)
+def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
+    schema, tname, column = op.schema, op.table_name, op.column
+    if autogen_context._has_batch:
+        template = "%(prefix)sadd_column(%(column)s)"
+    else:
+        template = "%(prefix)sadd_column(%(tname)r, %(column)s"
+        if schema:
+            template += ", schema=%(schema)r"
+        template += ")"
+    text = template % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "tname": tname,
+        "column": _render_column(column, autogen_context),
+        "schema": schema,
+    }
+    return text
+
+
+@renderers.dispatch_for(ops.DropColumnOp)
+def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
+    schema, tname, column_name = op.schema, op.table_name, op.column_name
+
+    if autogen_context._has_batch:
+        template = "%(prefix)sdrop_column(%(cname)r)"
+    else:
+        template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
+        if schema:
+            template += ", schema=%(schema)r"
+        template += ")"
+
+    text = template % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "tname": _ident(tname),
+        "cname": _ident(column_name),
+        "schema": _ident(schema),
+    }
+    return text
+
+
+@renderers.dispatch_for(ops.AlterColumnOp)
+def _alter_column(
+    autogen_context: AutogenContext, op: ops.AlterColumnOp
+) -> str:
+    tname = op.table_name
+    cname = op.column_name
+    server_default = op.modify_server_default
+    type_ = op.modify_type
+    nullable = op.modify_nullable
+    comment = op.modify_comment
+    autoincrement = op.kw.get("autoincrement", None)
+    existing_type = op.existing_type
+    existing_nullable = op.existing_nullable
+    existing_comment = op.existing_comment
+    existing_server_default = op.existing_server_default
+    schema = op.schema
+
+    indent = " " * 11
+
+    if autogen_context._has_batch:
+        template = "%(prefix)salter_column(%(cname)r"
+    else:
+        template = "%(prefix)salter_column(%(tname)r, %(cname)r"
+
+    text = template % {
+        "prefix": _alembic_autogenerate_prefix(autogen_context),
+        "tname": tname,
+        "cname": cname,
+    }
+    if existing_type is not None:
+        text += ",\n%sexisting_type=%s" % (
+            indent,
+            _repr_type(existing_type, autogen_context),
+        )
+    if server_default is not False:
+        rendered = _render_server_default(server_default, autogen_context)
+        text += ",\n%sserver_default=%s" % (indent, rendered)
+
+    if type_ is not None:
+        text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
+    if nullable is not None:
+        text += ",\n%snullable=%r" % (indent, nullable)
+    if comment is not False:
+        text += ",\n%scomment=%r" % (indent, comment)
+    if existing_comment is not None:
+        text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
+    if nullable is None and existing_nullable is not None:
+        text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
+    if autoincrement is not None:
+        text += ",\n%sautoincrement=%r" % (indent, autoincrement)
+    if server_default is False and existing_server_default:
+        rendered = _render_server_default(
+            existing_server_default, autogen_context
+        )
+        text += ",\n%sexisting_server_default=%s" % (indent, rendered)
+    if schema and not autogen_context._has_batch:
+        text += ",\n%sschema=%r" % (indent, schema)
+    text += ")"
+    return text
+
+
+class _f_name:
+    def __init__(self, prefix: str, name: conv) -> None:
+        self.prefix = prefix
+        self.name = name
+
+    def __repr__(self) -> str:
+        return "%sf(%r)" % (self.prefix, _ident(self.name))
+
+
+def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
+    """produce a __repr__() object for a string identifier that may
+    use quoted_name() in SQLAlchemy 0.9 and greater.
+
+    The issue worked around here is that quoted_name() doesn't have
+    very good repr() behavior by itself when unicode is involved.
+
+    """
+    if name is None:
+        return name
+    elif isinstance(name, quoted_name):
+        return str(name)
+    elif isinstance(name, str):
+        return name
+
+
+def _render_potential_expr(
+    value: Any,
+    autogen_context: AutogenContext,
+    *,
+    wrap_in_element: bool = True,
+    is_server_default: bool = False,
+    is_index: bool = False,
+) -> str:
+    if isinstance(value, sql.ClauseElement):
+        sql_text = autogen_context.migration_context.impl.render_ddl_sql_expr(
+            value, is_server_default=is_server_default, is_index=is_index
+        )
+        if wrap_in_element:
+            prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+            element = "literal_column" if is_index else "text"
+            value_str = f"{prefix}{element}({sql_text!r})"
+            if (
+                is_index
+                and isinstance(value, Label)
+                and type(value.name) is str
+            ):
+                return value_str + f".label({value.name!r})"
+            else:
+                return value_str
+        else:
+            return repr(sql_text)
+    else:
+        return repr(value)
+
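+# e.g. an index expression given as sa.text("lower(name)") renders as
+# "sa.literal_column('lower(name)')", while a server default expression
+# renders as "sa.text('now()')", assuming the default "sa."
+# sqlalchemy_module_prefix.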
+
+def _get_index_rendered_expressions(
+    idx: Index, autogen_context: AutogenContext
+) -> List[str]:
+    return [
+        (
+            repr(_ident(getattr(exp, "name", None)))
+            if isinstance(exp, sa_schema.Column)
+            else _render_potential_expr(exp, autogen_context, is_index=True)
+        )
+        for exp in idx.expressions
+    ]
+
+
+def _uq_constraint(
+    constraint: UniqueConstraint,
+    autogen_context: AutogenContext,
+    alter: bool,
+) -> str:
+    opts: List[Tuple[str, Any]] = []
+
+    has_batch = autogen_context._has_batch
+
+    if constraint.deferrable:
+        opts.append(("deferrable", constraint.deferrable))
+    if constraint.initially:
+        opts.append(("initially", constraint.initially))
+    if not has_batch and alter and constraint.table.schema:
+        opts.append(("schema", _ident(constraint.table.schema)))
+    if not alter and constraint.name:
+        opts.append(
+            ("name", _render_gen_name(autogen_context, constraint.name))
+        )
+    dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)
+
+    if alter:
+        args = [repr(_render_gen_name(autogen_context, constraint.name))]
+        if not has_batch:
+            args += [repr(_ident(constraint.table.name))]
+        args.append(repr([_ident(col.name) for col in constraint.columns]))
+        args.extend(["%s=%r" % (k, v) for k, v in opts])
+        args.extend(dialect_options)
+        return "%(prefix)screate_unique_constraint(%(args)s)" % {
+            "prefix": _alembic_autogenerate_prefix(autogen_context),
+            "args": ", ".join(args),
+        }
+    else:
+        args = [repr(_ident(col.name)) for col in constraint.columns]
+        args.extend(["%s=%r" % (k, v) for k, v in opts])
+        args.extend(dialect_options)
+        return "%(prefix)sUniqueConstraint(%(args)s)" % {
+            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+            "args": ", ".join(args),
+        }
+
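+# e.g. (illustrative): with alter=True, a constraint named "uq_x" on t.x
+# renders as "op.create_unique_constraint('uq_x', 't', ['x'])"; with
+# alter=False it renders inline as "sa.UniqueConstraint('x', name='uq_x')"
+# within a create_table() call.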
+
+def _user_autogenerate_prefix(autogen_context, target):
+    prefix = autogen_context.opts["user_module_prefix"]
+    if prefix is None:
+        return "%s." % target.__module__
+    else:
+        return prefix
+
+
+def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
+    return autogen_context.opts["sqlalchemy_module_prefix"] or ""
+
+
+def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
+    if autogen_context._has_batch:
+        return "batch_op."
+    else:
+        return autogen_context.opts["alembic_module_prefix"] or ""
+
+
+def _user_defined_render(
+    type_: str, object_: Any, autogen_context: AutogenContext
+) -> Union[str, Literal[False]]:
+    if "render_item" in autogen_context.opts:
+        render = autogen_context.opts["render_item"]
+        if render:
+            rendered = render(type_, object_, autogen_context)
+            if rendered is not False:
+                return rendered
+    return False
+
+
+def _render_column(
+    column: Column[Any], autogen_context: AutogenContext
+) -> str:
+    rendered = _user_defined_render("column", column, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    args: List[str] = []
+    opts: List[Tuple[str, Any]] = []
+
+    if column.server_default:
+        rendered = _render_server_default(  # type:ignore[assignment]
+            column.server_default, autogen_context
+        )
+        if rendered:
+            if _should_render_server_default_positionally(
+                column.server_default
+            ):
+                args.append(rendered)
+            else:
+                opts.append(("server_default", rendered))
+
+    if (
+        column.autoincrement is not None
+        and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
+    ):
+        opts.append(("autoincrement", column.autoincrement))
+
+    if column.nullable is not None:
+        opts.append(("nullable", column.nullable))
+
+    if column.system:
+        opts.append(("system", column.system))
+
+    comment = column.comment
+    if comment:
+        opts.append(("comment", "%r" % comment))
+
+    # TODO: for non-ascii colname, assign a "key"
+    return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
+        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+        "name": _ident(column.name),
+        "type": _repr_type(column.type, autogen_context),
+        "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
+        "kwargs": (
+            ", ".join(
+                ["%s=%s" % (kwname, val) for kwname, val in opts]
+                + [
+                    "%s=%s"
+                    % (key, _render_potential_expr(val, autogen_context))
+                    for key, val in column.kwargs.items()
+                ]
+            )
+        ),
+    }
+
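+# e.g. sa.Column("name", sa.String(50), nullable=False) renders back as
+# "sa.Column('name', sa.String(length=50), nullable=False)" under the
+# default module prefixes.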
+
+def _should_render_server_default_positionally(server_default: Any) -> bool:
+    return sqla_compat._server_default_is_computed(
+        server_default
+    ) or sqla_compat._server_default_is_identity(server_default)
+
+
+def _render_server_default(
+    default: Optional[
+        Union[FetchedValue, str, TextClause, ColumnElement[Any]]
+    ],
+    autogen_context: AutogenContext,
+    repr_: bool = True,
+) -> Optional[str]:
+    rendered = _user_defined_render("server_default", default, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    if sqla_compat._server_default_is_computed(default):
+        return _render_computed(cast("Computed", default), autogen_context)
+    elif sqla_compat._server_default_is_identity(default):
+        return _render_identity(cast("Identity", default), autogen_context)
+    elif isinstance(default, sa_schema.DefaultClause):
+        if isinstance(default.arg, str):
+            default = default.arg
+        else:
+            return _render_potential_expr(
+                default.arg, autogen_context, is_server_default=True
+            )
+
+    if isinstance(default, str) and repr_:
+        default = repr(re.sub(r"^'|'$", "", default))
+
+    return cast(str, default)
+
+
+def _render_computed(
+    computed: Computed, autogen_context: AutogenContext
+) -> str:
+    text = _render_potential_expr(
+        computed.sqltext, autogen_context, wrap_in_element=False
+    )
+
+    kwargs = {}
+    if computed.persisted is not None:
+        kwargs["persisted"] = computed.persisted
+    return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
+        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+        "text": text,
+        "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
+    }
+
+
+def _render_identity(
+    identity: Identity, autogen_context: AutogenContext
+) -> str:
+    kwargs = sqla_compat._get_identity_options_dict(
+        identity, dialect_kwargs=True
+    )
+
+    return "%(prefix)sIdentity(%(kwargs)s)" % {
+        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+        "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
+    }
+
+
+def _repr_type(
+    type_: TypeEngine,
+    autogen_context: AutogenContext,
+    _skip_variants: bool = False,
+) -> str:
+    rendered = _user_defined_render("type", type_, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    if hasattr(autogen_context.migration_context, "impl"):
+        impl_rt = autogen_context.migration_context.impl.render_type(
+            type_, autogen_context
+        )
+    else:
+        impl_rt = None
+
+    mod = type(type_).__module__
+    imports = autogen_context.imports
+
+    if not _skip_variants and sqla_compat._type_has_variants(type_):
+        return _render_Variant_type(type_, autogen_context)
+    elif mod.startswith("sqlalchemy.dialects"):
+        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
+        assert match is not None
+        dname = match.group(1)
+        if imports is not None:
+            imports.add("from sqlalchemy.dialects import %s" % dname)
+        if impl_rt:
+            return impl_rt
+        else:
+            return "%s.%r" % (dname, type_)
+    elif impl_rt:
+        return impl_rt
+    elif mod.startswith("sqlalchemy."):
+        if "_render_%s_type" % type_.__visit_name__ in globals():
+            fn = globals()["_render_%s_type" % type_.__visit_name__]
+            return fn(type_, autogen_context)
+        else:
+            prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+            return "%s%r" % (prefix, type_)
+    else:
+        prefix = _user_autogenerate_prefix(autogen_context, type_)
+        return "%s%r" % (prefix, type_)
+
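+# e.g. (roughly): a postgresql.UUID() type renders as "postgresql.UUID()"
+# and adds "from sqlalchemy.dialects import postgresql" to the collected
+# imports, while a plain sa.String(50) renders as "sa.String(length=50)"
+# under the default module prefix.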
+
+def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
+    return cast(
+        str,
+        _render_type_w_subtype(
+            type_, autogen_context, "item_type", r"(.+?\()"
+        ),
+    )
+
+
+def _render_Variant_type(
+    type_: TypeEngine, autogen_context: AutogenContext
+) -> str:
+    base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
+    base = _repr_type(base_type, autogen_context, _skip_variants=True)
+    assert base is not None and base is not False  # type: ignore[comparison-overlap]  # noqa:E501
+    for dialect in sorted(variant_mapping):
+        typ = variant_mapping[dialect]
+        base += ".with_variant(%s, %r)" % (
+            _repr_type(typ, autogen_context, _skip_variants=True),
+            dialect,
+        )
+    return base
+
+
+def _render_type_w_subtype(
+    type_: TypeEngine,
+    autogen_context: AutogenContext,
+    attrname: str,
+    regexp: str,
+    prefix: Optional[str] = None,
+) -> Union[Optional[str], Literal[False]]:
+    outer_repr = repr(type_)
+    inner_type = getattr(type_, attrname, None)
+    if inner_type is None:
+        return False
+
+    inner_repr = repr(inner_type)
+
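+    # escape parentheses in the inner type's repr so it can be embedded
+    # verbatim in the regular expression below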
+    inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
+    sub_type = _repr_type(getattr(type_, attrname), autogen_context)
+    outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)
+
+    if prefix:
+        return "%s%s" % (prefix, outer_type)
+
+    mod = type(type_).__module__
+    if mod.startswith("sqlalchemy.dialects"):
+        match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
+        assert match is not None
+        dname = match.group(1)
+        return "%s.%s" % (dname, outer_type)
+    elif mod.startswith("sqlalchemy"):
+        prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+        return "%s%s" % (prefix, outer_type)
+    else:
+        return None
+
+
+_constraint_renderers = util.Dispatcher()
+
+
+def _render_constraint(
+    constraint: Constraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+    try:
+        renderer = _constraint_renderers.dispatch(constraint)
+    except ValueError:
+        util.warn("No renderer is established for object %r" % constraint)
+        return "[Unknown Python object %r]" % constraint
+    else:
+        return renderer(constraint, autogen_context, namespace_metadata)
+
+
+@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
+def _render_primary_key(
+    constraint: PrimaryKeyConstraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+    rendered = _user_defined_render("primary_key", constraint, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    if not constraint.columns:
+        return None
+
+    opts = []
+    if constraint.name:
+        opts.append(
+            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+        )
+    return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
+        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+        "args": ", ".join(
+            [repr(c.name) for c in constraint.columns]
+            + ["%s=%s" % (kwname, val) for kwname, val in opts]
+        ),
+    }
+
+
+def _fk_colspec(
+    fk: ForeignKey,
+    metadata_schema: Optional[str],
+    namespace_metadata: MetaData,
+) -> str:
+    """Implement a 'safe' version of ForeignKey._get_colspec() that
+    won't fail if the remote table can't be resolved.
+
+    """
+    colspec = fk._get_colspec()
+    tokens = colspec.split(".")
+    tname, colname = tokens[-2:]
+
+    if metadata_schema is not None and len(tokens) == 2:
+        table_fullname = "%s.%s" % (metadata_schema, tname)
+    else:
+        table_fullname = ".".join(tokens[0:-1])
+
+    if (
+        not fk.link_to_name
+        and fk.parent is not None
+        and fk.parent.table is not None
+    ):
+        # try to resolve the remote table in order to adjust for column.key.
+        # the FK constraint needs to be rendered in terms of the column
+        # name.
+
+        if table_fullname in namespace_metadata.tables:
+            col = namespace_metadata.tables[table_fullname].c.get(colname)
+            if col is not None:
+                colname = _ident(col.name)  # type: ignore[assignment]
+
+    colspec = "%s.%s" % (table_fullname, colname)
+
+    return colspec
+
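+# e.g. with metadata_schema="tenant", a ForeignKey("user.id") yields the
+# colspec "tenant.user.id", while an already-qualified
+# ForeignKey("other.user.id") stays on "other.user.id".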
+
+def _populate_render_fk_opts(
+    constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
+) -> None:
+    if constraint.onupdate:
+        opts.append(("onupdate", repr(constraint.onupdate)))
+    if constraint.ondelete:
+        opts.append(("ondelete", repr(constraint.ondelete)))
+    if constraint.initially:
+        opts.append(("initially", repr(constraint.initially)))
+    if constraint.deferrable:
+        opts.append(("deferrable", repr(constraint.deferrable)))
+    if constraint.use_alter:
+        opts.append(("use_alter", repr(constraint.use_alter)))
+    if constraint.match:
+        opts.append(("match", repr(constraint.match)))
+
+
+@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
+def _render_foreign_key(
+    constraint: ForeignKeyConstraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: MetaData,
+) -> Optional[str]:
+    rendered = _user_defined_render("foreign_key", constraint, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    opts = []
+    if constraint.name:
+        opts.append(
+            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+        )
+
+    _populate_render_fk_opts(constraint, opts)
+
+    apply_metadata_schema = namespace_metadata.schema
+    return (
+        "%(prefix)sForeignKeyConstraint([%(cols)s], "
+        "[%(refcols)s], %(args)s)"
+        % {
+            "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+            "cols": ", ".join(
+                repr(_ident(f.parent.name)) for f in constraint.elements
+            ),
+            "refcols": ", ".join(
+                repr(_fk_colspec(f, apply_metadata_schema, namespace_metadata))
+                for f in constraint.elements
+            ),
+            "args": ", ".join(
+                ["%s=%s" % (kwname, val) for kwname, val in opts]
+            ),
+        }
+    )
+
+
+@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
+def _render_unique_constraint(
+    constraint: UniqueConstraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: Optional[MetaData],
+) -> str:
+    rendered = _user_defined_render("unique", constraint, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    return _uq_constraint(constraint, autogen_context, False)
+
+
+@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
+def _render_check_constraint(
+    constraint: CheckConstraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+    rendered = _user_defined_render("check", constraint, autogen_context)
+    if rendered is not False:
+        return rendered
+
+    # detect the constraint being part of
+    # a parent type which is probably in the Table already.
+    # ideally SQLAlchemy would give us more of a first class
+    # way to detect this.
+    if (
+        constraint._create_rule
+        and hasattr(constraint._create_rule, "target")
+        and isinstance(
+            constraint._create_rule.target,
+            sqltypes.TypeEngine,
+        )
+    ):
+        return None
+    opts = []
+    if constraint.name:
+        opts.append(
+            ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+        )
+    return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
+        "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+        "opts": (
+            ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
+            if opts
+            else ""
+        ),
+        "sqltext": _render_potential_expr(
+            constraint.sqltext, autogen_context, wrap_in_element=False
+        ),
+    }
+
+
+@renderers.dispatch_for(ops.ExecuteSQLOp)
+def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
+    if not isinstance(op.sqltext, str):
+        raise NotImplementedError(
+            "Autogenerate rendering of SQL Expression language constructs "
+            "not supported here; please use a plain SQL string"
+        )
+    return "op.execute(%r)" % op.sqltext
+
+
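+# the public "renderers" dispatcher is branched from the built-in defaults
+# above, so user registrations via @renderers.dispatch_for layer on top
+# without mutating the default registrations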
+renderers = default_renderers.branch()
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py
new file mode 100644
index 00000000..8994dcf8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py
@@ -0,0 +1,240 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Callable
+from typing import Iterator
+from typing import List
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from .. import util
+from ..operations import ops
+
+if TYPE_CHECKING:
+    from ..operations.ops import AddColumnOp
+    from ..operations.ops import AlterColumnOp
+    from ..operations.ops import CreateTableOp
+    from ..operations.ops import DowngradeOps
+    from ..operations.ops import MigrateOperation
+    from ..operations.ops import MigrationScript
+    from ..operations.ops import ModifyTableOps
+    from ..operations.ops import OpContainer
+    from ..operations.ops import UpgradeOps
+    from ..runtime.migration import MigrationContext
+    from ..script.revision import _GetRevArg
+
+ProcessRevisionDirectiveFn = Callable[
+    ["MigrationContext", "_GetRevArg", List["MigrationScript"]], None
+]
+
+
+class Rewriter:
+    """A helper object that allows easy 'rewriting' of ops streams.
+
+    The :class:`.Rewriter` object is intended to be passed along
+    to the
+    :paramref:`.EnvironmentContext.configure.process_revision_directives`
+    parameter in an ``env.py`` script.  Once constructed, any number of
+    "rewrites" functions can be associated with it; each is given the
+    opportunity to modify an individual directive without needing
+    explicit knowledge of the overall structure.
+
+    Each rewriter function is passed the :class:`.MigrationContext`
+    object and ``revision`` tuple that are normally passed to the
+    :paramref:`.EnvironmentContext.configure.process_revision_directives`
+    function, plus an individual directive of the type noted in the
+    decorator.  The function may return the directive that was passed
+    (possibly modified in place), a new directive to replace it, or a
+    list of zero or more directives to replace it.
+
+    .. seealso::
+
+        :ref:`autogen_rewriter` - usage example
+
+    """
+
+    _traverse = util.Dispatcher()
+
+    _chained: Tuple[Union[ProcessRevisionDirectiveFn, Rewriter], ...] = ()
+
+    def __init__(self) -> None:
+        self.dispatch = util.Dispatcher()
+
+    def chain(
+        self,
+        other: Union[
+            ProcessRevisionDirectiveFn,
+            Rewriter,
+        ],
+    ) -> Rewriter:
+        """Produce a "chain" of this :class:`.Rewriter` to another.
+
+        This allows two or more rewriters to operate serially on a stream,
+        e.g.::
+
+            writer1 = autogenerate.Rewriter()
+            writer2 = autogenerate.Rewriter()
+
+
+            @writer1.rewrites(ops.AddColumnOp)
+            def add_column_nullable(context, revision, op):
+                op.column.nullable = True
+                return op
+
+
+            @writer2.rewrites(ops.AddColumnOp)
+            def add_column_idx(context, revision, op):
+                idx_op = ops.CreateIndexOp(
+                    "ixc", op.table_name, [op.column.name]
+                )
+                return [op, idx_op]
+
+            writer = writer1.chain(writer2)
+
+        :param other: a :class:`.Rewriter` instance
+        :return: a new :class:`.Rewriter` that will run the operations
+         of this writer, then the "other" writer, in succession.
+
+        """
+        wr = self.__class__.__new__(self.__class__)
+        wr.__dict__.update(self.__dict__)
+        wr._chained += (other,)
+        return wr
+
+    def rewrites(
+        self,
+        operator: Union[
+            Type[AddColumnOp],
+            Type[MigrateOperation],
+            Type[AlterColumnOp],
+            Type[CreateTableOp],
+            Type[ModifyTableOps],
+        ],
+    ) -> Callable[..., Any]:
+        """Register a function as rewriter for a given type.
+
+        The function should receive three arguments, which are
+        the :class:`.MigrationContext`, a ``revision`` tuple, and
+        an op directive of the type indicated.  E.g.::
+
+            @writer1.rewrites(ops.AddColumnOp)
+            def add_column_nullable(context, revision, op):
+                op.column.nullable = True
+                return op
+
+        """
+        return self.dispatch.dispatch_for(operator)
+
+    def _rewrite(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directive: MigrateOperation,
+    ) -> Iterator[MigrateOperation]:
+        try:
+            _rewriter = self.dispatch.dispatch(directive)
+        except ValueError:
+            yield directive
+        else:
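+            # a directive this Rewriter has already rewritten carries it
+            # in its _mutations set; passing such directives through
+            # unchanged prevents re-processing output we ourselves emitted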
+            if self in directive._mutations:
+                yield directive
+            else:
+                for r_directive in util.to_list(
+                    _rewriter(context, revision, directive), []
+                ):
+                    r_directive._mutations = r_directive._mutations.union(
+                        [self]
+                    )
+                    yield r_directive
+
+    def __call__(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directives: List[MigrationScript],
+    ) -> None:
+        self.process_revision_directives(context, revision, directives)
+        for process_revision_directives in self._chained:
+            process_revision_directives(context, revision, directives)
+
+    @_traverse.dispatch_for(ops.MigrationScript)
+    def _traverse_script(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directive: MigrationScript,
+    ) -> None:
+        upgrade_ops_list: List[UpgradeOps] = []
+        for upgrade_ops in directive.upgrade_ops_list:
+            ret = self._traverse_for(context, revision, upgrade_ops)
+            if len(ret) != 1:
+                raise ValueError(
+                    "Can only return single object for UpgradeOps traverse"
+                )
+            upgrade_ops_list.append(ret[0])
+
+        directive.upgrade_ops = upgrade_ops_list  # type: ignore
+
+        downgrade_ops_list: List[DowngradeOps] = []
+        for downgrade_ops in directive.downgrade_ops_list:
+            ret = self._traverse_for(context, revision, downgrade_ops)
+            if len(ret) != 1:
+                raise ValueError(
+                    "Can only return single object for DowngradeOps traverse"
+                )
+            downgrade_ops_list.append(ret[0])
+        directive.downgrade_ops = downgrade_ops_list  # type: ignore
+
+    @_traverse.dispatch_for(ops.OpContainer)
+    def _traverse_op_container(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directive: OpContainer,
+    ) -> None:
+        self._traverse_list(context, revision, directive.ops)
+
+    @_traverse.dispatch_for(ops.MigrateOperation)
+    def _traverse_any_directive(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directive: MigrateOperation,
+    ) -> None:
+        pass
+
+    def _traverse_for(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directive: MigrateOperation,
+    ) -> Any:
+        directives = list(self._rewrite(context, revision, directive))
+        for directive in directives:
+            traverser = self._traverse.dispatch(directive)
+            traverser(self, context, revision, directive)
+        return directives
+
+    def _traverse_list(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directives: Any,
+    ) -> None:
+        dest = []
+        for directive in directives:
+            dest.extend(self._traverse_for(context, revision, directive))
+
+        directives[:] = dest
+
+    def process_revision_directives(
+        self,
+        context: MigrationContext,
+        revision: _GetRevArg,
+        directives: List[MigrationScript],
+    ) -> None:
+        self._traverse_list(context, revision, directives)