author    S. Solomon Darnell    2025-03-28 21:52:21 -0500
committer S. Solomon Darnell    2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py')
-rw-r--r--    .venv/lib/python3.12/site-packages/alembic/autogenerate/api.py    650
1 file changed, 650 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
new file mode 100644
index 00000000..811462e8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/autogenerate/api.py
@@ -0,0 +1,650 @@
+from __future__ import annotations
+
+import contextlib
+from typing import Any
+from typing import Dict
+from typing import Iterator
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Set
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import inspect
+
+from . import compare
+from . import render
+from .. import util
+from ..operations import ops
+from ..util import sqla_compat
+
+"""Provide the 'autogenerate' feature which can produce migration operations
+automatically."""
+
+if TYPE_CHECKING:
+    from sqlalchemy.engine import Connection
+    from sqlalchemy.engine import Dialect
+    from sqlalchemy.engine import Inspector
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import SchemaItem
+    from sqlalchemy.sql.schema import Table
+
+    from ..config import Config
+    from ..operations.ops import DowngradeOps
+    from ..operations.ops import MigrationScript
+    from ..operations.ops import UpgradeOps
+    from ..runtime.environment import NameFilterParentNames
+    from ..runtime.environment import NameFilterType
+    from ..runtime.environment import ProcessRevisionDirectiveFn
+    from ..runtime.environment import RenderItemFn
+    from ..runtime.migration import MigrationContext
+    from ..script.base import Script
+    from ..script.base import ScriptDirectory
+    from ..script.revision import _GetRevArg
+
+
+def compare_metadata(context: MigrationContext, metadata: MetaData) -> Any:
+    """Compare a database schema to that given in a
+    :class:`~sqlalchemy.schema.MetaData` instance.
+
+    The database connection is presented in the context
+    of a :class:`.MigrationContext` object, which
+    provides database connectivity as well as optional
+    comparison functions to use for datatypes and
+    server defaults - see the "autogenerate" arguments
+    at :meth:`.EnvironmentContext.configure`
+    for details on these.
+
+    The return format is a list of "diff" directives,
+    each representing individual differences::
+
+        from alembic.migration import MigrationContext
+        from alembic.autogenerate import compare_metadata
+        from sqlalchemy import (
+            create_engine,
+            MetaData,
+            Column,
+            Integer,
+            String,
+            Table,
+            text,
+        )
+        import pprint
+
+        engine = create_engine("sqlite://")
+
+        with engine.begin() as conn:
+            conn.execute(
+                text(
+                    '''
+                        create table foo (
+                            id integer not null primary key,
+                            old_data varchar,
+                            x integer
+                        )
+                    '''
+                )
+            )
+            conn.execute(text("create table bar (data varchar)"))
+
+        metadata = MetaData()
+        Table(
+            "foo",
+            metadata,
+            Column("id", Integer, primary_key=True),
+            Column("data", Integer),
+            Column("x", Integer, nullable=False),
+        )
+        Table("bat", metadata, Column("info", String))
+
+        mc = MigrationContext.configure(engine.connect())
+
+        diff = compare_metadata(mc, metadata)
+        pprint.pprint(diff, indent=2, width=20)
+
+    Output::
+
+        [
+            (
+                "add_table",
+                Table(
+                    "bat",
+                    MetaData(),
+                    Column("info", String(), table=<bat>),
+                    schema=None,
+                ),
+            ),
+            (
+                "remove_table",
+                Table(
+                    "bar",
+                    MetaData(),
+                    Column("data", VARCHAR(), table=<bar>),
+                    schema=None,
+                ),
+            ),
+            (
+                "add_column",
+                None,
+                "foo",
+                Column("data", Integer(), table=<foo>),
+            ),
+            [
+                (
+                    "modify_nullable",
+                    None,
+                    "foo",
+                    "x",
+                    {
+                        "existing_comment": None,
+                        "existing_server_default": False,
+                        "existing_type": INTEGER(),
+                    },
+                    True,
+                    False,
+                )
+            ],
+            (
+                "remove_column",
+                None,
+                "foo",
+                Column("old_data", VARCHAR(), table=<foo>),
+            ),
+        ]
+
+    :param context: a :class:`.MigrationContext`
+     instance.
+    :param metadata: a :class:`~sqlalchemy.schema.MetaData`
+     instance.
+
+    .. seealso::
+
+        :func:`.produce_migrations` - produces a :class:`.MigrationScript`
+        structure based on metadata comparison.
+
+    """
+
+    migration_script = produce_migrations(context, metadata)
+    assert migration_script.upgrade_ops is not None
+    return migration_script.upgrade_ops.as_diffs()
+
+
+def produce_migrations(
+    context: MigrationContext, metadata: MetaData
+) -> MigrationScript:
+    """Produce a :class:`.MigrationScript` structure based on schema
+    comparison.
+
+    This function does essentially what :func:`.compare_metadata` does,
+    but then runs the resulting list of diffs to produce the full
+    :class:`.MigrationScript` object.   For an example of what this looks like,
+    see the example in :ref:`customizing_revision`.
+
+    .. seealso::
+
+        :func:`.compare_metadata` - returns more fundamental "diff"
+        data from comparing a schema.
+
+    """
+
+    autogen_context = AutogenContext(context, metadata=metadata)
+
+    migration_script = ops.MigrationScript(
+        rev_id=None,
+        upgrade_ops=ops.UpgradeOps([]),
+        downgrade_ops=ops.DowngradeOps([]),
+    )
+
+    compare._populate_migration_script(autogen_context, migration_script)
+
+    return migration_script
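+
+# --- Illustrative usage sketch (editor's addition, not part of upstream Alembic) ---
+# A minimal way to drive produce_migrations() directly, assuming an in-memory
+# SQLite engine and a target MetaData populated elsewhere:
+#
+#     from sqlalchemy import create_engine, MetaData
+#     from alembic.migration import MigrationContext
+#     from alembic.autogenerate import produce_migrations
+#
+#     engine = create_engine("sqlite://")
+#     target_metadata = MetaData()  # Table objects would be defined on this
+#     with engine.connect() as conn:
+#         mc = MigrationContext.configure(conn)
+#         script = produce_migrations(mc, target_metadata)
+#
+#     # script.upgrade_ops.ops is the list of operation directives
+#     # (CreateTableOp, AddColumnOp, ...) that autogenerate would render.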
+
+
+def render_python_code(
+    up_or_down_op: Union[UpgradeOps, DowngradeOps],
+    sqlalchemy_module_prefix: str = "sa.",
+    alembic_module_prefix: str = "op.",
+    render_as_batch: bool = False,
+    imports: Sequence[str] = (),
+    render_item: Optional[RenderItemFn] = None,
+    migration_context: Optional[MigrationContext] = None,
+    user_module_prefix: Optional[str] = None,
+) -> str:
+    """Render Python code given an :class:`.UpgradeOps` or
+    :class:`.DowngradeOps` object.
+
+    This is a convenience function that can be used to test the
+    autogenerate output of a user-defined :class:`.MigrationScript` structure.
+
+    :param up_or_down_op: :class:`.UpgradeOps` or :class:`.DowngradeOps` object
+    :param sqlalchemy_module_prefix: module prefix for SQLAlchemy objects
+    :param alembic_module_prefix: module prefix for Alembic constructs
+    :param render_as_batch: use "batch operations" style for rendering
+    :param imports: sequence of import symbols to add
+    :param render_item: callable to render items
+    :param migration_context: optional :class:`.MigrationContext`
+    :param user_module_prefix: optional string prefix for user-defined types
+
+     .. versionadded:: 1.11.0
+
+    """
+    opts = {
+        "sqlalchemy_module_prefix": sqlalchemy_module_prefix,
+        "alembic_module_prefix": alembic_module_prefix,
+        "render_item": render_item,
+        "render_as_batch": render_as_batch,
+        "user_module_prefix": user_module_prefix,
+    }
+
+    if migration_context is None:
+        from ..runtime.migration import MigrationContext
+        from sqlalchemy.engine.default import DefaultDialect
+
+        migration_context = MigrationContext.configure(
+            dialect=DefaultDialect()
+        )
+
+    autogen_context = AutogenContext(migration_context, opts=opts)
+    autogen_context.imports = set(imports)
+    return render._indent(
+        render._render_cmd_body(up_or_down_op, autogen_context)
+    )
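+
+# --- Illustrative usage sketch (editor's addition, not part of upstream Alembic) ---
+# render_python_code() turns an UpgradeOps/DowngradeOps structure into the body
+# of an upgrade()/downgrade() function; continuing the produce_migrations()
+# sketch above:
+#
+#     from alembic.autogenerate import render_python_code
+#
+#     source = render_python_code(
+#         script.upgrade_ops,
+#         render_as_batch=True,   # emit batch_alter_table() blocks
+#         migration_context=mc,   # reuse the context for dialect information
+#     )
+#     print(source)  # indented "op.*" statements ready for a revision file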
+
+
+def _render_migration_diffs(
+    context: MigrationContext, template_args: Dict[Any, Any]
+) -> None:
+    """legacy, used by test_autogen_composition at the moment"""
+
+    autogen_context = AutogenContext(context)
+
+    upgrade_ops = ops.UpgradeOps([])
+    compare._produce_net_changes(autogen_context, upgrade_ops)
+
+    migration_script = ops.MigrationScript(
+        rev_id=None,
+        upgrade_ops=upgrade_ops,
+        downgrade_ops=upgrade_ops.reverse(),
+    )
+
+    render._render_python_into_templatevars(
+        autogen_context, migration_script, template_args
+    )
+
+
+class AutogenContext:
+    """Maintains configuration and state that's specific to an
+    autogenerate operation."""
+
+    metadata: Union[MetaData, Sequence[MetaData], None] = None
+    """The :class:`~sqlalchemy.schema.MetaData` object
+    representing the destination.
+
+    This object is the one that is passed within ``env.py``
+    to the :paramref:`.EnvironmentContext.configure.target_metadata`
+    parameter.  It represents the structure of :class:`.Table` and other
+    objects as stated in the current database model, and represents the
+    destination structure for the database being examined.
+
+    While the :class:`~sqlalchemy.schema.MetaData` object is primarily
+    known as a collection of :class:`~sqlalchemy.schema.Table` objects,
+    it also has an :attr:`~sqlalchemy.schema.MetaData.info` dictionary
+    that may be used by end-user schemes to store additional schema-level
+    objects that are to be compared in custom autogeneration schemes.
+
+    """
+
+    connection: Optional[Connection] = None
+    """The :class:`~sqlalchemy.engine.base.Connection` object currently
+    connected to the database backend being compared.
+
+    This is obtained from the :attr:`.MigrationContext.bind` and is
+    ultimately set up in the ``env.py`` script.
+
+    """
+
+    dialect: Optional[Dialect] = None
+    """The :class:`~sqlalchemy.engine.Dialect` object currently in use.
+
+    This is normally obtained from the
+    :attr:`~sqlalchemy.engine.base.Connection.dialect` attribute.
+
+    """
+
+    imports: Set[str] = None  # type: ignore[assignment]
+    """A ``set()`` which contains string Python import directives.
+
+    The directives are to be rendered into the ``${imports}`` section
+    of a script template.  The set is normally empty and can be modified
+    within hooks such as the
+    :paramref:`.EnvironmentContext.configure.render_item` hook.
+
+    .. seealso::
+
+        :ref:`autogen_render_types`
+
+    """
+
+    migration_context: MigrationContext = None  # type: ignore[assignment]
+    """The :class:`.MigrationContext` established by the ``env.py`` script."""
+
+    def __init__(
+        self,
+        migration_context: MigrationContext,
+        metadata: Union[MetaData, Sequence[MetaData], None] = None,
+        opts: Optional[Dict[str, Any]] = None,
+        autogenerate: bool = True,
+    ) -> None:
+        if (
+            autogenerate
+            and migration_context is not None
+            and migration_context.as_sql
+        ):
+            raise util.CommandError(
+                "autogenerate can't use as_sql=True as it prevents querying "
+                "the database for schema information"
+            )
+
+        if opts is None:
+            opts = migration_context.opts
+
+        self.metadata = metadata = (
+            opts.get("target_metadata", None) if metadata is None else metadata
+        )
+
+        if (
+            autogenerate
+            and metadata is None
+            and migration_context is not None
+            and migration_context.script is not None
+        ):
+            raise util.CommandError(
+                "Can't proceed with --autogenerate option; environment "
+                "script %s does not provide "
+                "a MetaData object or sequence of objects to the context."
+                % (migration_context.script.env_py_location)
+            )
+
+        include_object = opts.get("include_object", None)
+        include_name = opts.get("include_name", None)
+
+        object_filters = []
+        name_filters = []
+        if include_object:
+            object_filters.append(include_object)
+        if include_name:
+            name_filters.append(include_name)
+
+        self._object_filters = object_filters
+        self._name_filters = name_filters
+
+        self.migration_context = migration_context
+        if self.migration_context is not None:
+            self.connection = self.migration_context.bind
+            self.dialect = self.migration_context.dialect
+
+        self.imports = set()
+        self.opts: Dict[str, Any] = opts
+        self._has_batch: bool = False
+
+    @util.memoized_property
+    def inspector(self) -> Inspector:
+        if self.connection is None:
+            raise TypeError(
+                "can't return inspector as this "
+                "AutogenContext has no database connection"
+            )
+        return inspect(self.connection)
+
+    @contextlib.contextmanager
+    def _within_batch(self) -> Iterator[None]:
+        self._has_batch = True
+        yield
+        self._has_batch = False
+
+    def run_name_filters(
+        self,
+        name: Optional[str],
+        type_: NameFilterType,
+        parent_names: NameFilterParentNames,
+    ) -> bool:
+        """Run the context's name filters and return True if the targets
+        should be part of the autogenerate operation.
+
+        This method should be run for every kind of name encountered within the
+        reflection side of an autogenerate operation, giving the environment
+        the chance to filter what names should be reflected as database
+        objects.  The filters here are produced directly via the
+        :paramref:`.EnvironmentContext.configure.include_name` parameter.
+
+        """
+        if "schema_name" in parent_names:
+            if type_ == "table":
+                table_name = name
+            else:
+                table_name = parent_names.get("table_name", None)
+            if table_name:
+                schema_name = parent_names["schema_name"]
+                if schema_name:
+                    parent_names["schema_qualified_table_name"] = "%s.%s" % (
+                        schema_name,
+                        table_name,
+                    )
+                else:
+                    parent_names["schema_qualified_table_name"] = table_name
+
+        for fn in self._name_filters:
+            if not fn(name, type_, parent_names):
+                return False
+        else:
+            return True
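+
+    # --- Illustrative sketch (editor's addition, not part of upstream Alembic) ---
+    # An include_name callable configured in env.py uses the same signature as
+    # the filters invoked here, e.g. reflecting only a whitelist of tables
+    # (``reflected_tables`` is a hypothetical set of table names):
+    #
+    #     def include_name(name, type_, parent_names):
+    #         if type_ == "table":
+    #             return name in reflected_tables
+    #         return True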
+
+    def run_object_filters(
+        self,
+        object_: SchemaItem,
+        name: sqla_compat._ConstraintName,
+        type_: NameFilterType,
+        reflected: bool,
+        compare_to: Optional[SchemaItem],
+    ) -> bool:
+        """Run the context's object filters and return True if the targets
+        should be part of the autogenerate operation.
+
+        This method should be run for every kind of object encountered within
+        an autogenerate operation, giving the environment the chance
+        to filter what objects should be included in the comparison.
+        The filters here are produced directly via the
+        :paramref:`.EnvironmentContext.configure.include_object` parameter.
+
+        """
+        for fn in self._object_filters:
+            if not fn(object_, name, type_, reflected, compare_to):
+                return False
+        else:
+            return True
+
+    run_filters = run_object_filters
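+
+    # --- Illustrative sketch (editor's addition, not part of upstream Alembic) ---
+    # An include_object callable configured in env.py mirrors the filter
+    # signature used above, e.g. skipping tables flagged in Table.info
+    # (the "skip_autogenerate" key is just a convention for this sketch):
+    #
+    #     def include_object(object_, name, type_, reflected, compare_to):
+    #         if type_ == "table" and object_.info.get("skip_autogenerate", False):
+    #             return False
+    #         return True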
+
+    @util.memoized_property
+    def sorted_tables(self) -> List[Table]:
+        """Return an aggregate of the :attr:`.MetaData.sorted_tables`
+        collection(s).
+
+        For a sequence of :class:`.MetaData` objects, this
+        concatenates the :attr:`.MetaData.sorted_tables` collection
+        for each individual :class:`.MetaData`  in the order of the
+        sequence.  It does **not** collate the sorted tables collections.
+
+        """
+        result = []
+        for m in util.to_list(self.metadata):
+            result.extend(m.sorted_tables)
+        return result
+
+    @util.memoized_property
+    def table_key_to_table(self) -> Dict[str, Table]:
+        """Return an aggregate  of the :attr:`.MetaData.tables` dictionaries.
+
+        The :attr:`.MetaData.tables` collection is a dictionary of table key
+        to :class:`.Table`; this method aggregates the dictionary across
+        multiple :class:`.MetaData` objects into one dictionary.
+
+        Duplicate table keys are **not** supported; if two :class:`.MetaData`
+        objects contain the same table key, an exception is raised.
+
+        """
+        result: Dict[str, Table] = {}
+        for m in util.to_list(self.metadata):
+            intersect = set(result).intersection(set(m.tables))
+            if intersect:
+                raise ValueError(
+                    "Duplicate table keys across multiple "
+                    "MetaData objects: %s"
+                    % (", ".join('"%s"' % key for key in sorted(intersect)))
+                )
+
+            result.update(m.tables)
+        return result
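+
+    # --- Illustrative sketch (editor's addition, not part of upstream Alembic) ---
+    # The aggregation above is what lets env.py hand a sequence of MetaData
+    # objects to autogenerate; module paths below are hypothetical:
+    #
+    #     from myapp.core.models import metadata as core_metadata
+    #     from myapp.billing.models import metadata as billing_metadata
+    #
+    #     context.configure(
+    #         connection=connection,
+    #         target_metadata=[core_metadata, billing_metadata],
+    #     )
+    #
+    # Table keys must be unique across the sequence, otherwise the check above
+    # raises ValueError.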
+
+
+class RevisionContext:
+    """Maintains configuration and state that's specific to a revision
+    file generation operation."""
+
+    generated_revisions: List[MigrationScript]
+    process_revision_directives: Optional[ProcessRevisionDirectiveFn]
+
+    def __init__(
+        self,
+        config: Config,
+        script_directory: ScriptDirectory,
+        command_args: Dict[str, Any],
+        process_revision_directives: Optional[
+            ProcessRevisionDirectiveFn
+        ] = None,
+    ) -> None:
+        self.config = config
+        self.script_directory = script_directory
+        self.command_args = command_args
+        self.process_revision_directives = process_revision_directives
+        self.template_args = {
+            "config": config  # Let templates use config for
+            # e.g. multiple databases
+        }
+        self.generated_revisions = [self._default_revision()]
+
+    def _to_script(
+        self, migration_script: MigrationScript
+    ) -> Optional[Script]:
+        template_args: Dict[str, Any] = self.template_args.copy()
+
+        if getattr(migration_script, "_needs_render", False):
+            autogen_context = self._last_autogen_context
+
+            # clear out existing imports if we are doing multiple
+            # renders
+            autogen_context.imports = set()
+            if migration_script.imports:
+                autogen_context.imports.update(migration_script.imports)
+            render._render_python_into_templatevars(
+                autogen_context, migration_script, template_args
+            )
+
+        assert migration_script.rev_id is not None
+        return self.script_directory.generate_revision(
+            migration_script.rev_id,
+            migration_script.message,
+            refresh=True,
+            head=migration_script.head,
+            splice=migration_script.splice,
+            branch_labels=migration_script.branch_label,
+            version_path=migration_script.version_path,
+            depends_on=migration_script.depends_on,
+            **template_args,
+        )
+
+    def run_autogenerate(
+        self, rev: _GetRevArg, migration_context: MigrationContext
+    ) -> None:
+        self._run_environment(rev, migration_context, True)
+
+    def run_no_autogenerate(
+        self, rev: _GetRevArg, migration_context: MigrationContext
+    ) -> None:
+        self._run_environment(rev, migration_context, False)
+
+    def _run_environment(
+        self,
+        rev: _GetRevArg,
+        migration_context: MigrationContext,
+        autogenerate: bool,
+    ) -> None:
+        if autogenerate:
+            if self.command_args["sql"]:
+                raise util.CommandError(
+                    "Using --sql with --autogenerate does not make any sense"
+                )
+            if set(self.script_directory.get_revisions(rev)) != set(
+                self.script_directory.get_revisions("heads")
+            ):
+                raise util.CommandError("Target database is not up to date.")
+
+        upgrade_token = migration_context.opts["upgrade_token"]
+        downgrade_token = migration_context.opts["downgrade_token"]
+
+        migration_script = self.generated_revisions[-1]
+        if not getattr(migration_script, "_needs_render", False):
+            migration_script.upgrade_ops_list[-1].upgrade_token = upgrade_token
+            migration_script.downgrade_ops_list[-1].downgrade_token = (
+                downgrade_token
+            )
+            migration_script._needs_render = True
+        else:
+            migration_script._upgrade_ops.append(
+                ops.UpgradeOps([], upgrade_token=upgrade_token)
+            )
+            migration_script._downgrade_ops.append(
+                ops.DowngradeOps([], downgrade_token=downgrade_token)
+            )
+
+        autogen_context = AutogenContext(
+            migration_context, autogenerate=autogenerate
+        )
+        self._last_autogen_context: AutogenContext = autogen_context
+
+        if autogenerate:
+            compare._populate_migration_script(
+                autogen_context, migration_script
+            )
+
+        if self.process_revision_directives:
+            self.process_revision_directives(
+                migration_context, rev, self.generated_revisions
+            )
+
+        hook = migration_context.opts["process_revision_directives"]
+        if hook:
+            hook(migration_context, rev, self.generated_revisions)
+
+        for migration_script in self.generated_revisions:
+            migration_script._needs_render = True
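+
+    # --- Illustrative sketch (editor's addition, not part of upstream Alembic) ---
+    # A process_revision_directives hook invoked above receives
+    # (migration_context, rev, generated_revisions); a common use is to
+    # suppress an empty autogenerate revision:
+    #
+    #     def process_revision_directives(context, revision, directives):
+    #         script = directives[0]
+    #         if script.upgrade_ops.is_empty():
+    #             directives[:] = []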
+
+    def _default_revision(self) -> MigrationScript:
+        command_args: Dict[str, Any] = self.command_args
+        op = ops.MigrationScript(
+            rev_id=command_args["rev_id"] or util.rev_id(),
+            message=command_args["message"],
+            upgrade_ops=ops.UpgradeOps([]),
+            downgrade_ops=ops.DowngradeOps([]),
+            head=command_args["head"],
+            splice=command_args["splice"],
+            branch_label=command_args["branch_label"],
+            version_path=command_args["version_path"],
+            depends_on=command_args["depends_on"],
+        )
+        return op
+
+    def generate_scripts(self) -> Iterator[Optional[Script]]:
+        for generated_revision in self.generated_revisions:
+            yield self._to_script(generated_revision)