about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/command.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/alembic/command.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/command.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/command.py760
1 files changed, 760 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/command.py b/.venv/lib/python3.12/site-packages/alembic/command.py
new file mode 100644
index 00000000..0ae1d9a8
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/command.py
@@ -0,0 +1,760 @@
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+from __future__ import annotations
+
+import os
+from typing import List
+from typing import Optional
+from typing import TYPE_CHECKING
+from typing import Union
+
+from . import autogenerate as autogen
+from . import util
+from .runtime.environment import EnvironmentContext
+from .script import ScriptDirectory
+
+if TYPE_CHECKING:
+    from alembic.config import Config
+    from alembic.script.base import Script
+    from alembic.script.revision import _RevIdType
+    from .runtime.environment import ProcessRevisionDirectiveFn
+
+
+def list_templates(config: Config) -> None:
+    """List available templates.
+
+    :param config: a :class:`.Config` object.
+
+    """
+
+    config.print_stdout("Available templates:\n")
+    for tempname in os.listdir(config.get_template_directory()):
+        with open(
+            os.path.join(config.get_template_directory(), tempname, "README")
+        ) as readme:
+            synopsis = next(readme).rstrip()
+        config.print_stdout("%s - %s", tempname, synopsis)
+
+    config.print_stdout("\nTemplates are used via the 'init' command, e.g.:")
+    config.print_stdout("\n  alembic init --template generic ./scripts")
+
+
+def init(
+    config: Config,
+    directory: str,
+    template: str = "generic",
+    package: bool = False,
+) -> None:
+    """Initialize a new scripts directory.
+
+    :param config: a :class:`.Config` object.
+
+    :param directory: string path of the target directory.
+
+    :param template: string name of the migration environment template to
+     use.
+
+    :param package: when True, write ``__init__.py`` files into the
+     environment location as well as the versions/ location.
+
+    """
+
+    if os.access(directory, os.F_OK) and os.listdir(directory):
+        raise util.CommandError(
+            "Directory %s already exists and is not empty" % directory
+        )
+
+    template_dir = os.path.join(config.get_template_directory(), template)
+    if not os.access(template_dir, os.F_OK):
+        raise util.CommandError("No such template %r" % template)
+
+    if not os.access(directory, os.F_OK):
+        with util.status(
+            f"Creating directory {os.path.abspath(directory)!r}",
+            **config.messaging_opts,
+        ):
+            os.makedirs(directory)
+
+    versions = os.path.join(directory, "versions")
+    with util.status(
+        f"Creating directory {os.path.abspath(versions)!r}",
+        **config.messaging_opts,
+    ):
+        os.makedirs(versions)
+
+    script = ScriptDirectory(directory)
+
+    config_file: str | None = None
+    for file_ in os.listdir(template_dir):
+        file_path = os.path.join(template_dir, file_)
+        if file_ == "alembic.ini.mako":
+            assert config.config_file_name is not None
+            config_file = os.path.abspath(config.config_file_name)
+            if os.access(config_file, os.F_OK):
+                util.msg(
+                    f"File {config_file!r} already exists, skipping",
+                    **config.messaging_opts,
+                )
+            else:
+                script._generate_template(
+                    file_path, config_file, script_location=directory
+                )
+        elif os.path.isfile(file_path):
+            output_file = os.path.join(directory, file_)
+            script._copy_file(file_path, output_file)
+
+    if package:
+        for path in [
+            os.path.join(os.path.abspath(directory), "__init__.py"),
+            os.path.join(os.path.abspath(versions), "__init__.py"),
+        ]:
+            with util.status(f"Adding {path!r}", **config.messaging_opts):
+                with open(path, "w"):
+                    pass
+
+    assert config_file is not None
+    util.msg(
+        "Please edit configuration/connection/logging "
+        f"settings in {config_file!r} before proceeding.",
+        **config.messaging_opts,
+    )
+
+
+def revision(
+    config: Config,
+    message: Optional[str] = None,
+    autogenerate: bool = False,
+    sql: bool = False,
+    head: str = "head",
+    splice: bool = False,
+    branch_label: Optional[_RevIdType] = None,
+    version_path: Optional[str] = None,
+    rev_id: Optional[str] = None,
+    depends_on: Optional[str] = None,
+    process_revision_directives: Optional[ProcessRevisionDirectiveFn] = None,
+) -> Union[Optional[Script], List[Optional[Script]]]:
+    """Create a new revision file.
+
+    :param config: a :class:`.Config` object.
+
+    :param message: string message to apply to the revision; this is the
+     ``-m`` option to ``alembic revision``.
+
+    :param autogenerate: whether or not to autogenerate the script from
+     the database; this is the ``--autogenerate`` option to
+     ``alembic revision``.
+
+    :param sql: whether to dump the script out as a SQL string; when specified,
+     the script is dumped to stdout.  This is the ``--sql`` option to
+     ``alembic revision``.
+
+    :param head: head revision to build the new revision upon as a parent;
+     this is the ``--head`` option to ``alembic revision``.
+
+    :param splice: whether or not the new revision should be made into a
+     new head of its own; is required when the given ``head`` is not itself
+     a head.  This is the ``--splice`` option to ``alembic revision``.
+
+    :param branch_label: string label to apply to the branch; this is the
+     ``--branch-label`` option to ``alembic revision``.
+
+    :param version_path: string symbol identifying a specific version path
+     from the configuration; this is the ``--version-path`` option to
+     ``alembic revision``.
+
+    :param rev_id: optional revision identifier to use instead of having
+     one generated; this is the ``--rev-id`` option to ``alembic revision``.
+
+    :param depends_on: optional list of "depends on" identifiers; this is the
+     ``--depends-on`` option to ``alembic revision``.
+
+    :param process_revision_directives: this is a callable that takes the
+     same form as the callable described at
+     :paramref:`.EnvironmentContext.configure.process_revision_directives`;
+     will be applied to the structure generated by the revision process
+     where it can be altered programmatically.   Note that unlike all
+     the other parameters, this option is only available via programmatic
+     use of :func:`.command.revision`.
+
+    """
+
+    script_directory = ScriptDirectory.from_config(config)
+
+    command_args = dict(
+        message=message,
+        autogenerate=autogenerate,
+        sql=sql,
+        head=head,
+        splice=splice,
+        branch_label=branch_label,
+        version_path=version_path,
+        rev_id=rev_id,
+        depends_on=depends_on,
+    )
+    revision_context = autogen.RevisionContext(
+        config,
+        script_directory,
+        command_args,
+        process_revision_directives=process_revision_directives,
+    )
+
+    environment = util.asbool(config.get_main_option("revision_environment"))
+
+    if autogenerate:
+        environment = True
+
+        if sql:
+            raise util.CommandError(
+                "Using --sql with --autogenerate does not make any sense"
+            )
+
+        def retrieve_migrations(rev, context):
+            revision_context.run_autogenerate(rev, context)
+            return []
+
+    elif environment:
+
+        def retrieve_migrations(rev, context):
+            revision_context.run_no_autogenerate(rev, context)
+            return []
+
+    elif sql:
+        raise util.CommandError(
+            "Using --sql with the revision command when "
+            "revision_environment is not configured does not make any sense"
+        )
+
+    if environment:
+        with EnvironmentContext(
+            config,
+            script_directory,
+            fn=retrieve_migrations,
+            as_sql=sql,
+            template_args=revision_context.template_args,
+            revision_context=revision_context,
+        ):
+            script_directory.run_env()
+
+        # the revision_context now has MigrationScript structure(s) present.
+        # these could theoretically be further processed / rewritten *here*,
+        # in addition to the hooks present within each run_migrations() call,
+        # or at the end of env.py run_migrations_online().
+
+    scripts = [script for script in revision_context.generate_scripts()]
+    if len(scripts) == 1:
+        return scripts[0]
+    else:
+        return scripts
+
+
+def check(config: "Config") -> None:
+    """Check if revision command with autogenerate has pending upgrade ops.
+
+    :param config: a :class:`.Config` object.
+
+    .. versionadded:: 1.9.0
+
+    """
+
+    script_directory = ScriptDirectory.from_config(config)
+
+    command_args = dict(
+        message=None,
+        autogenerate=True,
+        sql=False,
+        head="head",
+        splice=False,
+        branch_label=None,
+        version_path=None,
+        rev_id=None,
+        depends_on=None,
+    )
+    revision_context = autogen.RevisionContext(
+        config,
+        script_directory,
+        command_args,
+    )
+
+    def retrieve_migrations(rev, context):
+        revision_context.run_autogenerate(rev, context)
+        return []
+
+    with EnvironmentContext(
+        config,
+        script_directory,
+        fn=retrieve_migrations,
+        as_sql=False,
+        template_args=revision_context.template_args,
+        revision_context=revision_context,
+    ):
+        script_directory.run_env()
+
+    # the revision_context now has MigrationScript structure(s) present.
+
+    migration_script = revision_context.generated_revisions[-1]
+    diffs = []
+    for upgrade_ops in migration_script.upgrade_ops_list:
+        diffs.extend(upgrade_ops.as_diffs())
+
+    if diffs:
+        raise util.AutogenerateDiffsDetected(
+            f"New upgrade operations detected: {diffs}",
+            revision_context=revision_context,
+            diffs=diffs,
+        )
+    else:
+        config.print_stdout("No new upgrade operations detected.")
+
+
+def merge(
+    config: Config,
+    revisions: _RevIdType,
+    message: Optional[str] = None,
+    branch_label: Optional[_RevIdType] = None,
+    rev_id: Optional[str] = None,
+) -> Optional[Script]:
+    """Merge two revisions together.  Creates a new migration file.
+
+    :param config: a :class:`.Config` instance
+
+    :param revisions: The revisions to merge.
+
+    :param message: string message to apply to the revision.
+
+    :param branch_label: string label name to apply to the new revision.
+
+    :param rev_id: hardcoded revision identifier instead of generating a new
+     one.
+
+    .. seealso::
+
+        :ref:`branches`
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+    template_args = {
+        "config": config  # Let templates use config for
+        # e.g. multiple databases
+    }
+
+    environment = util.asbool(config.get_main_option("revision_environment"))
+
+    if environment:
+
+        def nothing(rev, context):
+            return []
+
+        with EnvironmentContext(
+            config,
+            script,
+            fn=nothing,
+            as_sql=False,
+            template_args=template_args,
+        ):
+            script.run_env()
+
+    return script.generate_revision(
+        rev_id or util.rev_id(),
+        message,
+        refresh=True,
+        head=revisions,
+        branch_labels=branch_label,
+        **template_args,  # type:ignore[arg-type]
+    )
+
+
+def upgrade(
+    config: Config,
+    revision: str,
+    sql: bool = False,
+    tag: Optional[str] = None,
+) -> None:
+    """Upgrade to a later version.
+
+    :param config: a :class:`.Config` instance.
+
+    :param revision: string revision target or range for --sql mode. May be
+     ``"heads"`` to target the most recent revision(s).
+
+    :param sql: if True, use ``--sql`` mode.
+
+    :param tag: an arbitrary "tag" that can be intercepted by custom
+     ``env.py`` scripts via the :meth:`.EnvironmentContext.get_tag_argument`
+     method.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    starting_rev = None
+    if ":" in revision:
+        if not sql:
+            raise util.CommandError("Range revision not allowed")
+        starting_rev, revision = revision.split(":", 2)
+
+    def upgrade(rev, context):
+        return script._upgrade_revs(revision, rev)
+
+    with EnvironmentContext(
+        config,
+        script,
+        fn=upgrade,
+        as_sql=sql,
+        starting_rev=starting_rev,
+        destination_rev=revision,
+        tag=tag,
+    ):
+        script.run_env()
+
+
+def downgrade(
+    config: Config,
+    revision: str,
+    sql: bool = False,
+    tag: Optional[str] = None,
+) -> None:
+    """Revert to a previous version.
+
+    :param config: a :class:`.Config` instance.
+
+    :param revision: string revision target or range for --sql mode. May
+     be ``"base"`` to target the first revision.
+
+    :param sql: if True, use ``--sql`` mode.
+
+    :param tag: an arbitrary "tag" that can be intercepted by custom
+     ``env.py`` scripts via the :meth:`.EnvironmentContext.get_tag_argument`
+     method.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+    starting_rev = None
+    if ":" in revision:
+        if not sql:
+            raise util.CommandError("Range revision not allowed")
+        starting_rev, revision = revision.split(":", 2)
+    elif sql:
+        raise util.CommandError(
+            "downgrade with --sql requires <fromrev>:<torev>"
+        )
+
+    def downgrade(rev, context):
+        return script._downgrade_revs(revision, rev)
+
+    with EnvironmentContext(
+        config,
+        script,
+        fn=downgrade,
+        as_sql=sql,
+        starting_rev=starting_rev,
+        destination_rev=revision,
+        tag=tag,
+    ):
+        script.run_env()
+
+
+def show(config: Config, rev: str) -> None:
+    """Show the revision(s) denoted by the given symbol.
+
+    :param config: a :class:`.Config` instance.
+
+    :param rev: string revision target. May be ``"current"`` to show the
+     revision(s) currently applied in the database.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    if rev == "current":
+
+        def show_current(rev, context):
+            for sc in script.get_revisions(rev):
+                config.print_stdout(sc.log_entry)
+            return []
+
+        with EnvironmentContext(config, script, fn=show_current):
+            script.run_env()
+    else:
+        for sc in script.get_revisions(rev):
+            config.print_stdout(sc.log_entry)
+
+
+def history(
+    config: Config,
+    rev_range: Optional[str] = None,
+    verbose: bool = False,
+    indicate_current: bool = False,
+) -> None:
+    """List changeset scripts in chronological order.
+
+    :param config: a :class:`.Config` instance.
+
+    :param rev_range: string revision range.
+
+    :param verbose: output in verbose mode.
+
+    :param indicate_current: indicate current revision.
+
+    """
+    base: Optional[str]
+    head: Optional[str]
+    script = ScriptDirectory.from_config(config)
+    if rev_range is not None:
+        if ":" not in rev_range:
+            raise util.CommandError(
+                "History range requires [start]:[end], " "[start]:, or :[end]"
+            )
+        base, head = rev_range.strip().split(":")
+    else:
+        base = head = None
+
+    environment = (
+        util.asbool(config.get_main_option("revision_environment"))
+        or indicate_current
+    )
+
+    def _display_history(config, script, base, head, currents=()):
+        for sc in script.walk_revisions(
+            base=base or "base", head=head or "heads"
+        ):
+            if indicate_current:
+                sc._db_current_indicator = sc.revision in currents
+
+            config.print_stdout(
+                sc.cmd_format(
+                    verbose=verbose,
+                    include_branches=True,
+                    include_doc=True,
+                    include_parents=True,
+                )
+            )
+
+    def _display_history_w_current(config, script, base, head):
+        def _display_current_history(rev, context):
+            if head == "current":
+                _display_history(config, script, base, rev, rev)
+            elif base == "current":
+                _display_history(config, script, rev, head, rev)
+            else:
+                _display_history(config, script, base, head, rev)
+            return []
+
+        with EnvironmentContext(config, script, fn=_display_current_history):
+            script.run_env()
+
+    if base == "current" or head == "current" or environment:
+        _display_history_w_current(config, script, base, head)
+    else:
+        _display_history(config, script, base, head)
+
+
+def heads(
+    config: Config, verbose: bool = False, resolve_dependencies: bool = False
+) -> None:
+    """Show current available heads in the script directory.
+
+    :param config: a :class:`.Config` instance.
+
+    :param verbose: output in verbose mode.
+
+    :param resolve_dependencies: treat dependency version as down revisions.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+    if resolve_dependencies:
+        heads = script.get_revisions("heads")
+    else:
+        heads = script.get_revisions(script.get_heads())
+
+    for rev in heads:
+        config.print_stdout(
+            rev.cmd_format(
+                verbose, include_branches=True, tree_indicators=False
+            )
+        )
+
+
+def branches(config: Config, verbose: bool = False) -> None:
+    """Show current branch points.
+
+    :param config: a :class:`.Config` instance.
+
+    :param verbose: output in verbose mode.
+
+    """
+    script = ScriptDirectory.from_config(config)
+    for sc in script.walk_revisions():
+        if sc.is_branch_point:
+            config.print_stdout(
+                "%s\n%s\n",
+                sc.cmd_format(verbose, include_branches=True),
+                "\n".join(
+                    "%s -> %s"
+                    % (
+                        " " * len(str(sc.revision)),
+                        rev_obj.cmd_format(
+                            False, include_branches=True, include_doc=verbose
+                        ),
+                    )
+                    for rev_obj in (
+                        script.get_revision(rev) for rev in sc.nextrev
+                    )
+                ),
+            )
+
+
+def current(config: Config, verbose: bool = False) -> None:
+    """Display the current revision for a database.
+
+    :param config: a :class:`.Config` instance.
+
+    :param verbose: output in verbose mode.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    def display_version(rev, context):
+        if verbose:
+            config.print_stdout(
+                "Current revision(s) for %s:",
+                util.obfuscate_url_pw(context.connection.engine.url),
+            )
+        for rev in script.get_all_current(rev):
+            config.print_stdout(rev.cmd_format(verbose))
+
+        return []
+
+    with EnvironmentContext(
+        config, script, fn=display_version, dont_mutate=True
+    ):
+        script.run_env()
+
+
+def stamp(
+    config: Config,
+    revision: _RevIdType,
+    sql: bool = False,
+    tag: Optional[str] = None,
+    purge: bool = False,
+) -> None:
+    """'stamp' the revision table with the given revision; don't
+    run any migrations.
+
+    :param config: a :class:`.Config` instance.
+
+    :param revision: target revision or list of revisions.   May be a list
+     to indicate stamping of multiple branch heads; may be ``"base"``
+     to remove all revisions from the table or ``"heads"`` to stamp the
+     most recent revision(s).
+
+     .. note:: this parameter is called "revisions" in the command line
+        interface.
+
+    :param sql: use ``--sql`` mode
+
+    :param tag: an arbitrary "tag" that can be intercepted by custom
+     ``env.py`` scripts via the :class:`.EnvironmentContext.get_tag_argument`
+     method.
+
+    :param purge: delete all entries in the version table before stamping.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    if sql:
+        destination_revs = []
+        starting_rev = None
+        for _revision in util.to_list(revision):
+            if ":" in _revision:
+                srev, _revision = _revision.split(":", 2)
+
+                if starting_rev != srev:
+                    if starting_rev is None:
+                        starting_rev = srev
+                    else:
+                        raise util.CommandError(
+                            "Stamp operation with --sql only supports a "
+                            "single starting revision at a time"
+                        )
+            destination_revs.append(_revision)
+    else:
+        destination_revs = util.to_list(revision)
+
+    def do_stamp(rev, context):
+        return script._stamp_revs(util.to_tuple(destination_revs), rev)
+
+    with EnvironmentContext(
+        config,
+        script,
+        fn=do_stamp,
+        as_sql=sql,
+        starting_rev=starting_rev if sql else None,
+        destination_rev=util.to_tuple(destination_revs),
+        tag=tag,
+        purge=purge,
+    ):
+        script.run_env()
+
+
+def edit(config: Config, rev: str) -> None:
+    """Edit revision script(s) using $EDITOR.
+
+    :param config: a :class:`.Config` instance.
+
+    :param rev: target revision.
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    if rev == "current":
+
+        def edit_current(rev, context):
+            if not rev:
+                raise util.CommandError("No current revisions")
+            for sc in script.get_revisions(rev):
+                util.open_in_editor(sc.path)
+            return []
+
+        with EnvironmentContext(config, script, fn=edit_current):
+            script.run_env()
+    else:
+        revs = script.get_revisions(rev)
+        if not revs:
+            raise util.CommandError(
+                "No revision files indicated by symbol '%s'" % rev
+            )
+        for sc in revs:
+            assert sc
+            util.open_in_editor(sc.path)
+
+
+def ensure_version(config: Config, sql: bool = False) -> None:
+    """Create the alembic version table if it doesn't exist already .
+
+    :param config: a :class:`.Config` instance.
+
+    :param sql: use ``--sql`` mode.
+
+     .. versionadded:: 1.7.6
+
+    """
+
+    script = ScriptDirectory.from_config(config)
+
+    def do_ensure_version(rev, context):
+        context._ensure_version_table()
+        return []
+
+    with EnvironmentContext(
+        config,
+        script,
+        fn=do_ensure_version,
+        as_sql=sql,
+    ):
+        script.run_env()