aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/alembic/autogenerate
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/alembic/autogenerate
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are hereHEADmaster
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/autogenerate')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py10
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py650
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py1317
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py1125
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py240
5 files changed, 3342 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py
new file mode 100644
index 00000000..445ddb25
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/__init__.py
@@ -0,0 +1,10 @@
+from .api import _render_migration_diffs as _render_migration_diffs
+from .api import compare_metadata as compare_metadata
+from .api import produce_migrations as produce_migrations
+from .api import render_python_code as render_python_code
+from .api import RevisionContext as RevisionContext
+from .compare import _produce_net_changes as _produce_net_changes
+from .compare import comparators as comparators
+from .render import render_op_text as render_op_text
+from .render import renderers as renderers
+from .rewriter import Rewriter as Rewriter
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
new file mode 100644
index 00000000..811462e8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
@@ -0,0 +1,650 @@
+from __future__ import annotations
+
+import contextlib
+from typing import Any
+from typing import Dict
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import inspect
+
+from . import compare
+from . import render
+from .. import util
+from ..operations import ops
+from ..util import sqla_compat
+
+"""Provide the 'autogenerate' feature which can produce migration operations
+automatically."""
+
+if TYPE_CHECKING:
+ from sqlalchemy.engine import Connection
+ from sqlalchemy.engine import Dialect
+ from sqlalchemy.engine import Inspector
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import SchemaItem
+ from sqlalchemy.sql.schema import Table
+
+ from ..config import Config
+ from ..operations.ops import DowngradeOps
+ from ..operations.ops import MigrationScript
+ from ..operations.ops import UpgradeOps
+ from ..runtime.environment import NameFilterParentNames
+ from ..runtime.environment import NameFilterType
+ from ..runtime.environment import ProcessRevisionDirectiveFn
+ from ..runtime.environment import RenderItemFn
+ from ..runtime.migration import MigrationContext
+ from ..script.base import Script
+ from ..script.base import ScriptDirectory
+ from ..script.revision import _GetRevArg
+
+
+def compare_metadata(context: MigrationContext, metadata: MetaData) -> Any:
+ """Compare a database schema to that given in a
+ :class:`~sqlalchemy.schema.MetaData` instance.
+
+ The database connection is presented in the context
+ of a :class:`.MigrationContext` object, which
+ provides database connectivity as well as optional
+ comparison functions to use for datatypes and
+ server defaults - see the "autogenerate" arguments
+ at :meth:`.EnvironmentContext.configure`
+ for details on these.
+
+ The return format is a list of "diff" directives,
+ each representing individual differences::
+
+ from alembic.migration import MigrationContext
+ from alembic.autogenerate import compare_metadata
+ from sqlalchemy import (
+ create_engine,
+ MetaData,
+ Column,
+ Integer,
+ String,
+ Table,
+ text,
+ )
+ import pprint
+
+ engine = create_engine("sqlite://")
+
+ with engine.begin() as conn:
+ conn.execute(
+ text(
+ '''
+ create table foo (
+ id integer not null primary key,
+ old_data varchar,
+ x integer
+ )
+ '''
+ )
+ )
+ conn.execute(text("create table bar (data varchar)"))
+
+ metadata = MetaData()
+ Table(
+ "foo",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", Integer),
+ Column("x", Integer, nullable=False),
+ )
+ Table("bat", metadata, Column("info", String))
+
+ mc = MigrationContext.configure(engine.connect())
+
+ diff = compare_metadata(mc, metadata)
+ pprint.pprint(diff, indent=2, width=20)
+
+ Output::
+
+ [
+ (
+ "add_table",
+ Table(
+ "bat",
+ MetaData(),
+ Column("info", String(), table=<bat>),
+ schema=None,
+ ),
+ ),
+ (
+ "remove_table",
+ Table(
+ "bar",
+ MetaData(),
+ Column("data", VARCHAR(), table=<bar>),
+ schema=None,
+ ),
+ ),
+ (
+ "add_column",
+ None,
+ "foo",
+ Column("data", Integer(), table=<foo>),
+ ),
+ [
+ (
+ "modify_nullable",
+ None,
+ "foo",
+ "x",
+ {
+ "existing_comment": None,
+ "existing_server_default": False,
+ "existing_type": INTEGER(),
+ },
+ True,
+ False,
+ )
+ ],
+ (
+ "remove_column",
+ None,
+ "foo",
+ Column("old_data", VARCHAR(), table=<foo>),
+ ),
+ ]
+
+ :param context: a :class:`.MigrationContext`
+ instance.
+ :param metadata: a :class:`~sqlalchemy.schema.MetaData`
+ instance.
+
+ .. seealso::
+
+ :func:`.produce_migrations` - produces a :class:`.MigrationScript`
+ structure based on metadata comparison.
+
+ """
+
+ migration_script = produce_migrations(context, metadata)
+ assert migration_script.upgrade_ops is not None
+ return migration_script.upgrade_ops.as_diffs()
+
+
+def produce_migrations(
+ context: MigrationContext, metadata: MetaData
+) -> MigrationScript:
+ """Produce a :class:`.MigrationScript` structure based on schema
+ comparison.
+
+ This function does essentially what :func:`.compare_metadata` does,
+ but then runs the resulting list of diffs to produce the full
+ :class:`.MigrationScript` object. For an example of what this looks like,
+ see the example in :ref:`customizing_revision`.
+
+ .. seealso::
+
+ :func:`.compare_metadata` - returns more fundamental "diff"
+ data from comparing a schema.
+
+ """
+
+ autogen_context = AutogenContext(context, metadata=metadata)
+
+ migration_script = ops.MigrationScript(
+ rev_id=None,
+ upgrade_ops=ops.UpgradeOps([]),
+ downgrade_ops=ops.DowngradeOps([]),
+ )
+
+ compare._populate_migration_script(autogen_context, migration_script)
+
+ return migration_script
+
+
+def render_python_code(
+ up_or_down_op: Union[UpgradeOps, DowngradeOps],
+ sqlalchemy_module_prefix: str = "sa.",
+ alembic_module_prefix: str = "op.",
+ render_as_batch: bool = False,
+ imports: Sequence[str] = (),
+ render_item: Optional[RenderItemFn] = None,
+ migration_context: Optional[MigrationContext] = None,
+ user_module_prefix: Optional[str] = None,
+) -> str:
+ """Render Python code given an :class:`.UpgradeOps` or
+ :class:`.DowngradeOps` object.
+
+ This is a convenience function that can be used to test the
+ autogenerate output of a user-defined :class:`.MigrationScript` structure.
+
+ :param up_or_down_op: :class:`.UpgradeOps` or :class:`.DowngradeOps` object
+ :param sqlalchemy_module_prefix: module prefix for SQLAlchemy objects
+ :param alembic_module_prefix: module prefix for Alembic constructs
+ :param render_as_batch: use "batch operations" style for rendering
+ :param imports: sequence of import symbols to add
+ :param render_item: callable to render items
+ :param migration_context: optional :class:`.MigrationContext`
+ :param user_module_prefix: optional string prefix for user-defined types
+
+ .. versionadded:: 1.11.0
+
+ """
+ opts = {
+ "sqlalchemy_module_prefix": sqlalchemy_module_prefix,
+ "alembic_module_prefix": alembic_module_prefix,
+ "render_item": render_item,
+ "render_as_batch": render_as_batch,
+ "user_module_prefix": user_module_prefix,
+ }
+
+ if migration_context is None:
+ from ..runtime.migration import MigrationContext
+ from sqlalchemy.engine.default import DefaultDialect
+
+ migration_context = MigrationContext.configure(
+ dialect=DefaultDialect()
+ )
+
+ autogen_context = AutogenContext(migration_context, opts=opts)
+ autogen_context.imports = set(imports)
+ return render._indent(
+ render._render_cmd_body(up_or_down_op, autogen_context)
+ )
+
+
+def _render_migration_diffs(
+ context: MigrationContext, template_args: Dict[Any, Any]
+) -> None:
+ """legacy, used by test_autogen_composition at the moment"""
+
+ autogen_context = AutogenContext(context)
+
+ upgrade_ops = ops.UpgradeOps([])
+ compare._produce_net_changes(autogen_context, upgrade_ops)
+
+ migration_script = ops.MigrationScript(
+ rev_id=None,
+ upgrade_ops=upgrade_ops,
+ downgrade_ops=upgrade_ops.reverse(),
+ )
+
+ render._render_python_into_templatevars(
+ autogen_context, migration_script, template_args
+ )
+
+
+class AutogenContext:
+ """Maintains configuration and state that's specific to an
+ autogenerate operation."""
+
+ metadata: Union[MetaData, Sequence[MetaData], None] = None
+ """The :class:`~sqlalchemy.schema.MetaData` object
+ representing the destination.
+
+ This object is the one that is passed within ``env.py``
+ to the :paramref:`.EnvironmentContext.configure.target_metadata`
+ parameter. It represents the structure of :class:`.Table` and other
+ objects as stated in the current database model, and represents the
+ destination structure for the database being examined.
+
+ While the :class:`~sqlalchemy.schema.MetaData` object is primarily
+ known as a collection of :class:`~sqlalchemy.schema.Table` objects,
+ it also has an :attr:`~sqlalchemy.schema.MetaData.info` dictionary
+ that may be used by end-user schemes to store additional schema-level
+ objects that are to be compared in custom autogeneration schemes.
+
+ """
+
+ connection: Optional[Connection] = None
+ """The :class:`~sqlalchemy.engine.base.Connection` object currently
+ connected to the database backend being compared.
+
+ This is obtained from the :attr:`.MigrationContext.bind` and is
+ ultimately set up in the ``env.py`` script.
+
+ """
+
+ dialect: Optional[Dialect] = None
+ """The :class:`~sqlalchemy.engine.Dialect` object currently in use.
+
+ This is normally obtained from the
+ :attr:`~sqlalchemy.engine.base.Connection.dialect` attribute.
+
+ """
+
+ imports: Set[str] = None # type: ignore[assignment]
+ """A ``set()`` which contains string Python import directives.
+
+ The directives are to be rendered into the ``${imports}`` section
+ of a script template. The set is normally empty and can be modified
+ within hooks such as the
+ :paramref:`.EnvironmentContext.configure.render_item` hook.
+
+ .. seealso::
+
+ :ref:`autogen_render_types`
+
+ """
+
+ migration_context: MigrationContext = None # type: ignore[assignment]
+ """The :class:`.MigrationContext` established by the ``env.py`` script."""
+
+ def __init__(
+ self,
+ migration_context: MigrationContext,
+ metadata: Union[MetaData, Sequence[MetaData], None] = None,
+ opts: Optional[Dict[str, Any]] = None,
+ autogenerate: bool = True,
+ ) -> None:
+ if (
+ autogenerate
+ and migration_context is not None
+ and migration_context.as_sql
+ ):
+ raise util.CommandError(
+ "autogenerate can't use as_sql=True as it prevents querying "
+ "the database for schema information"
+ )
+
+ if opts is None:
+ opts = migration_context.opts
+
+ self.metadata = metadata = (
+ opts.get("target_metadata", None) if metadata is None else metadata
+ )
+
+ if (
+ autogenerate
+ and metadata is None
+ and migration_context is not None
+ and migration_context.script is not None
+ ):
+ raise util.CommandError(
+ "Can't proceed with --autogenerate option; environment "
+ "script %s does not provide "
+ "a MetaData object or sequence of objects to the context."
+ % (migration_context.script.env_py_location)
+ )
+
+ include_object = opts.get("include_object", None)
+ include_name = opts.get("include_name", None)
+
+ object_filters = []
+ name_filters = []
+ if include_object:
+ object_filters.append(include_object)
+ if include_name:
+ name_filters.append(include_name)
+
+ self._object_filters = object_filters
+ self._name_filters = name_filters
+
+ self.migration_context = migration_context
+ if self.migration_context is not None:
+ self.connection = self.migration_context.bind
+ self.dialect = self.migration_context.dialect
+
+ self.imports = set()
+ self.opts: Dict[str, Any] = opts
+ self._has_batch: bool = False
+
+ @util.memoized_property
+ def inspector(self) -> Inspector:
+ if self.connection is None:
+ raise TypeError(
+ "can't return inspector as this "
+ "AutogenContext has no database connection"
+ )
+ return inspect(self.connection)
+
+ @contextlib.contextmanager
+ def _within_batch(self) -> Iterator[None]:
+ self._has_batch = True
+ yield
+ self._has_batch = False
+
+ def run_name_filters(
+ self,
+ name: Optional[str],
+ type_: NameFilterType,
+ parent_names: NameFilterParentNames,
+ ) -> bool:
+ """Run the context's name filters and return True if the targets
+ should be part of the autogenerate operation.
+
+ This method should be run for every kind of name encountered within the
+ reflection side of an autogenerate operation, giving the environment
+ the chance to filter what names should be reflected as database
+ objects. The filters here are produced directly via the
+ :paramref:`.EnvironmentContext.configure.include_name` parameter.
+
+ """
+ if "schema_name" in parent_names:
+ if type_ == "table":
+ table_name = name
+ else:
+ table_name = parent_names.get("table_name", None)
+ if table_name:
+ schema_name = parent_names["schema_name"]
+ if schema_name:
+ parent_names["schema_qualified_table_name"] = "%s.%s" % (
+ schema_name,
+ table_name,
+ )
+ else:
+ parent_names["schema_qualified_table_name"] = table_name
+
+ for fn in self._name_filters:
+ if not fn(name, type_, parent_names):
+ return False
+ else:
+ return True
+
+ def run_object_filters(
+ self,
+ object_: SchemaItem,
+ name: sqla_compat._ConstraintName,
+ type_: NameFilterType,
+ reflected: bool,
+ compare_to: Optional[SchemaItem],
+ ) -> bool:
+ """Run the context's object filters and return True if the targets
+ should be part of the autogenerate operation.
+
+ This method should be run for every kind of object encountered within
+ an autogenerate operation, giving the environment the chance
+ to filter what objects should be included in the comparison.
+ The filters here are produced directly via the
+ :paramref:`.EnvironmentContext.configure.include_object` parameter.
+
+ """
+ for fn in self._object_filters:
+ if not fn(object_, name, type_, reflected, compare_to):
+ return False
+ else:
+ return True
+
+ run_filters = run_object_filters
+
+ @util.memoized_property
+ def sorted_tables(self) -> List[Table]:
+ """Return an aggregate of the :attr:`.MetaData.sorted_tables`
+ collection(s).
+
+ For a sequence of :class:`.MetaData` objects, this
+ concatenates the :attr:`.MetaData.sorted_tables` collection
+ for each individual :class:`.MetaData` in the order of the
+ sequence. It does **not** collate the sorted tables collections.
+
+ """
+ result = []
+ for m in util.to_list(self.metadata):
+ result.extend(m.sorted_tables)
+ return result
+
+ @util.memoized_property
+ def table_key_to_table(self) -> Dict[str, Table]:
+ """Return an aggregate of the :attr:`.MetaData.tables` dictionaries.
+
+ The :attr:`.MetaData.tables` collection is a dictionary of table key
+ to :class:`.Table`; this method aggregates the dictionary across
+ multiple :class:`.MetaData` objects into one dictionary.
+
+ Duplicate table keys are **not** supported; if two :class:`.MetaData`
+ objects contain the same table key, an exception is raised.
+
+ """
+ result: Dict[str, Table] = {}
+ for m in util.to_list(self.metadata):
+ intersect = set(result).intersection(set(m.tables))
+ if intersect:
+ raise ValueError(
+ "Duplicate table keys across multiple "
+ "MetaData objects: %s"
+ % (", ".join('"%s"' % key for key in sorted(intersect)))
+ )
+
+ result.update(m.tables)
+ return result
+
+
+class RevisionContext:
+ """Maintains configuration and state that's specific to a revision
+ file generation operation."""
+
+ generated_revisions: List[MigrationScript]
+ process_revision_directives: Optional[ProcessRevisionDirectiveFn]
+
+ def __init__(
+ self,
+ config: Config,
+ script_directory: ScriptDirectory,
+ command_args: Dict[str, Any],
+ process_revision_directives: Optional[
+ ProcessRevisionDirectiveFn
+ ] = None,
+ ) -> None:
+ self.config = config
+ self.script_directory = script_directory
+ self.command_args = command_args
+ self.process_revision_directives = process_revision_directives
+ self.template_args = {
+ "config": config # Let templates use config for
+ # e.g. multiple databases
+ }
+ self.generated_revisions = [self._default_revision()]
+
+ def _to_script(
+ self, migration_script: MigrationScript
+ ) -> Optional[Script]:
+ template_args: Dict[str, Any] = self.template_args.copy()
+
+ if getattr(migration_script, "_needs_render", False):
+ autogen_context = self._last_autogen_context
+
+ # clear out existing imports if we are doing multiple
+ # renders
+ autogen_context.imports = set()
+ if migration_script.imports:
+ autogen_context.imports.update(migration_script.imports)
+ render._render_python_into_templatevars(
+ autogen_context, migration_script, template_args
+ )
+
+ assert migration_script.rev_id is not None
+ return self.script_directory.generate_revision(
+ migration_script.rev_id,
+ migration_script.message,
+ refresh=True,
+ head=migration_script.head,
+ splice=migration_script.splice,
+ branch_labels=migration_script.branch_label,
+ version_path=migration_script.version_path,
+ depends_on=migration_script.depends_on,
+ **template_args,
+ )
+
+ def run_autogenerate(
+ self, rev: _GetRevArg, migration_context: MigrationContext
+ ) -> None:
+ self._run_environment(rev, migration_context, True)
+
+ def run_no_autogenerate(
+ self, rev: _GetRevArg, migration_context: MigrationContext
+ ) -> None:
+ self._run_environment(rev, migration_context, False)
+
+ def _run_environment(
+ self,
+ rev: _GetRevArg,
+ migration_context: MigrationContext,
+ autogenerate: bool,
+ ) -> None:
+ if autogenerate:
+ if self.command_args["sql"]:
+ raise util.CommandError(
+ "Using --sql with --autogenerate does not make any sense"
+ )
+ if set(self.script_directory.get_revisions(rev)) != set(
+ self.script_directory.get_revisions("heads")
+ ):
+ raise util.CommandError("Target database is not up to date.")
+
+ upgrade_token = migration_context.opts["upgrade_token"]
+ downgrade_token = migration_context.opts["downgrade_token"]
+
+ migration_script = self.generated_revisions[-1]
+ if not getattr(migration_script, "_needs_render", False):
+ migration_script.upgrade_ops_list[-1].upgrade_token = upgrade_token
+ migration_script.downgrade_ops_list[-1].downgrade_token = (
+ downgrade_token
+ )
+ migration_script._needs_render = True
+ else:
+ migration_script._upgrade_ops.append(
+ ops.UpgradeOps([], upgrade_token=upgrade_token)
+ )
+ migration_script._downgrade_ops.append(
+ ops.DowngradeOps([], downgrade_token=downgrade_token)
+ )
+
+ autogen_context = AutogenContext(
+ migration_context, autogenerate=autogenerate
+ )
+ self._last_autogen_context: AutogenContext = autogen_context
+
+ if autogenerate:
+ compare._populate_migration_script(
+ autogen_context, migration_script
+ )
+
+ if self.process_revision_directives:
+ self.process_revision_directives(
+ migration_context, rev, self.generated_revisions
+ )
+
+ hook = migration_context.opts["process_revision_directives"]
+ if hook:
+ hook(migration_context, rev, self.generated_revisions)
+
+ for migration_script in self.generated_revisions:
+ migration_script._needs_render = True
+
+ def _default_revision(self) -> MigrationScript:
+ command_args: Dict[str, Any] = self.command_args
+ op = ops.MigrationScript(
+ rev_id=command_args["rev_id"] or util.rev_id(),
+ message=command_args["message"],
+ upgrade_ops=ops.UpgradeOps([]),
+ downgrade_ops=ops.DowngradeOps([]),
+ head=command_args["head"],
+ splice=command_args["splice"],
+ branch_label=command_args["branch_label"],
+ version_path=command_args["version_path"],
+ depends_on=command_args["depends_on"],
+ )
+ return op
+
+ def generate_scripts(self) -> Iterator[Optional[Script]]:
+ for generated_revision in self.generated_revisions:
+ yield self._to_script(generated_revision)
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py
new file mode 100644
index 00000000..8d6d8f1b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/compare.py
@@ -0,0 +1,1317 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+import contextlib
+import logging
+import re
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import Iterator
+from typing import Mapping
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from sqlalchemy import event
+from sqlalchemy import inspect
+from sqlalchemy import schema as sa_schema
+from sqlalchemy import text
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql import expression
+from sqlalchemy.sql.schema import ForeignKeyConstraint
+from sqlalchemy.sql.schema import Index
+from sqlalchemy.sql.schema import UniqueConstraint
+from sqlalchemy.util import OrderedSet
+
+from .. import util
+from ..ddl._autogen import is_index_sig
+from ..ddl._autogen import is_uq_sig
+from ..operations import ops
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy.engine.reflection import Inspector
+ from sqlalchemy.sql.elements import quoted_name
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.schema import Column
+ from sqlalchemy.sql.schema import Table
+
+ from alembic.autogenerate.api import AutogenContext
+ from alembic.ddl.impl import DefaultImpl
+ from alembic.operations.ops import AlterColumnOp
+ from alembic.operations.ops import MigrationScript
+ from alembic.operations.ops import ModifyTableOps
+ from alembic.operations.ops import UpgradeOps
+ from ..ddl._autogen import _constraint_sig
+
+
+log = logging.getLogger(__name__)
+
+
+def _populate_migration_script(
+ autogen_context: AutogenContext, migration_script: MigrationScript
+) -> None:
+ upgrade_ops = migration_script.upgrade_ops_list[-1]
+ downgrade_ops = migration_script.downgrade_ops_list[-1]
+
+ _produce_net_changes(autogen_context, upgrade_ops)
+ upgrade_ops.reverse_into(downgrade_ops)
+
+
+comparators = util.Dispatcher(uselist=True)
+
+
+def _produce_net_changes(
+ autogen_context: AutogenContext, upgrade_ops: UpgradeOps
+) -> None:
+ connection = autogen_context.connection
+ assert connection is not None
+ include_schemas = autogen_context.opts.get("include_schemas", False)
+
+ inspector: Inspector = inspect(connection)
+
+ default_schema = connection.dialect.default_schema_name
+ schemas: Set[Optional[str]]
+ if include_schemas:
+ schemas = set(inspector.get_schema_names())
+ # replace default schema name with None
+ schemas.discard("information_schema")
+ # replace the "default" schema with None
+ schemas.discard(default_schema)
+ schemas.add(None)
+ else:
+ schemas = {None}
+
+ schemas = {
+ s for s in schemas if autogen_context.run_name_filters(s, "schema", {})
+ }
+
+ assert autogen_context.dialect is not None
+ comparators.dispatch("schema", autogen_context.dialect.name)(
+ autogen_context, upgrade_ops, schemas
+ )
+
+
+@comparators.dispatch_for("schema")
+def _autogen_for_tables(
+ autogen_context: AutogenContext,
+ upgrade_ops: UpgradeOps,
+ schemas: Union[Set[None], Set[Optional[str]]],
+) -> None:
+ inspector = autogen_context.inspector
+
+ conn_table_names: Set[Tuple[Optional[str], str]] = set()
+
+ version_table_schema = (
+ autogen_context.migration_context.version_table_schema
+ )
+ version_table = autogen_context.migration_context.version_table
+
+ for schema_name in schemas:
+ tables = set(inspector.get_table_names(schema=schema_name))
+ if schema_name == version_table_schema:
+ tables = tables.difference(
+ [autogen_context.migration_context.version_table]
+ )
+
+ conn_table_names.update(
+ (schema_name, tname)
+ for tname in tables
+ if autogen_context.run_name_filters(
+ tname, "table", {"schema_name": schema_name}
+ )
+ )
+
+ metadata_table_names = OrderedSet(
+ [(table.schema, table.name) for table in autogen_context.sorted_tables]
+ ).difference([(version_table_schema, version_table)])
+
+ _compare_tables(
+ conn_table_names,
+ metadata_table_names,
+ inspector,
+ upgrade_ops,
+ autogen_context,
+ )
+
+
+def _compare_tables(
+ conn_table_names: set,
+ metadata_table_names: set,
+ inspector: Inspector,
+ upgrade_ops: UpgradeOps,
+ autogen_context: AutogenContext,
+) -> None:
+ default_schema = inspector.bind.dialect.default_schema_name
+
+ # tables coming from the connection will not have "schema"
+ # set if it matches default_schema_name; so we need a list
+ # of table names from local metadata that also have "None" if schema
+ # == default_schema_name. Most setups will be like this anyway but
+ # some are not (see #170)
+ metadata_table_names_no_dflt_schema = OrderedSet(
+ [
+ (schema if schema != default_schema else None, tname)
+ for schema, tname in metadata_table_names
+ ]
+ )
+
+ # to adjust for the MetaData collection storing the tables either
+ # as "schemaname.tablename" or just "tablename", create a new lookup
+ # which will match the "non-default-schema" keys to the Table object.
+ tname_to_table = {
+ no_dflt_schema: autogen_context.table_key_to_table[
+ sa_schema._get_table_key(tname, schema)
+ ]
+ for no_dflt_schema, (schema, tname) in zip(
+ metadata_table_names_no_dflt_schema, metadata_table_names
+ )
+ }
+ metadata_table_names = metadata_table_names_no_dflt_schema
+
+ for s, tname in metadata_table_names.difference(conn_table_names):
+ name = "%s.%s" % (s, tname) if s else tname
+ metadata_table = tname_to_table[(s, tname)]
+ if autogen_context.run_object_filters(
+ metadata_table, tname, "table", False, None
+ ):
+ upgrade_ops.ops.append(
+ ops.CreateTableOp.from_table(metadata_table)
+ )
+ log.info("Detected added table %r", name)
+ modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+
+ comparators.dispatch("table")(
+ autogen_context,
+ modify_table_ops,
+ s,
+ tname,
+ None,
+ metadata_table,
+ )
+ if not modify_table_ops.is_empty():
+ upgrade_ops.ops.append(modify_table_ops)
+
+ removal_metadata = sa_schema.MetaData()
+ for s, tname in conn_table_names.difference(metadata_table_names):
+ name = sa_schema._get_table_key(tname, s)
+ exists = name in removal_metadata.tables
+ t = sa_schema.Table(tname, removal_metadata, schema=s)
+
+ if not exists:
+ event.listen(
+ t,
+ "column_reflect",
+ # fmt: off
+ autogen_context.migration_context.impl.
+ _compat_autogen_column_reflect
+ (inspector),
+ # fmt: on
+ )
+ inspector.reflect_table(t, include_columns=None)
+ if autogen_context.run_object_filters(t, tname, "table", True, None):
+ modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+
+ comparators.dispatch("table")(
+ autogen_context, modify_table_ops, s, tname, t, None
+ )
+ if not modify_table_ops.is_empty():
+ upgrade_ops.ops.append(modify_table_ops)
+
+ upgrade_ops.ops.append(ops.DropTableOp.from_table(t))
+ log.info("Detected removed table %r", name)
+
+ existing_tables = conn_table_names.intersection(metadata_table_names)
+
+ existing_metadata = sa_schema.MetaData()
+ conn_column_info = {}
+ for s, tname in existing_tables:
+ name = sa_schema._get_table_key(tname, s)
+ exists = name in existing_metadata.tables
+ t = sa_schema.Table(tname, existing_metadata, schema=s)
+ if not exists:
+ event.listen(
+ t,
+ "column_reflect",
+ # fmt: off
+ autogen_context.migration_context.impl.
+ _compat_autogen_column_reflect(inspector),
+ # fmt: on
+ )
+ inspector.reflect_table(t, include_columns=None)
+ conn_column_info[(s, tname)] = t
+
+ for s, tname in sorted(existing_tables, key=lambda x: (x[0] or "", x[1])):
+ s = s or None
+ name = "%s.%s" % (s, tname) if s else tname
+ metadata_table = tname_to_table[(s, tname)]
+ conn_table = existing_metadata.tables[name]
+
+ if autogen_context.run_object_filters(
+ metadata_table, tname, "table", False, conn_table
+ ):
+ modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
+ with _compare_columns(
+ s,
+ tname,
+ conn_table,
+ metadata_table,
+ modify_table_ops,
+ autogen_context,
+ inspector,
+ ):
+ comparators.dispatch("table")(
+ autogen_context,
+ modify_table_ops,
+ s,
+ tname,
+ conn_table,
+ metadata_table,
+ )
+
+ if not modify_table_ops.is_empty():
+ upgrade_ops.ops.append(modify_table_ops)
+
+
+_IndexColumnSortingOps: Mapping[str, Any] = util.immutabledict(
+ {
+ "asc": expression.asc,
+ "desc": expression.desc,
+ "nulls_first": expression.nullsfirst,
+ "nulls_last": expression.nullslast,
+ "nullsfirst": expression.nullsfirst, # 1_3 name
+ "nullslast": expression.nullslast, # 1_3 name
+ }
+)
+
+
+def _make_index(
+ impl: DefaultImpl, params: Dict[str, Any], conn_table: Table
+) -> Optional[Index]:
+ exprs: list[Union[Column[Any], TextClause]] = []
+ sorting = params.get("column_sorting")
+
+ for num, col_name in enumerate(params["column_names"]):
+ item: Union[Column[Any], TextClause]
+ if col_name is None:
+ assert "expressions" in params
+ name = params["expressions"][num]
+ item = text(name)
+ else:
+ name = col_name
+ item = conn_table.c[col_name]
+ if sorting and name in sorting:
+ for operator in sorting[name]:
+ if operator in _IndexColumnSortingOps:
+ item = _IndexColumnSortingOps[operator](item)
+ exprs.append(item)
+ ix = sa_schema.Index(
+ params["name"],
+ *exprs,
+ unique=params["unique"],
+ _table=conn_table,
+ **impl.adjust_reflected_dialect_options(params, "index"),
+ )
+ if "duplicates_constraint" in params:
+ ix.info["duplicates_constraint"] = params["duplicates_constraint"]
+ return ix
+
+
+def _make_unique_constraint(
+ impl: DefaultImpl, params: Dict[str, Any], conn_table: Table
+) -> UniqueConstraint:
+ uq = sa_schema.UniqueConstraint(
+ *[conn_table.c[cname] for cname in params["column_names"]],
+ name=params["name"],
+ **impl.adjust_reflected_dialect_options(params, "unique_constraint"),
+ )
+ if "duplicates_index" in params:
+ uq.info["duplicates_index"] = params["duplicates_index"]
+
+ return uq
+
+
+def _make_foreign_key(
+ params: Dict[str, Any], conn_table: Table
+) -> ForeignKeyConstraint:
+ tname = params["referred_table"]
+ if params["referred_schema"]:
+ tname = "%s.%s" % (params["referred_schema"], tname)
+
+ options = params.get("options", {})
+
+ const = sa_schema.ForeignKeyConstraint(
+ [conn_table.c[cname] for cname in params["constrained_columns"]],
+ ["%s.%s" % (tname, n) for n in params["referred_columns"]],
+ onupdate=options.get("onupdate"),
+ ondelete=options.get("ondelete"),
+ deferrable=options.get("deferrable"),
+ initially=options.get("initially"),
+ name=params["name"],
+ )
+ # needed by 0.7
+ conn_table.append_constraint(const)
+ return const
+
+
+@contextlib.contextmanager
+def _compare_columns(
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ conn_table: Table,
+ metadata_table: Table,
+ modify_table_ops: ModifyTableOps,
+ autogen_context: AutogenContext,
+ inspector: Inspector,
+) -> Iterator[None]:
+ name = "%s.%s" % (schema, tname) if schema else tname
+ metadata_col_names = OrderedSet(
+ c.name for c in metadata_table.c if not c.system
+ )
+ metadata_cols_by_name = {
+ c.name: c for c in metadata_table.c if not c.system
+ }
+
+ conn_col_names = {
+ c.name: c
+ for c in conn_table.c
+ if autogen_context.run_name_filters(
+ c.name, "column", {"table_name": tname, "schema_name": schema}
+ )
+ }
+
+ for cname in metadata_col_names.difference(conn_col_names):
+ if autogen_context.run_object_filters(
+ metadata_cols_by_name[cname], cname, "column", False, None
+ ):
+ modify_table_ops.ops.append(
+ ops.AddColumnOp.from_column_and_tablename(
+ schema, tname, metadata_cols_by_name[cname]
+ )
+ )
+ log.info("Detected added column '%s.%s'", name, cname)
+
+ for colname in metadata_col_names.intersection(conn_col_names):
+ metadata_col = metadata_cols_by_name[colname]
+ conn_col = conn_table.c[colname]
+ if not autogen_context.run_object_filters(
+ metadata_col, colname, "column", False, conn_col
+ ):
+ continue
+ alter_column_op = ops.AlterColumnOp(tname, colname, schema=schema)
+
+ comparators.dispatch("column")(
+ autogen_context,
+ alter_column_op,
+ schema,
+ tname,
+ colname,
+ conn_col,
+ metadata_col,
+ )
+
+ if alter_column_op.has_changes():
+ modify_table_ops.ops.append(alter_column_op)
+
+ yield
+
+ for cname in set(conn_col_names).difference(metadata_col_names):
+ if autogen_context.run_object_filters(
+ conn_table.c[cname], cname, "column", True, None
+ ):
+ modify_table_ops.ops.append(
+ ops.DropColumnOp.from_column_and_tablename(
+ schema, tname, conn_table.c[cname]
+ )
+ )
+ log.info("Detected removed column '%s.%s'", name, cname)
+
+
+_C = TypeVar("_C", bound=Union[UniqueConstraint, ForeignKeyConstraint, Index])
+
+
+@comparators.dispatch_for("table")
+def _compare_indexes_and_uniques(
+ autogen_context: AutogenContext,
+ modify_ops: ModifyTableOps,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ conn_table: Optional[Table],
+ metadata_table: Optional[Table],
+) -> None:
+ inspector = autogen_context.inspector
+ is_create_table = conn_table is None
+ is_drop_table = metadata_table is None
+ impl = autogen_context.migration_context.impl
+
+ # 1a. get raw indexes and unique constraints from metadata ...
+ if metadata_table is not None:
+ metadata_unique_constraints = {
+ uq
+ for uq in metadata_table.constraints
+ if isinstance(uq, sa_schema.UniqueConstraint)
+ }
+ metadata_indexes = set(metadata_table.indexes)
+ else:
+ metadata_unique_constraints = set()
+ metadata_indexes = set()
+
+ conn_uniques = conn_indexes = frozenset() # type:ignore[var-annotated]
+
+ supports_unique_constraints = False
+
+ unique_constraints_duplicate_unique_indexes = False
+
+ if conn_table is not None:
+ # 1b. ... and from connection, if the table exists
+ try:
+ conn_uniques = inspector.get_unique_constraints( # type:ignore[assignment] # noqa
+ tname, schema=schema
+ )
+ supports_unique_constraints = True
+ except NotImplementedError:
+ pass
+ except TypeError:
+ # number of arguments is off for the base
+ # method in SQLAlchemy due to the cache decorator
+ # not being present
+ pass
+ else:
+ conn_uniques = [ # type:ignore[assignment]
+ uq
+ for uq in conn_uniques
+ if autogen_context.run_name_filters(
+ uq["name"],
+ "unique_constraint",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
+ for uq in conn_uniques:
+ if uq.get("duplicates_index"):
+ unique_constraints_duplicate_unique_indexes = True
+ try:
+ conn_indexes = inspector.get_indexes( # type:ignore[assignment]
+ tname, schema=schema
+ )
+ except NotImplementedError:
+ pass
+ else:
+ conn_indexes = [ # type:ignore[assignment]
+ ix
+ for ix in conn_indexes
+ if autogen_context.run_name_filters(
+ ix["name"],
+ "index",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
+
+ # 2. convert conn-level objects from raw inspector records
+ # into schema objects
+ if is_drop_table:
+ # for DROP TABLE uniques are inline, don't need them
+ conn_uniques = set() # type:ignore[assignment]
+ else:
+ conn_uniques = { # type:ignore[assignment]
+ _make_unique_constraint(impl, uq_def, conn_table)
+ for uq_def in conn_uniques
+ }
+
+ conn_indexes = { # type:ignore[assignment]
+ index
+ for index in (
+ _make_index(impl, ix, conn_table) for ix in conn_indexes
+ )
+ if index is not None
+ }
+
+ # 2a. if the dialect dupes unique indexes as unique constraints
+ # (mysql and oracle), correct for that
+
+ if unique_constraints_duplicate_unique_indexes:
+ _correct_for_uq_duplicates_uix(
+ conn_uniques,
+ conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes,
+ autogen_context.dialect,
+ impl,
+ )
+
+ # 3. give the dialect a chance to omit indexes and constraints that
+ # we know are either added implicitly by the DB or that the DB
+ # can't accurately report on
+ impl.correct_for_autogen_constraints(
+ conn_uniques, # type: ignore[arg-type]
+ conn_indexes, # type: ignore[arg-type]
+ metadata_unique_constraints,
+ metadata_indexes,
+ )
+
+ # 4. organize the constraints into "signature" collections, the
+ # _constraint_sig() objects provide a consistent facade over both
+ # Index and UniqueConstraint so we can easily work with them
+ # interchangeably
+ metadata_unique_constraints_sig = {
+ impl._create_metadata_constraint_sig(uq)
+ for uq in metadata_unique_constraints
+ }
+
+ metadata_indexes_sig = {
+ impl._create_metadata_constraint_sig(ix) for ix in metadata_indexes
+ }
+
+ conn_unique_constraints = {
+ impl._create_reflected_constraint_sig(uq) for uq in conn_uniques
+ }
+
+ conn_indexes_sig = {
+ impl._create_reflected_constraint_sig(ix) for ix in conn_indexes
+ }
+
+ # 5. index things by name, for those objects that have names
+ metadata_names = {
+ cast(str, c.md_name_to_sql_name(autogen_context)): c
+ for c in metadata_unique_constraints_sig.union(metadata_indexes_sig)
+ if c.is_named
+ }
+
+ conn_uniques_by_name: Dict[sqla_compat._ConstraintName, _constraint_sig]
+ conn_indexes_by_name: Dict[sqla_compat._ConstraintName, _constraint_sig]
+
+ conn_uniques_by_name = {c.name: c for c in conn_unique_constraints}
+ conn_indexes_by_name = {c.name: c for c in conn_indexes_sig}
+ conn_names = {
+ c.name: c
+ for c in conn_unique_constraints.union(conn_indexes_sig)
+ if sqla_compat.constraint_name_string(c.name)
+ }
+
+ doubled_constraints = {
+ name: (conn_uniques_by_name[name], conn_indexes_by_name[name])
+ for name in set(conn_uniques_by_name).intersection(
+ conn_indexes_by_name
+ )
+ }
+
+ # 6. index things by "column signature", to help with unnamed unique
+ # constraints.
+ conn_uniques_by_sig = {uq.unnamed: uq for uq in conn_unique_constraints}
+ metadata_uniques_by_sig = {
+ uq.unnamed: uq for uq in metadata_unique_constraints_sig
+ }
+ unnamed_metadata_uniques = {
+ uq.unnamed: uq
+ for uq in metadata_unique_constraints_sig
+ if not sqla_compat._constraint_is_named(
+ uq.const, autogen_context.dialect
+ )
+ }
+
+ # assumptions:
+ # 1. a unique constraint or an index from the connection *always*
+ # has a name.
+ # 2. an index on the metadata side *always* has a name.
+ # 3. a unique constraint on the metadata side *might* have a name.
+ # 4. The backend may double up indexes as unique constraints and
+ # vice versa (e.g. MySQL, Postgresql)
+
+ def obj_added(obj: _constraint_sig):
+ if is_index_sig(obj):
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "index", False, None
+ ):
+ modify_ops.ops.append(ops.CreateIndexOp.from_index(obj.const))
+ log.info(
+ "Detected added index '%r' on '%s'",
+ obj.name,
+ obj.column_names,
+ )
+ elif is_uq_sig(obj):
+ if not supports_unique_constraints:
+ # can't report unique indexes as added if we don't
+ # detect them
+ return
+ if is_create_table or is_drop_table:
+ # unique constraints are created inline with table defs
+ return
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "unique_constraint", False, None
+ ):
+ modify_ops.ops.append(
+ ops.AddConstraintOp.from_constraint(obj.const)
+ )
+ log.info(
+ "Detected added unique constraint %r on '%s'",
+ obj.name,
+ obj.column_names,
+ )
+ else:
+ assert False
+
+ def obj_removed(obj: _constraint_sig):
+ if is_index_sig(obj):
+ if obj.is_unique and not supports_unique_constraints:
+ # many databases double up unique constraints
+ # as unique indexes. without that list we can't
+ # be sure what we're doing here
+ return
+
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "index", True, None
+ ):
+ modify_ops.ops.append(ops.DropIndexOp.from_index(obj.const))
+ log.info("Detected removed index %r on %r", obj.name, tname)
+ elif is_uq_sig(obj):
+ if is_create_table or is_drop_table:
+ # if the whole table is being dropped, we don't need to
+ # consider unique constraint separately
+ return
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "unique_constraint", True, None
+ ):
+ modify_ops.ops.append(
+ ops.DropConstraintOp.from_constraint(obj.const)
+ )
+ log.info(
+ "Detected removed unique constraint %r on %r",
+ obj.name,
+ tname,
+ )
+ else:
+ assert False
+
+ def obj_changed(
+ old: _constraint_sig,
+ new: _constraint_sig,
+ msg: str,
+ ):
+ if is_index_sig(old):
+ assert is_index_sig(new)
+
+ if autogen_context.run_object_filters(
+ new.const, new.name, "index", False, old.const
+ ):
+ log.info(
+ "Detected changed index %r on %r: %s", old.name, tname, msg
+ )
+ modify_ops.ops.append(ops.DropIndexOp.from_index(old.const))
+ modify_ops.ops.append(ops.CreateIndexOp.from_index(new.const))
+ elif is_uq_sig(old):
+ assert is_uq_sig(new)
+
+ if autogen_context.run_object_filters(
+ new.const, new.name, "unique_constraint", False, old.const
+ ):
+ log.info(
+ "Detected changed unique constraint %r on %r: %s",
+ old.name,
+ tname,
+ msg,
+ )
+ modify_ops.ops.append(
+ ops.DropConstraintOp.from_constraint(old.const)
+ )
+ modify_ops.ops.append(
+ ops.AddConstraintOp.from_constraint(new.const)
+ )
+ else:
+ assert False
+
+ for removed_name in sorted(set(conn_names).difference(metadata_names)):
+ conn_obj = conn_names[removed_name]
+ if (
+ is_uq_sig(conn_obj)
+ and conn_obj.unnamed in unnamed_metadata_uniques
+ ):
+ continue
+ elif removed_name in doubled_constraints:
+ conn_uq, conn_idx = doubled_constraints[removed_name]
+ if (
+ all(
+ conn_idx.unnamed != meta_idx.unnamed
+ for meta_idx in metadata_indexes_sig
+ )
+ and conn_uq.unnamed not in metadata_uniques_by_sig
+ ):
+ obj_removed(conn_uq)
+ obj_removed(conn_idx)
+ else:
+ obj_removed(conn_obj)
+
+ for existing_name in sorted(set(metadata_names).intersection(conn_names)):
+ metadata_obj = metadata_names[existing_name]
+
+ if existing_name in doubled_constraints:
+ conn_uq, conn_idx = doubled_constraints[existing_name]
+ if is_index_sig(metadata_obj):
+ conn_obj = conn_idx
+ else:
+ conn_obj = conn_uq
+ else:
+ conn_obj = conn_names[existing_name]
+
+ if type(conn_obj) != type(metadata_obj):
+ obj_removed(conn_obj)
+ obj_added(metadata_obj)
+ else:
+ comparison = metadata_obj.compare_to_reflected(conn_obj)
+
+ if comparison.is_different:
+ # constraint are different
+ obj_changed(conn_obj, metadata_obj, comparison.message)
+ elif comparison.is_skip:
+ # constraint cannot be compared, skip them
+ thing = (
+ "index" if is_index_sig(conn_obj) else "unique constraint"
+ )
+ log.info(
+ "Cannot compare %s %r, assuming equal and skipping. %s",
+ thing,
+ conn_obj.name,
+ comparison.message,
+ )
+ else:
+ # constraint are equal
+ assert comparison.is_equal
+
+ for added_name in sorted(set(metadata_names).difference(conn_names)):
+ obj = metadata_names[added_name]
+ obj_added(obj)
+
+ for uq_sig in unnamed_metadata_uniques:
+ if uq_sig not in conn_uniques_by_sig:
+ obj_added(unnamed_metadata_uniques[uq_sig])
+
+
+def _correct_for_uq_duplicates_uix(
+ conn_unique_constraints,
+ conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes,
+ dialect,
+ impl,
+):
+ # dedupe unique indexes vs. constraints, since MySQL / Oracle
+ # doesn't really have unique constraints as a separate construct.
+ # but look in the metadata and try to maintain constructs
+ # that already seem to be defined one way or the other
+ # on that side. This logic was formerly local to MySQL dialect,
+ # generalized to Oracle and others. See #276
+
+ # resolve final rendered name for unique constraints defined in the
+ # metadata. this includes truncation of long names. naming convention
+ # names currently should already be set as cons.name, however leave this
+ # to the sqla_compat to decide.
+ metadata_cons_names = [
+ (sqla_compat._get_constraint_final_name(cons, dialect), cons)
+ for cons in metadata_unique_constraints
+ ]
+
+ metadata_uq_names = {
+ name for name, cons in metadata_cons_names if name is not None
+ }
+
+ unnamed_metadata_uqs = {
+ impl._create_metadata_constraint_sig(cons).unnamed
+ for name, cons in metadata_cons_names
+ if name is None
+ }
+
+ metadata_ix_names = {
+ sqla_compat._get_constraint_final_name(cons, dialect)
+ for cons in metadata_indexes
+ if cons.unique
+ }
+
+ # for reflection side, names are in their final database form
+ # already since they're from the database
+ conn_ix_names = {cons.name: cons for cons in conn_indexes if cons.unique}
+
+ uqs_dupe_indexes = {
+ cons.name: cons
+ for cons in conn_unique_constraints
+ if cons.info["duplicates_index"]
+ }
+
+ for overlap in uqs_dupe_indexes:
+ if overlap not in metadata_uq_names:
+ if (
+ impl._create_reflected_constraint_sig(
+ uqs_dupe_indexes[overlap]
+ ).unnamed
+ not in unnamed_metadata_uqs
+ ):
+ conn_unique_constraints.discard(uqs_dupe_indexes[overlap])
+ elif overlap not in metadata_ix_names:
+ conn_indexes.discard(conn_ix_names[overlap])
+
+
+@comparators.dispatch_for("column")
+def _compare_nullable(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ cname: Union[quoted_name, str],
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> None:
+ metadata_col_nullable = metadata_col.nullable
+ conn_col_nullable = conn_col.nullable
+ alter_column_op.existing_nullable = conn_col_nullable
+
+ if conn_col_nullable is not metadata_col_nullable:
+ if (
+ sqla_compat._server_default_is_computed(
+ metadata_col.server_default, conn_col.server_default
+ )
+ and sqla_compat._nullability_might_be_unset(metadata_col)
+ or (
+ sqla_compat._server_default_is_identity(
+ metadata_col.server_default, conn_col.server_default
+ )
+ )
+ ):
+ log.info(
+ "Ignoring nullable change on identity column '%s.%s'",
+ tname,
+ cname,
+ )
+ else:
+ alter_column_op.modify_nullable = metadata_col_nullable
+ log.info(
+ "Detected %s on column '%s.%s'",
+ "NULL" if metadata_col_nullable else "NOT NULL",
+ tname,
+ cname,
+ )
+
+
+@comparators.dispatch_for("column")
+def _setup_autoincrement(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ cname: quoted_name,
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> None:
+ if metadata_col.table._autoincrement_column is metadata_col:
+ alter_column_op.kw["autoincrement"] = True
+ elif metadata_col.autoincrement is True:
+ alter_column_op.kw["autoincrement"] = True
+ elif metadata_col.autoincrement is False:
+ alter_column_op.kw["autoincrement"] = False
+
+
+@comparators.dispatch_for("column")
+def _compare_type(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ cname: Union[quoted_name, str],
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> None:
+ conn_type = conn_col.type
+ alter_column_op.existing_type = conn_type
+ metadata_type = metadata_col.type
+ if conn_type._type_affinity is sqltypes.NullType:
+ log.info(
+ "Couldn't determine database type " "for column '%s.%s'",
+ tname,
+ cname,
+ )
+ return
+ if metadata_type._type_affinity is sqltypes.NullType:
+ log.info(
+ "Column '%s.%s' has no type within " "the model; can't compare",
+ tname,
+ cname,
+ )
+ return
+
+ isdiff = autogen_context.migration_context._compare_type(
+ conn_col, metadata_col
+ )
+
+ if isdiff:
+ alter_column_op.modify_type = metadata_type
+ log.info(
+ "Detected type change from %r to %r on '%s.%s'",
+ conn_type,
+ metadata_type,
+ tname,
+ cname,
+ )
+
+
+def _render_server_default_for_compare(
+ metadata_default: Optional[Any], autogen_context: AutogenContext
+) -> Optional[str]:
+ if isinstance(metadata_default, sa_schema.DefaultClause):
+ if isinstance(metadata_default.arg, str):
+ metadata_default = metadata_default.arg
+ else:
+ metadata_default = str(
+ metadata_default.arg.compile(
+ dialect=autogen_context.dialect,
+ compile_kwargs={"literal_binds": True},
+ )
+ )
+ if isinstance(metadata_default, str):
+ return metadata_default
+ else:
+ return None
+
+
+def _normalize_computed_default(sqltext: str) -> str:
+ """we want to warn if a computed sql expression has changed. however
+ we don't want false positives and the warning is not that critical.
+ so filter out most forms of variability from the SQL text.
+
+ """
+
+ return re.sub(r"[ \(\)'\"`\[\]\t\r\n]", "", sqltext).lower()
+
+
+def _compare_computed_default(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: str,
+ cname: str,
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> None:
+ rendered_metadata_default = str(
+ cast(sa_schema.Computed, metadata_col.server_default).sqltext.compile(
+ dialect=autogen_context.dialect,
+ compile_kwargs={"literal_binds": True},
+ )
+ )
+
+ # since we cannot change computed columns, we do only a crude comparison
+ # here where we try to eliminate syntactical differences in order to
+ # get a minimal comparison just to emit a warning.
+
+ rendered_metadata_default = _normalize_computed_default(
+ rendered_metadata_default
+ )
+
+ if isinstance(conn_col.server_default, sa_schema.Computed):
+ rendered_conn_default = str(
+ conn_col.server_default.sqltext.compile(
+ dialect=autogen_context.dialect,
+ compile_kwargs={"literal_binds": True},
+ )
+ )
+ if rendered_conn_default is None:
+ rendered_conn_default = ""
+ else:
+ rendered_conn_default = _normalize_computed_default(
+ rendered_conn_default
+ )
+ else:
+ rendered_conn_default = ""
+
+ if rendered_metadata_default != rendered_conn_default:
+ _warn_computed_not_supported(tname, cname)
+
+
+def _warn_computed_not_supported(tname: str, cname: str) -> None:
+ util.warn("Computed default on %s.%s cannot be modified" % (tname, cname))
+
+
+def _compare_identity_default(
+ autogen_context,
+ alter_column_op,
+ schema,
+ tname,
+ cname,
+ conn_col,
+ metadata_col,
+):
+ impl = autogen_context.migration_context.impl
+ diff, ignored_attr, is_alter = impl._compare_identity_default(
+ metadata_col.server_default, conn_col.server_default
+ )
+
+ return diff, is_alter
+
+
+@comparators.dispatch_for("column")
+def _compare_server_default(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ cname: Union[quoted_name, str],
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> Optional[bool]:
+ metadata_default = metadata_col.server_default
+ conn_col_default = conn_col.server_default
+ if conn_col_default is None and metadata_default is None:
+ return False
+
+ if sqla_compat._server_default_is_computed(metadata_default):
+ return _compare_computed_default( # type:ignore[func-returns-value]
+ autogen_context,
+ alter_column_op,
+ schema,
+ tname,
+ cname,
+ conn_col,
+ metadata_col,
+ )
+ if sqla_compat._server_default_is_computed(conn_col_default):
+ _warn_computed_not_supported(tname, cname)
+ return False
+
+ if sqla_compat._server_default_is_identity(
+ metadata_default, conn_col_default
+ ):
+ alter_column_op.existing_server_default = conn_col_default
+ diff, is_alter = _compare_identity_default(
+ autogen_context,
+ alter_column_op,
+ schema,
+ tname,
+ cname,
+ conn_col,
+ metadata_col,
+ )
+ if is_alter:
+ alter_column_op.modify_server_default = metadata_default
+ if diff:
+ log.info(
+ "Detected server default on column '%s.%s': "
+ "identity options attributes %s",
+ tname,
+ cname,
+ sorted(diff),
+ )
+ else:
+ rendered_metadata_default = _render_server_default_for_compare(
+ metadata_default, autogen_context
+ )
+
+ rendered_conn_default = (
+ cast(Any, conn_col_default).arg.text if conn_col_default else None
+ )
+
+ alter_column_op.existing_server_default = conn_col_default
+
+ is_diff = autogen_context.migration_context._compare_server_default(
+ conn_col,
+ metadata_col,
+ rendered_metadata_default,
+ rendered_conn_default,
+ )
+ if is_diff:
+ alter_column_op.modify_server_default = metadata_default
+ log.info("Detected server default on column '%s.%s'", tname, cname)
+
+ return None
+
+
+@comparators.dispatch_for("column")
+def _compare_column_comment(
+ autogen_context: AutogenContext,
+ alter_column_op: AlterColumnOp,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ cname: quoted_name,
+ conn_col: Column[Any],
+ metadata_col: Column[Any],
+) -> Optional[Literal[False]]:
+ assert autogen_context.dialect is not None
+ if not autogen_context.dialect.supports_comments:
+ return None
+
+ metadata_comment = metadata_col.comment
+ conn_col_comment = conn_col.comment
+ if conn_col_comment is None and metadata_comment is None:
+ return False
+
+ alter_column_op.existing_comment = conn_col_comment
+
+ if conn_col_comment != metadata_comment:
+ alter_column_op.modify_comment = metadata_comment
+ log.info("Detected column comment '%s.%s'", tname, cname)
+
+ return None
+
+
+@comparators.dispatch_for("table")
+def _compare_foreign_keys(
+ autogen_context: AutogenContext,
+ modify_table_ops: ModifyTableOps,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ conn_table: Table,
+ metadata_table: Table,
+) -> None:
+ # if we're doing CREATE TABLE, all FKs are created
+ # inline within the table def
+ if conn_table is None or metadata_table is None:
+ return
+
+ inspector = autogen_context.inspector
+ metadata_fks = {
+ fk
+ for fk in metadata_table.constraints
+ if isinstance(fk, sa_schema.ForeignKeyConstraint)
+ }
+
+ conn_fks_list = [
+ fk
+ for fk in inspector.get_foreign_keys(tname, schema=schema)
+ if autogen_context.run_name_filters(
+ fk["name"],
+ "foreign_key_constraint",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
+
+ conn_fks = {
+ _make_foreign_key(const, conn_table) # type: ignore[arg-type]
+ for const in conn_fks_list
+ }
+
+ impl = autogen_context.migration_context.impl
+
+ # give the dialect a chance to correct the FKs to match more
+ # closely
+ autogen_context.migration_context.impl.correct_for_autogen_foreignkeys(
+ conn_fks, metadata_fks
+ )
+
+ metadata_fks_sig = {
+ impl._create_metadata_constraint_sig(fk) for fk in metadata_fks
+ }
+
+ conn_fks_sig = {
+ impl._create_reflected_constraint_sig(fk) for fk in conn_fks
+ }
+
+ # check if reflected FKs include options, indicating the backend
+ # can reflect FK options
+ if conn_fks_list and "options" in conn_fks_list[0]:
+ conn_fks_by_sig = {c.unnamed: c for c in conn_fks_sig}
+ metadata_fks_by_sig = {c.unnamed: c for c in metadata_fks_sig}
+ else:
+ # otherwise compare by sig without options added
+ conn_fks_by_sig = {c.unnamed_no_options: c for c in conn_fks_sig}
+ metadata_fks_by_sig = {
+ c.unnamed_no_options: c for c in metadata_fks_sig
+ }
+
+ metadata_fks_by_name = {
+ c.name: c for c in metadata_fks_sig if c.name is not None
+ }
+ conn_fks_by_name = {c.name: c for c in conn_fks_sig if c.name is not None}
+
+ def _add_fk(obj, compare_to):
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "foreign_key_constraint", False, compare_to
+ ):
+ modify_table_ops.ops.append(
+ ops.CreateForeignKeyOp.from_constraint(const.const) # type: ignore[has-type] # noqa: E501
+ )
+
+ log.info(
+ "Detected added foreign key (%s)(%s) on table %s%s",
+ ", ".join(obj.source_columns),
+ ", ".join(obj.target_columns),
+ "%s." % obj.source_schema if obj.source_schema else "",
+ obj.source_table,
+ )
+
+ def _remove_fk(obj, compare_to):
+ if autogen_context.run_object_filters(
+ obj.const, obj.name, "foreign_key_constraint", True, compare_to
+ ):
+ modify_table_ops.ops.append(
+ ops.DropConstraintOp.from_constraint(obj.const)
+ )
+ log.info(
+ "Detected removed foreign key (%s)(%s) on table %s%s",
+ ", ".join(obj.source_columns),
+ ", ".join(obj.target_columns),
+ "%s." % obj.source_schema if obj.source_schema else "",
+ obj.source_table,
+ )
+
+ # so far it appears we don't need to do this by name at all.
+ # SQLite doesn't preserve constraint names anyway
+
+ for removed_sig in set(conn_fks_by_sig).difference(metadata_fks_by_sig):
+ const = conn_fks_by_sig[removed_sig]
+ if removed_sig not in metadata_fks_by_sig:
+ compare_to = (
+ metadata_fks_by_name[const.name].const
+ if const.name in metadata_fks_by_name
+ else None
+ )
+ _remove_fk(const, compare_to)
+
+ for added_sig in set(metadata_fks_by_sig).difference(conn_fks_by_sig):
+ const = metadata_fks_by_sig[added_sig]
+ if added_sig not in conn_fks_by_sig:
+ compare_to = (
+ conn_fks_by_name[const.name].const
+ if const.name in conn_fks_by_name
+ else None
+ )
+ _add_fk(const, compare_to)
+
+
+@comparators.dispatch_for("table")
+def _compare_table_comment(
+ autogen_context: AutogenContext,
+ modify_table_ops: ModifyTableOps,
+ schema: Optional[str],
+ tname: Union[quoted_name, str],
+ conn_table: Optional[Table],
+ metadata_table: Optional[Table],
+) -> None:
+ assert autogen_context.dialect is not None
+ if not autogen_context.dialect.supports_comments:
+ return
+
+ # if we're doing CREATE TABLE, comments will be created inline
+ # with the create_table op.
+ if conn_table is None or metadata_table is None:
+ return
+
+ if conn_table.comment is None and metadata_table.comment is None:
+ return
+
+ if metadata_table.comment is None and conn_table.comment is not None:
+ modify_table_ops.ops.append(
+ ops.DropTableCommentOp(
+ tname, existing_comment=conn_table.comment, schema=schema
+ )
+ )
+ elif metadata_table.comment != conn_table.comment:
+ modify_table_ops.ops.append(
+ ops.CreateTableCommentOp(
+ tname,
+ metadata_table.comment,
+ existing_comment=conn_table.comment,
+ schema=schema,
+ )
+ )
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py
new file mode 100644
index 00000000..50c51fa9
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/render.py
@@ -0,0 +1,1125 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+from io import StringIO
+import re
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from mako.pygen import PythonPrinter
+from sqlalchemy import schema as sa_schema
+from sqlalchemy import sql
+from sqlalchemy import types as sqltypes
+from sqlalchemy.sql.elements import conv
+from sqlalchemy.sql.elements import Label
+from sqlalchemy.sql.elements import quoted_name
+
+from .. import util
+from ..operations import ops
+from ..util import sqla_compat
+
+if TYPE_CHECKING:
+ from typing import Literal
+
+ from sqlalchemy import Computed
+ from sqlalchemy import Identity
+ from sqlalchemy.sql.base import DialectKWArgs
+ from sqlalchemy.sql.elements import ColumnElement
+ from sqlalchemy.sql.elements import TextClause
+ from sqlalchemy.sql.schema import CheckConstraint
+ from sqlalchemy.sql.schema import Column
+ from sqlalchemy.sql.schema import Constraint
+ from sqlalchemy.sql.schema import FetchedValue
+ from sqlalchemy.sql.schema import ForeignKey
+ from sqlalchemy.sql.schema import ForeignKeyConstraint
+ from sqlalchemy.sql.schema import Index
+ from sqlalchemy.sql.schema import MetaData
+ from sqlalchemy.sql.schema import PrimaryKeyConstraint
+ from sqlalchemy.sql.schema import UniqueConstraint
+ from sqlalchemy.sql.sqltypes import ARRAY
+ from sqlalchemy.sql.type_api import TypeEngine
+
+ from alembic.autogenerate.api import AutogenContext
+ from alembic.config import Config
+ from alembic.operations.ops import MigrationScript
+ from alembic.operations.ops import ModifyTableOps
+
+
+MAX_PYTHON_ARGS = 255
+
+
+def _render_gen_name(
+ autogen_context: AutogenContext,
+ name: sqla_compat._ConstraintName,
+) -> Optional[Union[quoted_name, str, _f_name]]:
+ if isinstance(name, conv):
+ return _f_name(_alembic_autogenerate_prefix(autogen_context), name)
+ else:
+ return sqla_compat.constraint_name_or_none(name)
+
+
+def _indent(text: str) -> str:
+ text = re.compile(r"^", re.M).sub(" ", text).strip()
+ text = re.compile(r" +$", re.M).sub("", text)
+ return text
+
+
+def _render_python_into_templatevars(
+ autogen_context: AutogenContext,
+ migration_script: MigrationScript,
+ template_args: Dict[str, Union[str, Config]],
+) -> None:
+ imports = autogen_context.imports
+
+ for upgrade_ops, downgrade_ops in zip(
+ migration_script.upgrade_ops_list, migration_script.downgrade_ops_list
+ ):
+ template_args[upgrade_ops.upgrade_token] = _indent(
+ _render_cmd_body(upgrade_ops, autogen_context)
+ )
+ template_args[downgrade_ops.downgrade_token] = _indent(
+ _render_cmd_body(downgrade_ops, autogen_context)
+ )
+ template_args["imports"] = "\n".join(sorted(imports))
+
+
+default_renderers = renderers = util.Dispatcher()
+
+
+def _render_cmd_body(
+ op_container: ops.OpContainer,
+ autogen_context: AutogenContext,
+) -> str:
+ buf = StringIO()
+ printer = PythonPrinter(buf)
+
+ printer.writeline(
+ "# ### commands auto generated by Alembic - please adjust! ###"
+ )
+
+ has_lines = False
+ for op in op_container.ops:
+ lines = render_op(autogen_context, op)
+ has_lines = has_lines or bool(lines)
+
+ for line in lines:
+ printer.writeline(line)
+
+ if not has_lines:
+ printer.writeline("pass")
+
+ printer.writeline("# ### end Alembic commands ###")
+
+ return buf.getvalue()
+
+
+def render_op(
+ autogen_context: AutogenContext, op: ops.MigrateOperation
+) -> List[str]:
+ renderer = renderers.dispatch(op)
+ lines = util.to_list(renderer(autogen_context, op))
+ return lines
+
+
+def render_op_text(
+ autogen_context: AutogenContext, op: ops.MigrateOperation
+) -> str:
+ return "\n".join(render_op(autogen_context, op))
+
+
+@renderers.dispatch_for(ops.ModifyTableOps)
+def _render_modify_table(
+ autogen_context: AutogenContext, op: ModifyTableOps
+) -> List[str]:
+ opts = autogen_context.opts
+ render_as_batch = opts.get("render_as_batch", False)
+
+ if op.ops:
+ lines = []
+ if render_as_batch:
+ with autogen_context._within_batch():
+ lines.append(
+ "with op.batch_alter_table(%r, schema=%r) as batch_op:"
+ % (op.table_name, op.schema)
+ )
+ for t_op in op.ops:
+ t_lines = render_op(autogen_context, t_op)
+ lines.extend(t_lines)
+ lines.append("")
+ else:
+ for t_op in op.ops:
+ t_lines = render_op(autogen_context, t_op)
+ lines.extend(t_lines)
+
+ return lines
+ else:
+ return []
+
+
+@renderers.dispatch_for(ops.CreateTableCommentOp)
+def _render_create_table_comment(
+ autogen_context: AutogenContext, op: ops.CreateTableCommentOp
+) -> str:
+ if autogen_context._has_batch:
+ templ = (
+ "{prefix}create_table_comment(\n"
+ "{indent}{comment},\n"
+ "{indent}existing_comment={existing}\n"
+ ")"
+ )
+ else:
+ templ = (
+ "{prefix}create_table_comment(\n"
+ "{indent}'{tname}',\n"
+ "{indent}{comment},\n"
+ "{indent}existing_comment={existing},\n"
+ "{indent}schema={schema}\n"
+ ")"
+ )
+ return templ.format(
+ prefix=_alembic_autogenerate_prefix(autogen_context),
+ tname=op.table_name,
+ comment="%r" % op.comment if op.comment is not None else None,
+ existing=(
+ "%r" % op.existing_comment
+ if op.existing_comment is not None
+ else None
+ ),
+ schema="'%s'" % op.schema if op.schema is not None else None,
+ indent=" ",
+ )
+
+
+@renderers.dispatch_for(ops.DropTableCommentOp)
+def _render_drop_table_comment(
+ autogen_context: AutogenContext, op: ops.DropTableCommentOp
+) -> str:
+ if autogen_context._has_batch:
+ templ = (
+ "{prefix}drop_table_comment(\n"
+ "{indent}existing_comment={existing}\n"
+ ")"
+ )
+ else:
+ templ = (
+ "{prefix}drop_table_comment(\n"
+ "{indent}'{tname}',\n"
+ "{indent}existing_comment={existing},\n"
+ "{indent}schema={schema}\n"
+ ")"
+ )
+ return templ.format(
+ prefix=_alembic_autogenerate_prefix(autogen_context),
+ tname=op.table_name,
+ existing=(
+ "%r" % op.existing_comment
+ if op.existing_comment is not None
+ else None
+ ),
+ schema="'%s'" % op.schema if op.schema is not None else None,
+ indent=" ",
+ )
+
+
+@renderers.dispatch_for(ops.CreateTableOp)
+def _add_table(autogen_context: AutogenContext, op: ops.CreateTableOp) -> str:
+ table = op.to_table()
+
+ args = [
+ col
+ for col in [
+ _render_column(col, autogen_context) for col in table.columns
+ ]
+ if col
+ ] + sorted(
+ [
+ rcons
+ for rcons in [
+ _render_constraint(
+ cons, autogen_context, op._namespace_metadata
+ )
+ for cons in table.constraints
+ ]
+ if rcons is not None
+ ]
+ )
+
+ if len(args) > MAX_PYTHON_ARGS:
+ args_str = "*[" + ",\n".join(args) + "]"
+ else:
+ args_str = ",\n".join(args)
+
+ text = "%(prefix)screate_table(%(tablename)r,\n%(args)s" % {
+ "tablename": _ident(op.table_name),
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "args": args_str,
+ }
+ if op.schema:
+ text += ",\nschema=%r" % _ident(op.schema)
+
+ comment = table.comment
+ if comment:
+ text += ",\ncomment=%r" % _ident(comment)
+
+ info = table.info
+ if info:
+ text += f",\ninfo={info!r}"
+
+ for k in sorted(op.kw):
+ text += ",\n%s=%r" % (k.replace(" ", "_"), op.kw[k])
+
+ if table._prefixes:
+ prefixes = ", ".join("'%s'" % p for p in table._prefixes)
+ text += ",\nprefixes=[%s]" % prefixes
+
+ if op.if_not_exists is not None:
+ text += ",\nif_not_exists=%r" % bool(op.if_not_exists)
+
+ text += "\n)"
+ return text
+
+
+@renderers.dispatch_for(ops.DropTableOp)
+def _drop_table(autogen_context: AutogenContext, op: ops.DropTableOp) -> str:
+ text = "%(prefix)sdrop_table(%(tname)r" % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "tname": _ident(op.table_name),
+ }
+ if op.schema:
+ text += ", schema=%r" % _ident(op.schema)
+
+ if op.if_exists is not None:
+ text += ", if_exists=%r" % bool(op.if_exists)
+
+ text += ")"
+ return text
+
+
+def _render_dialect_kwargs_items(
+ autogen_context: AutogenContext, item: DialectKWArgs
+) -> list[str]:
+ return [
+ f"{key}={_render_potential_expr(val, autogen_context)}"
+ for key, val in item.dialect_kwargs.items()
+ ]
+
+
+@renderers.dispatch_for(ops.CreateIndexOp)
+def _add_index(autogen_context: AutogenContext, op: ops.CreateIndexOp) -> str:
+ index = op.to_index()
+
+ has_batch = autogen_context._has_batch
+
+ if has_batch:
+ tmpl = (
+ "%(prefix)screate_index(%(name)r, [%(columns)s], "
+ "unique=%(unique)r%(kwargs)s)"
+ )
+ else:
+ tmpl = (
+ "%(prefix)screate_index(%(name)r, %(table)r, [%(columns)s], "
+ "unique=%(unique)r%(schema)s%(kwargs)s)"
+ )
+
+ assert index.table is not None
+
+ opts = _render_dialect_kwargs_items(autogen_context, index)
+ if op.if_not_exists is not None:
+ opts.append("if_not_exists=%r" % bool(op.if_not_exists))
+ text = tmpl % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "name": _render_gen_name(autogen_context, index.name),
+ "table": _ident(index.table.name),
+ "columns": ", ".join(
+ _get_index_rendered_expressions(index, autogen_context)
+ ),
+ "unique": index.unique or False,
+ "schema": (
+ (", schema=%r" % _ident(index.table.schema))
+ if index.table.schema
+ else ""
+ ),
+ "kwargs": ", " + ", ".join(opts) if opts else "",
+ }
+ return text
+
+
+@renderers.dispatch_for(ops.DropIndexOp)
+def _drop_index(autogen_context: AutogenContext, op: ops.DropIndexOp) -> str:
+ index = op.to_index()
+
+ has_batch = autogen_context._has_batch
+
+ if has_batch:
+ tmpl = "%(prefix)sdrop_index(%(name)r%(kwargs)s)"
+ else:
+ tmpl = (
+ "%(prefix)sdrop_index(%(name)r, "
+ "table_name=%(table_name)r%(schema)s%(kwargs)s)"
+ )
+ opts = _render_dialect_kwargs_items(autogen_context, index)
+ if op.if_exists is not None:
+ opts.append("if_exists=%r" % bool(op.if_exists))
+ text = tmpl % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "name": _render_gen_name(autogen_context, op.index_name),
+ "table_name": _ident(op.table_name),
+ "schema": ((", schema=%r" % _ident(op.schema)) if op.schema else ""),
+ "kwargs": ", " + ", ".join(opts) if opts else "",
+ }
+ return text
+
+
+@renderers.dispatch_for(ops.CreateUniqueConstraintOp)
+def _add_unique_constraint(
+ autogen_context: AutogenContext, op: ops.CreateUniqueConstraintOp
+) -> List[str]:
+ return [_uq_constraint(op.to_constraint(), autogen_context, True)]
+
+
+@renderers.dispatch_for(ops.CreateForeignKeyOp)
+def _add_fk_constraint(
+ autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
+) -> str:
+ args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
+ if not autogen_context._has_batch:
+ args.append(repr(_ident(op.source_table)))
+
+ args.extend(
+ [
+ repr(_ident(op.referent_table)),
+ repr([_ident(col) for col in op.local_cols]),
+ repr([_ident(col) for col in op.remote_cols]),
+ ]
+ )
+ kwargs = [
+ "referent_schema",
+ "onupdate",
+ "ondelete",
+ "initially",
+ "deferrable",
+ "use_alter",
+ "match",
+ ]
+ if not autogen_context._has_batch:
+ kwargs.insert(0, "source_schema")
+
+ for k in kwargs:
+ if k in op.kw:
+ value = op.kw[k]
+ if value is not None:
+ args.append("%s=%r" % (k, value))
+
+ return "%(prefix)screate_foreign_key(%(args)s)" % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "args": ", ".join(args),
+ }
+
+
+@renderers.dispatch_for(ops.CreatePrimaryKeyOp)
+def _add_pk_constraint(constraint, autogen_context):
+ raise NotImplementedError()
+
+
+@renderers.dispatch_for(ops.CreateCheckConstraintOp)
+def _add_check_constraint(constraint, autogen_context):
+ raise NotImplementedError()
+
+
+@renderers.dispatch_for(ops.DropConstraintOp)
+def _drop_constraint(
+ autogen_context: AutogenContext, op: ops.DropConstraintOp
+) -> str:
+ prefix = _alembic_autogenerate_prefix(autogen_context)
+ name = _render_gen_name(autogen_context, op.constraint_name)
+ schema = _ident(op.schema) if op.schema else None
+ type_ = _ident(op.constraint_type) if op.constraint_type else None
+
+ params_strs = []
+ params_strs.append(repr(name))
+ if not autogen_context._has_batch:
+ params_strs.append(repr(_ident(op.table_name)))
+ if schema is not None:
+ params_strs.append(f"schema={schema!r}")
+ if type_ is not None:
+ params_strs.append(f"type_={type_!r}")
+
+ return f"{prefix}drop_constraint({', '.join(params_strs)})"
+
+
+@renderers.dispatch_for(ops.AddColumnOp)
+def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
+ schema, tname, column = op.schema, op.table_name, op.column
+ if autogen_context._has_batch:
+ template = "%(prefix)sadd_column(%(column)s)"
+ else:
+ template = "%(prefix)sadd_column(%(tname)r, %(column)s"
+ if schema:
+ template += ", schema=%(schema)r"
+ template += ")"
+ text = template % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "tname": tname,
+ "column": _render_column(column, autogen_context),
+ "schema": schema,
+ }
+ return text
+
+
+@renderers.dispatch_for(ops.DropColumnOp)
+def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
+ schema, tname, column_name = op.schema, op.table_name, op.column_name
+
+ if autogen_context._has_batch:
+ template = "%(prefix)sdrop_column(%(cname)r)"
+ else:
+ template = "%(prefix)sdrop_column(%(tname)r, %(cname)r"
+ if schema:
+ template += ", schema=%(schema)r"
+ template += ")"
+
+ text = template % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "tname": _ident(tname),
+ "cname": _ident(column_name),
+ "schema": _ident(schema),
+ }
+ return text
+
+
+@renderers.dispatch_for(ops.AlterColumnOp)
+def _alter_column(
+ autogen_context: AutogenContext, op: ops.AlterColumnOp
+) -> str:
+ tname = op.table_name
+ cname = op.column_name
+ server_default = op.modify_server_default
+ type_ = op.modify_type
+ nullable = op.modify_nullable
+ comment = op.modify_comment
+ autoincrement = op.kw.get("autoincrement", None)
+ existing_type = op.existing_type
+ existing_nullable = op.existing_nullable
+ existing_comment = op.existing_comment
+ existing_server_default = op.existing_server_default
+ schema = op.schema
+
+ indent = " " * 11
+
+ if autogen_context._has_batch:
+ template = "%(prefix)salter_column(%(cname)r"
+ else:
+ template = "%(prefix)salter_column(%(tname)r, %(cname)r"
+
+ text = template % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "tname": tname,
+ "cname": cname,
+ }
+ if existing_type is not None:
+ text += ",\n%sexisting_type=%s" % (
+ indent,
+ _repr_type(existing_type, autogen_context),
+ )
+ if server_default is not False:
+ rendered = _render_server_default(server_default, autogen_context)
+ text += ",\n%sserver_default=%s" % (indent, rendered)
+
+ if type_ is not None:
+ text += ",\n%stype_=%s" % (indent, _repr_type(type_, autogen_context))
+ if nullable is not None:
+ text += ",\n%snullable=%r" % (indent, nullable)
+ if comment is not False:
+ text += ",\n%scomment=%r" % (indent, comment)
+ if existing_comment is not None:
+ text += ",\n%sexisting_comment=%r" % (indent, existing_comment)
+ if nullable is None and existing_nullable is not None:
+ text += ",\n%sexisting_nullable=%r" % (indent, existing_nullable)
+ if autoincrement is not None:
+ text += ",\n%sautoincrement=%r" % (indent, autoincrement)
+ if server_default is False and existing_server_default:
+ rendered = _render_server_default(
+ existing_server_default, autogen_context
+ )
+ text += ",\n%sexisting_server_default=%s" % (indent, rendered)
+ if schema and not autogen_context._has_batch:
+ text += ",\n%sschema=%r" % (indent, schema)
+ text += ")"
+ return text
+
+
+class _f_name:
+ def __init__(self, prefix: str, name: conv) -> None:
+ self.prefix = prefix
+ self.name = name
+
+ def __repr__(self) -> str:
+ return "%sf(%r)" % (self.prefix, _ident(self.name))
+
+
+def _ident(name: Optional[Union[quoted_name, str]]) -> Optional[str]:
+ """produce a __repr__() object for a string identifier that may
+ use quoted_name() in SQLAlchemy 0.9 and greater.
+
+ The issue worked around here is that quoted_name() doesn't have
+ very good repr() behavior by itself when unicode is involved.
+
+ """
+ if name is None:
+ return name
+ elif isinstance(name, quoted_name):
+ return str(name)
+ elif isinstance(name, str):
+ return name
+
+
+def _render_potential_expr(
+ value: Any,
+ autogen_context: AutogenContext,
+ *,
+ wrap_in_element: bool = True,
+ is_server_default: bool = False,
+ is_index: bool = False,
+) -> str:
+ if isinstance(value, sql.ClauseElement):
+ sql_text = autogen_context.migration_context.impl.render_ddl_sql_expr(
+ value, is_server_default=is_server_default, is_index=is_index
+ )
+ if wrap_in_element:
+ prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+ element = "literal_column" if is_index else "text"
+ value_str = f"{prefix}{element}({sql_text!r})"
+ if (
+ is_index
+ and isinstance(value, Label)
+ and type(value.name) is str
+ ):
+ return value_str + f".label({value.name!r})"
+ else:
+ return value_str
+ else:
+ return repr(sql_text)
+ else:
+ return repr(value)
+
+
+def _get_index_rendered_expressions(
+ idx: Index, autogen_context: AutogenContext
+) -> List[str]:
+ return [
+ (
+ repr(_ident(getattr(exp, "name", None)))
+ if isinstance(exp, sa_schema.Column)
+ else _render_potential_expr(exp, autogen_context, is_index=True)
+ )
+ for exp in idx.expressions
+ ]
+
+
+def _uq_constraint(
+ constraint: UniqueConstraint,
+ autogen_context: AutogenContext,
+ alter: bool,
+) -> str:
+ opts: List[Tuple[str, Any]] = []
+
+ has_batch = autogen_context._has_batch
+
+ if constraint.deferrable:
+ opts.append(("deferrable", constraint.deferrable))
+ if constraint.initially:
+ opts.append(("initially", constraint.initially))
+ if not has_batch and alter and constraint.table.schema:
+ opts.append(("schema", _ident(constraint.table.schema)))
+ if not alter and constraint.name:
+ opts.append(
+ ("name", _render_gen_name(autogen_context, constraint.name))
+ )
+ dialect_options = _render_dialect_kwargs_items(autogen_context, constraint)
+
+ if alter:
+ args = [repr(_render_gen_name(autogen_context, constraint.name))]
+ if not has_batch:
+ args += [repr(_ident(constraint.table.name))]
+ args.append(repr([_ident(col.name) for col in constraint.columns]))
+ args.extend(["%s=%r" % (k, v) for k, v in opts])
+ args.extend(dialect_options)
+ return "%(prefix)screate_unique_constraint(%(args)s)" % {
+ "prefix": _alembic_autogenerate_prefix(autogen_context),
+ "args": ", ".join(args),
+ }
+ else:
+ args = [repr(_ident(col.name)) for col in constraint.columns]
+ args.extend(["%s=%r" % (k, v) for k, v in opts])
+ args.extend(dialect_options)
+ return "%(prefix)sUniqueConstraint(%(args)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "args": ", ".join(args),
+ }
+
+
+def _user_autogenerate_prefix(autogen_context, target):
+ prefix = autogen_context.opts["user_module_prefix"]
+ if prefix is None:
+ return "%s." % target.__module__
+ else:
+ return prefix
+
+
+def _sqlalchemy_autogenerate_prefix(autogen_context: AutogenContext) -> str:
+ return autogen_context.opts["sqlalchemy_module_prefix"] or ""
+
+
+def _alembic_autogenerate_prefix(autogen_context: AutogenContext) -> str:
+ if autogen_context._has_batch:
+ return "batch_op."
+ else:
+ return autogen_context.opts["alembic_module_prefix"] or ""
+
+
+def _user_defined_render(
+ type_: str, object_: Any, autogen_context: AutogenContext
+) -> Union[str, Literal[False]]:
+ if "render_item" in autogen_context.opts:
+ render = autogen_context.opts["render_item"]
+ if render:
+ rendered = render(type_, object_, autogen_context)
+ if rendered is not False:
+ return rendered
+ return False
+
+
+def _render_column(
+ column: Column[Any], autogen_context: AutogenContext
+) -> str:
+ rendered = _user_defined_render("column", column, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ args: List[str] = []
+ opts: List[Tuple[str, Any]] = []
+
+ if column.server_default:
+ rendered = _render_server_default( # type:ignore[assignment]
+ column.server_default, autogen_context
+ )
+ if rendered:
+ if _should_render_server_default_positionally(
+ column.server_default
+ ):
+ args.append(rendered)
+ else:
+ opts.append(("server_default", rendered))
+
+ if (
+ column.autoincrement is not None
+ and column.autoincrement != sqla_compat.AUTOINCREMENT_DEFAULT
+ ):
+ opts.append(("autoincrement", column.autoincrement))
+
+ if column.nullable is not None:
+ opts.append(("nullable", column.nullable))
+
+ if column.system:
+ opts.append(("system", column.system))
+
+ comment = column.comment
+ if comment:
+ opts.append(("comment", "%r" % comment))
+
+ # TODO: for non-ascii colname, assign a "key"
+ return "%(prefix)sColumn(%(name)r, %(type)s, %(args)s%(kwargs)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "name": _ident(column.name),
+ "type": _repr_type(column.type, autogen_context),
+ "args": ", ".join([str(arg) for arg in args]) + ", " if args else "",
+ "kwargs": (
+ ", ".join(
+ ["%s=%s" % (kwname, val) for kwname, val in opts]
+ + [
+ "%s=%s"
+ % (key, _render_potential_expr(val, autogen_context))
+ for key, val in column.kwargs.items()
+ ]
+ )
+ ),
+ }
+
+
+def _should_render_server_default_positionally(server_default: Any) -> bool:
+ return sqla_compat._server_default_is_computed(
+ server_default
+ ) or sqla_compat._server_default_is_identity(server_default)
+
+
+def _render_server_default(
+ default: Optional[
+ Union[FetchedValue, str, TextClause, ColumnElement[Any]]
+ ],
+ autogen_context: AutogenContext,
+ repr_: bool = True,
+) -> Optional[str]:
+ rendered = _user_defined_render("server_default", default, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ if sqla_compat._server_default_is_computed(default):
+ return _render_computed(cast("Computed", default), autogen_context)
+ elif sqla_compat._server_default_is_identity(default):
+ return _render_identity(cast("Identity", default), autogen_context)
+ elif isinstance(default, sa_schema.DefaultClause):
+ if isinstance(default.arg, str):
+ default = default.arg
+ else:
+ return _render_potential_expr(
+ default.arg, autogen_context, is_server_default=True
+ )
+
+ if isinstance(default, str) and repr_:
+ default = repr(re.sub(r"^'|'$", "", default))
+
+ return cast(str, default)
+
+
+def _render_computed(
+ computed: Computed, autogen_context: AutogenContext
+) -> str:
+ text = _render_potential_expr(
+ computed.sqltext, autogen_context, wrap_in_element=False
+ )
+
+ kwargs = {}
+ if computed.persisted is not None:
+ kwargs["persisted"] = computed.persisted
+ return "%(prefix)sComputed(%(text)s, %(kwargs)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "text": text,
+ "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
+ }
+
+
+def _render_identity(
+ identity: Identity, autogen_context: AutogenContext
+) -> str:
+ kwargs = sqla_compat._get_identity_options_dict(
+ identity, dialect_kwargs=True
+ )
+
+ return "%(prefix)sIdentity(%(kwargs)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "kwargs": (", ".join("%s=%s" % pair for pair in kwargs.items())),
+ }
+
+
+def _repr_type(
+ type_: TypeEngine,
+ autogen_context: AutogenContext,
+ _skip_variants: bool = False,
+) -> str:
+ rendered = _user_defined_render("type", type_, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ if hasattr(autogen_context.migration_context, "impl"):
+ impl_rt = autogen_context.migration_context.impl.render_type(
+ type_, autogen_context
+ )
+ else:
+ impl_rt = None
+
+ mod = type(type_).__module__
+ imports = autogen_context.imports
+
+ if not _skip_variants and sqla_compat._type_has_variants(type_):
+ return _render_Variant_type(type_, autogen_context)
+ elif mod.startswith("sqlalchemy.dialects"):
+ match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
+ assert match is not None
+ dname = match.group(1)
+ if imports is not None:
+ imports.add("from sqlalchemy.dialects import %s" % dname)
+ if impl_rt:
+ return impl_rt
+ else:
+ return "%s.%r" % (dname, type_)
+ elif impl_rt:
+ return impl_rt
+ elif mod.startswith("sqlalchemy."):
+ if "_render_%s_type" % type_.__visit_name__ in globals():
+ fn = globals()["_render_%s_type" % type_.__visit_name__]
+ return fn(type_, autogen_context)
+ else:
+ prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+ return "%s%r" % (prefix, type_)
+ else:
+ prefix = _user_autogenerate_prefix(autogen_context, type_)
+ return "%s%r" % (prefix, type_)
+
+
+def _render_ARRAY_type(type_: ARRAY, autogen_context: AutogenContext) -> str:
+ return cast(
+ str,
+ _render_type_w_subtype(
+ type_, autogen_context, "item_type", r"(.+?\()"
+ ),
+ )
+
+
+def _render_Variant_type(
+ type_: TypeEngine, autogen_context: AutogenContext
+) -> str:
+ base_type, variant_mapping = sqla_compat._get_variant_mapping(type_)
+ base = _repr_type(base_type, autogen_context, _skip_variants=True)
+ assert base is not None and base is not False # type: ignore[comparison-overlap] # noqa:E501
+ for dialect in sorted(variant_mapping):
+ typ = variant_mapping[dialect]
+ base += ".with_variant(%s, %r)" % (
+ _repr_type(typ, autogen_context, _skip_variants=True),
+ dialect,
+ )
+ return base
+
+
+def _render_type_w_subtype(
+ type_: TypeEngine,
+ autogen_context: AutogenContext,
+ attrname: str,
+ regexp: str,
+ prefix: Optional[str] = None,
+) -> Union[Optional[str], Literal[False]]:
+ outer_repr = repr(type_)
+ inner_type = getattr(type_, attrname, None)
+ if inner_type is None:
+ return False
+
+ inner_repr = repr(inner_type)
+
+ inner_repr = re.sub(r"([\(\)])", r"\\\1", inner_repr)
+ sub_type = _repr_type(getattr(type_, attrname), autogen_context)
+ outer_type = re.sub(regexp + inner_repr, r"\1%s" % sub_type, outer_repr)
+
+ if prefix:
+ return "%s%s" % (prefix, outer_type)
+
+ mod = type(type_).__module__
+ if mod.startswith("sqlalchemy.dialects"):
+ match = re.match(r"sqlalchemy\.dialects\.(\w+)", mod)
+ assert match is not None
+ dname = match.group(1)
+ return "%s.%s" % (dname, outer_type)
+ elif mod.startswith("sqlalchemy"):
+ prefix = _sqlalchemy_autogenerate_prefix(autogen_context)
+ return "%s%s" % (prefix, outer_type)
+ else:
+ return None
+
+
+_constraint_renderers = util.Dispatcher()
+
+
+def _render_constraint(
+ constraint: Constraint,
+ autogen_context: AutogenContext,
+ namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+ try:
+ renderer = _constraint_renderers.dispatch(constraint)
+ except ValueError:
+ util.warn("No renderer is established for object %r" % constraint)
+ return "[Unknown Python object %r]" % constraint
+ else:
+ return renderer(constraint, autogen_context, namespace_metadata)
+
+
+@_constraint_renderers.dispatch_for(sa_schema.PrimaryKeyConstraint)
+def _render_primary_key(
+ constraint: PrimaryKeyConstraint,
+ autogen_context: AutogenContext,
+ namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+ rendered = _user_defined_render("primary_key", constraint, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ if not constraint.columns:
+ return None
+
+ opts = []
+ if constraint.name:
+ opts.append(
+ ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+ )
+ return "%(prefix)sPrimaryKeyConstraint(%(args)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "args": ", ".join(
+ [repr(c.name) for c in constraint.columns]
+ + ["%s=%s" % (kwname, val) for kwname, val in opts]
+ ),
+ }
+
+
+def _fk_colspec(
+ fk: ForeignKey,
+ metadata_schema: Optional[str],
+ namespace_metadata: MetaData,
+) -> str:
+ """Implement a 'safe' version of ForeignKey._get_colspec() that
+ won't fail if the remote table can't be resolved.
+
+ """
+ colspec = fk._get_colspec()
+ tokens = colspec.split(".")
+ tname, colname = tokens[-2:]
+
+ if metadata_schema is not None and len(tokens) == 2:
+ table_fullname = "%s.%s" % (metadata_schema, tname)
+ else:
+ table_fullname = ".".join(tokens[0:-1])
+
+ if (
+ not fk.link_to_name
+ and fk.parent is not None
+ and fk.parent.table is not None
+ ):
+ # try to resolve the remote table in order to adjust for column.key.
+ # the FK constraint needs to be rendered in terms of the column
+ # name.
+
+ if table_fullname in namespace_metadata.tables:
+ col = namespace_metadata.tables[table_fullname].c.get(colname)
+ if col is not None:
+ colname = _ident(col.name) # type: ignore[assignment]
+
+ colspec = "%s.%s" % (table_fullname, colname)
+
+ return colspec
+
+
+def _populate_render_fk_opts(
+ constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
+) -> None:
+ if constraint.onupdate:
+ opts.append(("onupdate", repr(constraint.onupdate)))
+ if constraint.ondelete:
+ opts.append(("ondelete", repr(constraint.ondelete)))
+ if constraint.initially:
+ opts.append(("initially", repr(constraint.initially)))
+ if constraint.deferrable:
+ opts.append(("deferrable", repr(constraint.deferrable)))
+ if constraint.use_alter:
+ opts.append(("use_alter", repr(constraint.use_alter)))
+ if constraint.match:
+ opts.append(("match", repr(constraint.match)))
+
+
+@_constraint_renderers.dispatch_for(sa_schema.ForeignKeyConstraint)
+def _render_foreign_key(
+ constraint: ForeignKeyConstraint,
+ autogen_context: AutogenContext,
+ namespace_metadata: MetaData,
+) -> Optional[str]:
+ rendered = _user_defined_render("foreign_key", constraint, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ opts = []
+ if constraint.name:
+ opts.append(
+ ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+ )
+
+ _populate_render_fk_opts(constraint, opts)
+
+ apply_metadata_schema = namespace_metadata.schema
+ return (
+ "%(prefix)sForeignKeyConstraint([%(cols)s], "
+ "[%(refcols)s], %(args)s)"
+ % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "cols": ", ".join(
+ repr(_ident(f.parent.name)) for f in constraint.elements
+ ),
+ "refcols": ", ".join(
+ repr(_fk_colspec(f, apply_metadata_schema, namespace_metadata))
+ for f in constraint.elements
+ ),
+ "args": ", ".join(
+ ["%s=%s" % (kwname, val) for kwname, val in opts]
+ ),
+ }
+ )
+
+
+@_constraint_renderers.dispatch_for(sa_schema.UniqueConstraint)
+def _render_unique_constraint(
+ constraint: UniqueConstraint,
+ autogen_context: AutogenContext,
+ namespace_metadata: Optional[MetaData],
+) -> str:
+ rendered = _user_defined_render("unique", constraint, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ return _uq_constraint(constraint, autogen_context, False)
+
+
+@_constraint_renderers.dispatch_for(sa_schema.CheckConstraint)
+def _render_check_constraint(
+ constraint: CheckConstraint,
+ autogen_context: AutogenContext,
+ namespace_metadata: Optional[MetaData],
+) -> Optional[str]:
+ rendered = _user_defined_render("check", constraint, autogen_context)
+ if rendered is not False:
+ return rendered
+
+ # detect the constraint being part of
+ # a parent type which is probably in the Table already.
+ # ideally SQLAlchemy would give us more of a first class
+ # way to detect this.
+ if (
+ constraint._create_rule
+ and hasattr(constraint._create_rule, "target")
+ and isinstance(
+ constraint._create_rule.target,
+ sqltypes.TypeEngine,
+ )
+ ):
+ return None
+ opts = []
+ if constraint.name:
+ opts.append(
+ ("name", repr(_render_gen_name(autogen_context, constraint.name)))
+ )
+ return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
+ "prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
+ "opts": (
+ ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
+ if opts
+ else ""
+ ),
+ "sqltext": _render_potential_expr(
+ constraint.sqltext, autogen_context, wrap_in_element=False
+ ),
+ }
+
+
+@renderers.dispatch_for(ops.ExecuteSQLOp)
+def _execute_sql(autogen_context: AutogenContext, op: ops.ExecuteSQLOp) -> str:
+ if not isinstance(op.sqltext, str):
+ raise NotImplementedError(
+ "Autogenerate rendering of SQL Expression language constructs "
+ "not supported here; please use a plain SQL string"
+ )
+ return "op.execute(%r)" % op.sqltext
+
+
+renderers = default_renderers.branch()
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py
new file mode 100644
index 00000000..8994dcf8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/rewriter.py
@@ -0,0 +1,240 @@
+from __future__ import annotations
+
+from typing import Any
+from typing import Callable
+from typing import Iterator
+from typing import List
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from .. import util
+from ..operations import ops
+
+if TYPE_CHECKING:
+ from ..operations.ops import AddColumnOp
+ from ..operations.ops import AlterColumnOp
+ from ..operations.ops import CreateTableOp
+ from ..operations.ops import DowngradeOps
+ from ..operations.ops import MigrateOperation
+ from ..operations.ops import MigrationScript
+ from ..operations.ops import ModifyTableOps
+ from ..operations.ops import OpContainer
+ from ..operations.ops import UpgradeOps
+ from ..runtime.migration import MigrationContext
+ from ..script.revision import _GetRevArg
+
+ProcessRevisionDirectiveFn = Callable[
+ ["MigrationContext", "_GetRevArg", List["MigrationScript"]], None
+]
+
+
+class Rewriter:
+ """A helper object that allows easy 'rewriting' of ops streams.
+
+ The :class:`.Rewriter` object is intended to be passed along
+ to the
+ :paramref:`.EnvironmentContext.configure.process_revision_directives`
+ parameter in an ``env.py`` script. Once constructed, any number
+ of "rewrites" functions can be associated with it, which will be given
+ the opportunity to modify the structure without having to have explicit
+ knowledge of the overall structure.
+
+ The function is passed the :class:`.MigrationContext` object and
+ ``revision`` tuple that are passed to the :paramref:`.Environment
+ Context.configure.process_revision_directives` function normally,
+ and the third argument is an individual directive of the type
+ noted in the decorator. The function has the choice of returning
+ a single op directive, which normally can be the directive that
+ was actually passed, or a new directive to replace it, or a list
+ of zero or more directives to replace it.
+
+ .. seealso::
+
+ :ref:`autogen_rewriter` - usage example
+
+ """
+
+ _traverse = util.Dispatcher()
+
+ _chained: Tuple[Union[ProcessRevisionDirectiveFn, Rewriter], ...] = ()
+
+ def __init__(self) -> None:
+ self.dispatch = util.Dispatcher()
+
+ def chain(
+ self,
+ other: Union[
+ ProcessRevisionDirectiveFn,
+ Rewriter,
+ ],
+ ) -> Rewriter:
+ """Produce a "chain" of this :class:`.Rewriter` to another.
+
+ This allows two or more rewriters to operate serially on a stream,
+ e.g.::
+
+ writer1 = autogenerate.Rewriter()
+ writer2 = autogenerate.Rewriter()
+
+
+ @writer1.rewrites(ops.AddColumnOp)
+ def add_column_nullable(context, revision, op):
+ op.column.nullable = True
+ return op
+
+
+ @writer2.rewrites(ops.AddColumnOp)
+ def add_column_idx(context, revision, op):
+ idx_op = ops.CreateIndexOp(
+ "ixc", op.table_name, [op.column.name]
+ )
+ return [op, idx_op]
+
+ writer = writer1.chain(writer2)
+
+ :param other: a :class:`.Rewriter` instance
+ :return: a new :class:`.Rewriter` that will run the operations
+ of this writer, then the "other" writer, in succession.
+
+ """
+ wr = self.__class__.__new__(self.__class__)
+ wr.__dict__.update(self.__dict__)
+ wr._chained += (other,)
+ return wr
+
+ def rewrites(
+ self,
+ operator: Union[
+ Type[AddColumnOp],
+ Type[MigrateOperation],
+ Type[AlterColumnOp],
+ Type[CreateTableOp],
+ Type[ModifyTableOps],
+ ],
+ ) -> Callable[..., Any]:
+ """Register a function as rewriter for a given type.
+
+ The function should receive three arguments, which are
+ the :class:`.MigrationContext`, a ``revision`` tuple, and
+ an op directive of the type indicated. E.g.::
+
+ @writer1.rewrites(ops.AddColumnOp)
+ def add_column_nullable(context, revision, op):
+ op.column.nullable = True
+ return op
+
+ """
+ return self.dispatch.dispatch_for(operator)
+
+ def _rewrite(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directive: MigrateOperation,
+ ) -> Iterator[MigrateOperation]:
+ try:
+ _rewriter = self.dispatch.dispatch(directive)
+ except ValueError:
+ _rewriter = None
+ yield directive
+ else:
+ if self in directive._mutations:
+ yield directive
+ else:
+ for r_directive in util.to_list(
+ _rewriter(context, revision, directive), []
+ ):
+ r_directive._mutations = r_directive._mutations.union(
+ [self]
+ )
+ yield r_directive
+
+ def __call__(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directives: List[MigrationScript],
+ ) -> None:
+ self.process_revision_directives(context, revision, directives)
+ for process_revision_directives in self._chained:
+ process_revision_directives(context, revision, directives)
+
+ @_traverse.dispatch_for(ops.MigrationScript)
+ def _traverse_script(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directive: MigrationScript,
+ ) -> None:
+ upgrade_ops_list: List[UpgradeOps] = []
+ for upgrade_ops in directive.upgrade_ops_list:
+ ret = self._traverse_for(context, revision, upgrade_ops)
+ if len(ret) != 1:
+ raise ValueError(
+ "Can only return single object for UpgradeOps traverse"
+ )
+ upgrade_ops_list.append(ret[0])
+
+ directive.upgrade_ops = upgrade_ops_list # type: ignore
+
+ downgrade_ops_list: List[DowngradeOps] = []
+ for downgrade_ops in directive.downgrade_ops_list:
+ ret = self._traverse_for(context, revision, downgrade_ops)
+ if len(ret) != 1:
+ raise ValueError(
+ "Can only return single object for DowngradeOps traverse"
+ )
+ downgrade_ops_list.append(ret[0])
+ directive.downgrade_ops = downgrade_ops_list # type: ignore
+
+ @_traverse.dispatch_for(ops.OpContainer)
+ def _traverse_op_container(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directive: OpContainer,
+ ) -> None:
+ self._traverse_list(context, revision, directive.ops)
+
+ @_traverse.dispatch_for(ops.MigrateOperation)
+ def _traverse_any_directive(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directive: MigrateOperation,
+ ) -> None:
+ pass
+
+ def _traverse_for(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directive: MigrateOperation,
+ ) -> Any:
+ directives = list(self._rewrite(context, revision, directive))
+ for directive in directives:
+ traverser = self._traverse.dispatch(directive)
+ traverser(self, context, revision, directive)
+ return directives
+
+ def _traverse_list(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directives: Any,
+ ) -> None:
+ dest = []
+ for directive in directives:
+ dest.extend(self._traverse_for(context, revision, directive))
+
+ directives[:] = dest
+
+ def process_revision_directives(
+ self,
+ context: MigrationContext,
+ revision: _GetRevArg,
+ directives: List[MigrationScript],
+ ) -> None:
+ self._traverse_list(context, revision, directives)