about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/ddl/impl.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/alembic/ddl/impl.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/ddl/impl.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/ddl/impl.py885
1 files changed, 885 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/ddl/impl.py b/.venv/lib/python3.12/site-packages/alembic/ddl/impl.py
new file mode 100644
index 00000000..c116fcfa
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/ddl/impl.py
@@ -0,0 +1,885 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+import logging
+import re
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import Iterable
+from typing import List
+from typing import Mapping
+from typing import NamedTuple
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import cast
+from sqlalchemy import Column
+from sqlalchemy import MetaData
+from sqlalchemy import PrimaryKeyConstraint
+from sqlalchemy import schema
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy import text
+
+from . import _autogen
+from . import base
+from ._autogen import _constraint_sig as _constraint_sig
+from ._autogen import ComparisonResult as ComparisonResult
+from .. import util
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+    from typing import Literal
+    from typing import TextIO
+
+    from sqlalchemy.engine import Connection
+    from sqlalchemy.engine import Dialect
+    from sqlalchemy.engine.cursor import CursorResult
+    from sqlalchemy.engine.reflection import Inspector
+    from sqlalchemy.sql import ClauseElement
+    from sqlalchemy.sql import Executable
+    from sqlalchemy.sql.elements import ColumnElement
+    from sqlalchemy.sql.elements import quoted_name
+    from sqlalchemy.sql.schema import Constraint
+    from sqlalchemy.sql.schema import ForeignKeyConstraint
+    from sqlalchemy.sql.schema import Index
+    from sqlalchemy.sql.schema import UniqueConstraint
+    from sqlalchemy.sql.selectable import TableClause
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from .base import _ServerDefault
+    from ..autogenerate.api import AutogenContext
+    from ..operations.batch import ApplyBatchImpl
+    from ..operations.batch import BatchOperationsImpl
+
+log = logging.getLogger(__name__)
+
+
+class ImplMeta(type):
+    def __init__(
+        cls,
+        classname: str,
+        bases: Tuple[Type[DefaultImpl]],
+        dict_: Dict[str, Any],
+    ):
+        newtype = type.__init__(cls, classname, bases, dict_)
+        if "__dialect__" in dict_:
+            _impls[dict_["__dialect__"]] = cls  # type: ignore[assignment]
+        return newtype
+
+
+_impls: Dict[str, Type[DefaultImpl]] = {}
+
+
+class DefaultImpl(metaclass=ImplMeta):
+    """Provide the entrypoint for major migration operations,
+    including database-specific behavioral variances.
+
+    While individual SQL/DDL constructs already provide
+    for database-specific implementations, variances here
+    allow for entirely different sequences of operations
+    to take place for a particular migration, such as
+    SQL Server's special 'IDENTITY INSERT' step for
+    bulk inserts.
+
+    """
+
+    __dialect__ = "default"
+
+    transactional_ddl = False
+    command_terminator = ";"
+    type_synonyms: Tuple[Set[str], ...] = ({"NUMERIC", "DECIMAL"},)
+    type_arg_extract: Sequence[str] = ()
+    # These attributes are deprecated in SQLAlchemy via #10247. They need to
+    # be ignored to support older version that did not use dialect kwargs.
+    # They only apply to Oracle and are replaced by oracle_order,
+    # oracle_on_null
+    identity_attrs_ignore: Tuple[str, ...] = ("order", "on_null")
+
+    def __init__(
+        self,
+        dialect: Dialect,
+        connection: Optional[Connection],
+        as_sql: bool,
+        transactional_ddl: Optional[bool],
+        output_buffer: Optional[TextIO],
+        context_opts: Dict[str, Any],
+    ) -> None:
+        self.dialect = dialect
+        self.connection = connection
+        self.as_sql = as_sql
+        self.literal_binds = context_opts.get("literal_binds", False)
+
+        self.output_buffer = output_buffer
+        self.memo: dict = {}
+        self.context_opts = context_opts
+        if transactional_ddl is not None:
+            self.transactional_ddl = transactional_ddl
+
+        if self.literal_binds:
+            if not self.as_sql:
+                raise util.CommandError(
+                    "Can't use literal_binds setting without as_sql mode"
+                )
+
+    @classmethod
+    def get_by_dialect(cls, dialect: Dialect) -> Type[DefaultImpl]:
+        return _impls[dialect.name]
+
+    def static_output(self, text: str) -> None:
+        assert self.output_buffer is not None
+        self.output_buffer.write(text + "\n\n")
+        self.output_buffer.flush()
+
+    def version_table_impl(
+        self,
+        *,
+        version_table: str,
+        version_table_schema: Optional[str],
+        version_table_pk: bool,
+        **kw: Any,
+    ) -> Table:
+        """Generate a :class:`.Table` object which will be used as the
+        structure for the Alembic version table.
+
+        Third party dialects may override this hook to provide an alternate
+        structure for this :class:`.Table`; requirements are only that it
+        be named based on the ``version_table`` parameter and contains
+        at least a single string-holding column named ``version_num``.
+
+        .. versionadded:: 1.14
+
+        """
+        vt = Table(
+            version_table,
+            MetaData(),
+            Column("version_num", String(32), nullable=False),
+            schema=version_table_schema,
+        )
+        if version_table_pk:
+            vt.append_constraint(
+                PrimaryKeyConstraint(
+                    "version_num", name=f"{version_table}_pkc"
+                )
+            )
+
+        return vt
+
+    def requires_recreate_in_batch(
+        self, batch_op: BatchOperationsImpl
+    ) -> bool:
+        """Return True if the given :class:`.BatchOperationsImpl`
+        would need the table to be recreated and copied in order to
+        proceed.
+
+        Normally, only returns True on SQLite when operations other
+        than add_column are present.
+
+        """
+        return False
+
+    def prep_table_for_batch(
+        self, batch_impl: ApplyBatchImpl, table: Table
+    ) -> None:
+        """perform any operations needed on a table before a new
+        one is created to replace it in batch mode.
+
+        the PG dialect uses this to drop constraints on the table
+        before the new one uses those same names.
+
+        """
+
+    @property
+    def bind(self) -> Optional[Connection]:
+        return self.connection
+
+    def _exec(
+        self,
+        construct: Union[Executable, str],
+        execution_options: Optional[Mapping[str, Any]] = None,
+        multiparams: Optional[Sequence[Mapping[str, Any]]] = None,
+        params: Mapping[str, Any] = util.immutabledict(),
+    ) -> Optional[CursorResult]:
+        if isinstance(construct, str):
+            construct = text(construct)
+        if self.as_sql:
+            if multiparams is not None or params:
+                raise TypeError("SQL parameters not allowed with as_sql")
+
+            compile_kw: dict[str, Any]
+            if self.literal_binds and not isinstance(
+                construct, schema.DDLElement
+            ):
+                compile_kw = dict(compile_kwargs={"literal_binds": True})
+            else:
+                compile_kw = {}
+
+            if TYPE_CHECKING:
+                assert isinstance(construct, ClauseElement)
+            compiled = construct.compile(dialect=self.dialect, **compile_kw)
+            self.static_output(
+                str(compiled).replace("\t", "    ").strip()
+                + self.command_terminator
+            )
+            return None
+        else:
+            conn = self.connection
+            assert conn is not None
+            if execution_options:
+                conn = conn.execution_options(**execution_options)
+
+            if params and multiparams is not None:
+                raise TypeError(
+                    "Can't send params and multiparams at the same time"
+                )
+
+            if multiparams:
+                return conn.execute(construct, multiparams)
+            else:
+                return conn.execute(construct, params)
+
+    def execute(
+        self,
+        sql: Union[Executable, str],
+        execution_options: Optional[dict[str, Any]] = None,
+    ) -> None:
+        self._exec(sql, execution_options)
+
+    def alter_column(
+        self,
+        table_name: str,
+        column_name: str,
+        nullable: Optional[bool] = None,
+        server_default: Union[_ServerDefault, Literal[False]] = False,
+        name: Optional[str] = None,
+        type_: Optional[TypeEngine] = None,
+        schema: Optional[str] = None,
+        autoincrement: Optional[bool] = None,
+        comment: Optional[Union[str, Literal[False]]] = False,
+        existing_comment: Optional[str] = None,
+        existing_type: Optional[TypeEngine] = None,
+        existing_server_default: Optional[_ServerDefault] = None,
+        existing_nullable: Optional[bool] = None,
+        existing_autoincrement: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        if autoincrement is not None or existing_autoincrement is not None:
+            util.warn(
+                "autoincrement and existing_autoincrement "
+                "only make sense for MySQL",
+                stacklevel=3,
+            )
+        if nullable is not None:
+            self._exec(
+                base.ColumnNullable(
+                    table_name,
+                    column_name,
+                    nullable,
+                    schema=schema,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                    existing_comment=existing_comment,
+                )
+            )
+        if server_default is not False:
+            kw = {}
+            cls_: Type[
+                Union[
+                    base.ComputedColumnDefault,
+                    base.IdentityColumnDefault,
+                    base.ColumnDefault,
+                ]
+            ]
+            if sqla_compat._server_default_is_computed(
+                server_default, existing_server_default
+            ):
+                cls_ = base.ComputedColumnDefault
+            elif sqla_compat._server_default_is_identity(
+                server_default, existing_server_default
+            ):
+                cls_ = base.IdentityColumnDefault
+                kw["impl"] = self
+            else:
+                cls_ = base.ColumnDefault
+            self._exec(
+                cls_(
+                    table_name,
+                    column_name,
+                    server_default,  # type:ignore[arg-type]
+                    schema=schema,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                    existing_comment=existing_comment,
+                    **kw,
+                )
+            )
+        if type_ is not None:
+            self._exec(
+                base.ColumnType(
+                    table_name,
+                    column_name,
+                    type_,
+                    schema=schema,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                    existing_comment=existing_comment,
+                )
+            )
+
+        if comment is not False:
+            self._exec(
+                base.ColumnComment(
+                    table_name,
+                    column_name,
+                    comment,
+                    schema=schema,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                    existing_comment=existing_comment,
+                )
+            )
+
+        # do the new name last ;)
+        if name is not None:
+            self._exec(
+                base.ColumnName(
+                    table_name,
+                    column_name,
+                    name,
+                    schema=schema,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                )
+            )
+
+    def add_column(
+        self,
+        table_name: str,
+        column: Column[Any],
+        schema: Optional[Union[str, quoted_name]] = None,
+    ) -> None:
+        self._exec(base.AddColumn(table_name, column, schema=schema))
+
+    def drop_column(
+        self,
+        table_name: str,
+        column: Column[Any],
+        schema: Optional[str] = None,
+        **kw,
+    ) -> None:
+        self._exec(base.DropColumn(table_name, column, schema=schema))
+
+    def add_constraint(self, const: Any) -> None:
+        if const._create_rule is None or const._create_rule(self):
+            self._exec(schema.AddConstraint(const))
+
+    def drop_constraint(self, const: Constraint) -> None:
+        self._exec(schema.DropConstraint(const))
+
+    def rename_table(
+        self,
+        old_table_name: str,
+        new_table_name: Union[str, quoted_name],
+        schema: Optional[Union[str, quoted_name]] = None,
+    ) -> None:
+        self._exec(
+            base.RenameTable(old_table_name, new_table_name, schema=schema)
+        )
+
+    def create_table(self, table: Table, **kw: Any) -> None:
+        table.dispatch.before_create(
+            table, self.connection, checkfirst=False, _ddl_runner=self
+        )
+        self._exec(schema.CreateTable(table, **kw))
+        table.dispatch.after_create(
+            table, self.connection, checkfirst=False, _ddl_runner=self
+        )
+        for index in table.indexes:
+            self._exec(schema.CreateIndex(index))
+
+        with_comment = (
+            self.dialect.supports_comments and not self.dialect.inline_comments
+        )
+        comment = table.comment
+        if comment and with_comment:
+            self.create_table_comment(table)
+
+        for column in table.columns:
+            comment = column.comment
+            if comment and with_comment:
+                self.create_column_comment(column)
+
+    def drop_table(self, table: Table, **kw: Any) -> None:
+        table.dispatch.before_drop(
+            table, self.connection, checkfirst=False, _ddl_runner=self
+        )
+        self._exec(schema.DropTable(table, **kw))
+        table.dispatch.after_drop(
+            table, self.connection, checkfirst=False, _ddl_runner=self
+        )
+
+    def create_index(self, index: Index, **kw: Any) -> None:
+        self._exec(schema.CreateIndex(index, **kw))
+
+    def create_table_comment(self, table: Table) -> None:
+        self._exec(schema.SetTableComment(table))
+
+    def drop_table_comment(self, table: Table) -> None:
+        self._exec(schema.DropTableComment(table))
+
+    def create_column_comment(self, column: ColumnElement[Any]) -> None:
+        self._exec(schema.SetColumnComment(column))
+
+    def drop_index(self, index: Index, **kw: Any) -> None:
+        self._exec(schema.DropIndex(index, **kw))
+
+    def bulk_insert(
+        self,
+        table: Union[TableClause, Table],
+        rows: List[dict],
+        multiinsert: bool = True,
+    ) -> None:
+        if not isinstance(rows, list):
+            raise TypeError("List expected")
+        elif rows and not isinstance(rows[0], dict):
+            raise TypeError("List of dictionaries expected")
+        if self.as_sql:
+            for row in rows:
+                self._exec(
+                    table.insert()
+                    .inline()
+                    .values(
+                        **{
+                            k: (
+                                sqla_compat._literal_bindparam(
+                                    k, v, type_=table.c[k].type
+                                )
+                                if not isinstance(
+                                    v, sqla_compat._literal_bindparam
+                                )
+                                else v
+                            )
+                            for k, v in row.items()
+                        }
+                    )
+                )
+        else:
+            if rows:
+                if multiinsert:
+                    self._exec(table.insert().inline(), multiparams=rows)
+                else:
+                    for row in rows:
+                        self._exec(table.insert().inline().values(**row))
+
+    def _tokenize_column_type(self, column: Column) -> Params:
+        definition: str
+        definition = self.dialect.type_compiler.process(column.type).lower()
+
+        # tokenize the SQLAlchemy-generated version of a type, so that
+        # the two can be compared.
+        #
+        # examples:
+        # NUMERIC(10, 5)
+        # TIMESTAMP WITH TIMEZONE
+        # INTEGER UNSIGNED
+        # INTEGER (10) UNSIGNED
+        # INTEGER(10) UNSIGNED
+        # varchar character set utf8
+        #
+
+        tokens: List[str] = re.findall(r"[\w\-_]+|\(.+?\)", definition)
+
+        term_tokens: List[str] = []
+        paren_term = None
+
+        for token in tokens:
+            if re.match(r"^\(.*\)$", token):
+                paren_term = token
+            else:
+                term_tokens.append(token)
+
+        params = Params(term_tokens[0], term_tokens[1:], [], {})
+
+        if paren_term:
+            term: str
+            for term in re.findall("[^(),]+", paren_term):
+                if "=" in term:
+                    key, val = term.split("=")
+                    params.kwargs[key.strip()] = val.strip()
+                else:
+                    params.args.append(term.strip())
+
+        return params
+
+    def _column_types_match(
+        self, inspector_params: Params, metadata_params: Params
+    ) -> bool:
+        if inspector_params.token0 == metadata_params.token0:
+            return True
+
+        synonyms = [{t.lower() for t in batch} for batch in self.type_synonyms]
+        inspector_all_terms = " ".join(
+            [inspector_params.token0] + inspector_params.tokens
+        )
+        metadata_all_terms = " ".join(
+            [metadata_params.token0] + metadata_params.tokens
+        )
+
+        for batch in synonyms:
+            if {inspector_all_terms, metadata_all_terms}.issubset(batch) or {
+                inspector_params.token0,
+                metadata_params.token0,
+            }.issubset(batch):
+                return True
+        return False
+
+    def _column_args_match(
+        self, inspected_params: Params, meta_params: Params
+    ) -> bool:
+        """We want to compare column parameters. However, we only want
+        to compare parameters that are set. If they both have `collation`,
+        we want to make sure they are the same. However, if only one
+        specifies it, dont flag it for being less specific
+        """
+
+        if (
+            len(meta_params.tokens) == len(inspected_params.tokens)
+            and meta_params.tokens != inspected_params.tokens
+        ):
+            return False
+
+        if (
+            len(meta_params.args) == len(inspected_params.args)
+            and meta_params.args != inspected_params.args
+        ):
+            return False
+
+        insp = " ".join(inspected_params.tokens).lower()
+        meta = " ".join(meta_params.tokens).lower()
+
+        for reg in self.type_arg_extract:
+            mi = re.search(reg, insp)
+            mm = re.search(reg, meta)
+
+            if mi and mm and mi.group(1) != mm.group(1):
+                return False
+
+        return True
+
+    def compare_type(
+        self, inspector_column: Column[Any], metadata_column: Column
+    ) -> bool:
+        """Returns True if there ARE differences between the types of the two
+        columns. Takes impl.type_synonyms into account between retrospected
+        and metadata types
+        """
+        inspector_params = self._tokenize_column_type(inspector_column)
+        metadata_params = self._tokenize_column_type(metadata_column)
+
+        if not self._column_types_match(inspector_params, metadata_params):
+            return True
+        if not self._column_args_match(inspector_params, metadata_params):
+            return True
+        return False
+
+    def compare_server_default(
+        self,
+        inspector_column,
+        metadata_column,
+        rendered_metadata_default,
+        rendered_inspector_default,
+    ):
+        return rendered_inspector_default != rendered_metadata_default
+
+    def correct_for_autogen_constraints(
+        self,
+        conn_uniques: Set[UniqueConstraint],
+        conn_indexes: Set[Index],
+        metadata_unique_constraints: Set[UniqueConstraint],
+        metadata_indexes: Set[Index],
+    ) -> None:
+        pass
+
+    def cast_for_batch_migrate(self, existing, existing_transfer, new_type):
+        if existing.type._type_affinity is not new_type._type_affinity:
+            existing_transfer["expr"] = cast(
+                existing_transfer["expr"], new_type
+            )
+
+    def render_ddl_sql_expr(
+        self, expr: ClauseElement, is_server_default: bool = False, **kw: Any
+    ) -> str:
+        """Render a SQL expression that is typically a server default,
+        index expression, etc.
+
+        """
+
+        compile_kw = {"literal_binds": True, "include_table": False}
+
+        return str(
+            expr.compile(dialect=self.dialect, compile_kwargs=compile_kw)
+        )
+
+    def _compat_autogen_column_reflect(self, inspector: Inspector) -> Callable:
+        return self.autogen_column_reflect
+
+    def correct_for_autogen_foreignkeys(
+        self,
+        conn_fks: Set[ForeignKeyConstraint],
+        metadata_fks: Set[ForeignKeyConstraint],
+    ) -> None:
+        pass
+
+    def autogen_column_reflect(self, inspector, table, column_info):
+        """A hook that is attached to the 'column_reflect' event for when
+        a Table is reflected from the database during the autogenerate
+        process.
+
+        Dialects can elect to modify the information gathered here.
+
+        """
+
+    def start_migrations(self) -> None:
+        """A hook called when :meth:`.EnvironmentContext.run_migrations`
+        is called.
+
+        Implementations can set up per-migration-run state here.
+
+        """
+
+    def emit_begin(self) -> None:
+        """Emit the string ``BEGIN``, or the backend-specific
+        equivalent, on the current connection context.
+
+        This is used in offline mode and typically
+        via :meth:`.EnvironmentContext.begin_transaction`.
+
+        """
+        self.static_output("BEGIN" + self.command_terminator)
+
+    def emit_commit(self) -> None:
+        """Emit the string ``COMMIT``, or the backend-specific
+        equivalent, on the current connection context.
+
+        This is used in offline mode and typically
+        via :meth:`.EnvironmentContext.begin_transaction`.
+
+        """
+        self.static_output("COMMIT" + self.command_terminator)
+
+    def render_type(
+        self, type_obj: TypeEngine, autogen_context: AutogenContext
+    ) -> Union[str, Literal[False]]:
+        return False
+
+    def _compare_identity_default(self, metadata_identity, inspector_identity):
+        # ignored contains the attributes that were not considered
+        # because assumed to their default values in the db.
+        diff, ignored = _compare_identity_options(
+            metadata_identity,
+            inspector_identity,
+            schema.Identity(),
+            skip={"always"},
+        )
+
+        meta_always = getattr(metadata_identity, "always", None)
+        inspector_always = getattr(inspector_identity, "always", None)
+        # None and False are the same in this comparison
+        if bool(meta_always) != bool(inspector_always):
+            diff.add("always")
+
+        diff.difference_update(self.identity_attrs_ignore)
+
+        # returns 3 values:
+        return (
+            # different identity attributes
+            diff,
+            # ignored identity attributes
+            ignored,
+            # if the two identity should be considered different
+            bool(diff) or bool(metadata_identity) != bool(inspector_identity),
+        )
+
+    def _compare_index_unique(
+        self, metadata_index: Index, reflected_index: Index
+    ) -> Optional[str]:
+        conn_unique = bool(reflected_index.unique)
+        meta_unique = bool(metadata_index.unique)
+        if conn_unique != meta_unique:
+            return f"unique={conn_unique} to unique={meta_unique}"
+        else:
+            return None
+
+    def _create_metadata_constraint_sig(
+        self, constraint: _autogen._C, **opts: Any
+    ) -> _constraint_sig[_autogen._C]:
+        return _constraint_sig.from_constraint(True, self, constraint, **opts)
+
+    def _create_reflected_constraint_sig(
+        self, constraint: _autogen._C, **opts: Any
+    ) -> _constraint_sig[_autogen._C]:
+        return _constraint_sig.from_constraint(False, self, constraint, **opts)
+
+    def compare_indexes(
+        self,
+        metadata_index: Index,
+        reflected_index: Index,
+    ) -> ComparisonResult:
+        """Compare two indexes by comparing the signature generated by
+        ``create_index_sig``.
+
+        This method returns a ``ComparisonResult``.
+        """
+        msg: List[str] = []
+        unique_msg = self._compare_index_unique(
+            metadata_index, reflected_index
+        )
+        if unique_msg:
+            msg.append(unique_msg)
+        m_sig = self._create_metadata_constraint_sig(metadata_index)
+        r_sig = self._create_reflected_constraint_sig(reflected_index)
+
+        assert _autogen.is_index_sig(m_sig)
+        assert _autogen.is_index_sig(r_sig)
+
+        # The assumption is that the index have no expression
+        for sig in m_sig, r_sig:
+            if sig.has_expressions:
+                log.warning(
+                    "Generating approximate signature for index %s. "
+                    "The dialect "
+                    "implementation should either skip expression indexes "
+                    "or provide a custom implementation.",
+                    sig.const,
+                )
+
+        if m_sig.column_names != r_sig.column_names:
+            msg.append(
+                f"expression {r_sig.column_names} to {m_sig.column_names}"
+            )
+
+        if msg:
+            return ComparisonResult.Different(msg)
+        else:
+            return ComparisonResult.Equal()
+
+    def compare_unique_constraint(
+        self,
+        metadata_constraint: UniqueConstraint,
+        reflected_constraint: UniqueConstraint,
+    ) -> ComparisonResult:
+        """Compare two unique constraints by comparing the two signatures.
+
+        The arguments are two tuples that contain the unique constraint and
+        the signatures generated by ``create_unique_constraint_sig``.
+
+        This method returns a ``ComparisonResult``.
+        """
+        metadata_tup = self._create_metadata_constraint_sig(
+            metadata_constraint
+        )
+        reflected_tup = self._create_reflected_constraint_sig(
+            reflected_constraint
+        )
+
+        meta_sig = metadata_tup.unnamed
+        conn_sig = reflected_tup.unnamed
+        if conn_sig != meta_sig:
+            return ComparisonResult.Different(
+                f"expression {conn_sig} to {meta_sig}"
+            )
+        else:
+            return ComparisonResult.Equal()
+
+    def _skip_functional_indexes(self, metadata_indexes, conn_indexes):
+        conn_indexes_by_name = {c.name: c for c in conn_indexes}
+
+        for idx in list(metadata_indexes):
+            if idx.name in conn_indexes_by_name:
+                continue
+            iex = sqla_compat.is_expression_index(idx)
+            if iex:
+                util.warn(
+                    "autogenerate skipping metadata-specified "
+                    "expression-based index "
+                    f"{idx.name!r}; dialect {self.__dialect__!r} under "
+                    f"SQLAlchemy {sqla_compat.sqlalchemy_version} can't "
+                    "reflect these indexes so they can't be compared"
+                )
+                metadata_indexes.discard(idx)
+
+    def adjust_reflected_dialect_options(
+        self, reflected_object: Dict[str, Any], kind: str
+    ) -> Dict[str, Any]:
+        return reflected_object.get("dialect_options", {})
+
+
+class Params(NamedTuple):
+    token0: str
+    tokens: List[str]
+    args: List[str]
+    kwargs: Dict[str, str]
+
+
+def _compare_identity_options(
+    metadata_io: Union[schema.Identity, schema.Sequence, None],
+    inspector_io: Union[schema.Identity, schema.Sequence, None],
+    default_io: Union[schema.Identity, schema.Sequence],
+    skip: Set[str],
+):
+    # this can be used for identity or sequence compare.
+    # default_io is an instance of IdentityOption with all attributes to the
+    # default value.
+    meta_d = sqla_compat._get_identity_options_dict(metadata_io)
+    insp_d = sqla_compat._get_identity_options_dict(inspector_io)
+
+    diff = set()
+    ignored_attr = set()
+
+    def check_dicts(
+        meta_dict: Mapping[str, Any],
+        insp_dict: Mapping[str, Any],
+        default_dict: Mapping[str, Any],
+        attrs: Iterable[str],
+    ):
+        for attr in set(attrs).difference(skip):
+            meta_value = meta_dict.get(attr)
+            insp_value = insp_dict.get(attr)
+            if insp_value != meta_value:
+                default_value = default_dict.get(attr)
+                if meta_value == default_value:
+                    ignored_attr.add(attr)
+                else:
+                    diff.add(attr)
+
+    check_dicts(
+        meta_d,
+        insp_d,
+        sqla_compat._get_identity_options_dict(default_io),
+        set(meta_d).union(insp_d),
+    )
+    if sqla_compat.identity_has_dialect_kwargs:
+        assert hasattr(default_io, "dialect_kwargs")
+        # use only the dialect kwargs in inspector_io since metadata_io
+        # can have options for many backends
+        check_dicts(
+            getattr(metadata_io, "dialect_kwargs", {}),
+            getattr(inspector_io, "dialect_kwargs", {}),
+            default_io.dialect_kwargs,
+            getattr(inspector_io, "dialect_kwargs", {}),
+        )
+
+    return diff, ignored_attr