aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/alembic/testing/env.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/testing/env.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/testing/env.py502
1 files changed, 502 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/testing/env.py b/.venv/lib/python3.12/site-packages/alembic/testing/env.py
new file mode 100644
index 00000000..9a457b7f
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/testing/env.py
@@ -0,0 +1,502 @@
+import importlib.machinery
+import os
+from pathlib import Path
+import shutil
+import textwrap
+
+from sqlalchemy.testing import config
+from sqlalchemy.testing import provision
+
+from . import util as testing_util
+from .. import command
+from .. import script
+from .. import util
+from ..script import Script
+from ..script import ScriptDirectory
+
+
+def _get_staging_directory():
+ if provision.FOLLOWER_IDENT:
+ return f"scratch_{provision.FOLLOWER_IDENT}"
+ else:
+ return "scratch"
+
+
+def staging_env(create=True, template="generic", sourceless=False):
+ cfg = _testing_config()
+ if create:
+ path = _join_path(_get_staging_directory(), "scripts")
+ assert not os.path.exists(path), (
+ "staging directory %s already exists; poor cleanup?" % path
+ )
+
+ command.init(cfg, path, template=template)
+ if sourceless:
+ try:
+ # do an import so that a .pyc/.pyo is generated.
+ util.load_python_file(path, "env.py")
+ except AttributeError:
+ # we don't have the migration context set up yet
+ # so running the .env py throws this exception.
+ # theoretically we could be using py_compiler here to
+ # generate .pyc/.pyo without importing but not really
+ # worth it.
+ pass
+ assert sourceless in (
+ "pep3147_envonly",
+ "simple",
+ "pep3147_everything",
+ ), sourceless
+ make_sourceless(
+ _join_path(path, "env.py"),
+ "pep3147" if "pep3147" in sourceless else "simple",
+ )
+
+ sc = script.ScriptDirectory.from_config(cfg)
+ return sc
+
+
+def clear_staging_env():
+ from sqlalchemy.testing import engines
+
+ engines.testing_reaper.close_all()
+ shutil.rmtree(_get_staging_directory(), True)
+
+
+def script_file_fixture(txt):
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ path = _join_path(dir_, "script.py.mako")
+ with open(path, "w") as f:
+ f.write(txt)
+
+
+def env_file_fixture(txt):
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ txt = (
+ """
+from alembic import context
+
+config = context.config
+"""
+ + txt
+ )
+
+ path = _join_path(dir_, "env.py")
+ pyc_path = util.pyc_file_from_path(path)
+ if pyc_path:
+ os.unlink(pyc_path)
+
+ with open(path, "w") as f:
+ f.write(txt)
+
+
+def _sqlite_file_db(tempname="foo.db", future=False, scope=None, **options):
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ url = "sqlite:///%s/%s" % (dir_, tempname)
+ if scope:
+ options["scope"] = scope
+ return testing_util.testing_engine(url=url, future=future, options=options)
+
+
+def _sqlite_testing_config(sourceless=False, future=False):
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ url = f"sqlite:///{dir_}/foo.db"
+
+ sqlalchemy_future = future or ("future" in config.db.__class__.__module__)
+
+ return _write_config_file(
+ f"""
+[alembic]
+script_location = {dir_}
+sqlalchemy.url = {url}
+sourceless = {"true" if sourceless else "false"}
+{"sqlalchemy.future = true" if sqlalchemy_future else ""}
+
+[loggers]
+keys = root,sqlalchemy
+
+[handlers]
+keys = console
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[logger_sqlalchemy]
+level = DEBUG
+handlers =
+qualname = sqlalchemy.engine
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatters]
+keys = generic
+
+[formatter_generic]
+format = %%(levelname)-5.5s [%%(name)s] %%(message)s
+datefmt = %%H:%%M:%%S
+ """
+ )
+
+
+def _multi_dir_testing_config(sourceless=False, extra_version_location=""):
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ sqlalchemy_future = "future" in config.db.__class__.__module__
+
+ url = "sqlite:///%s/foo.db" % dir_
+
+ return _write_config_file(
+ f"""
+[alembic]
+script_location = {dir_}
+sqlalchemy.url = {url}
+sqlalchemy.future = {"true" if sqlalchemy_future else "false"}
+sourceless = {"true" if sourceless else "false"}
+version_locations = %(here)s/model1/ %(here)s/model2/ %(here)s/model3/ \
+{extra_version_location}
+
+[loggers]
+keys = root
+
+[handlers]
+keys = console
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatters]
+keys = generic
+
+[formatter_generic]
+format = %%(levelname)-5.5s [%%(name)s] %%(message)s
+datefmt = %%H:%%M:%%S
+ """
+ )
+
+
+def _no_sql_testing_config(dialect="postgresql", directives=""):
+ """use a postgresql url with no host so that
+ connections guaranteed to fail"""
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+ return _write_config_file(
+ f"""
+[alembic]
+script_location ={dir_}
+sqlalchemy.url = {dialect}://
+{directives}
+
+[loggers]
+keys = root
+
+[handlers]
+keys = console
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatters]
+keys = generic
+
+[formatter_generic]
+format = %%(levelname)-5.5s [%%(name)s] %%(message)s
+datefmt = %%H:%%M:%%S
+
+"""
+ )
+
+
+def _write_config_file(text):
+ cfg = _testing_config()
+ with open(cfg.config_file_name, "w") as f:
+ f.write(text)
+ return cfg
+
+
+def _testing_config():
+ from alembic.config import Config
+
+ if not os.access(_get_staging_directory(), os.F_OK):
+ os.mkdir(_get_staging_directory())
+ return Config(_join_path(_get_staging_directory(), "test_alembic.ini"))
+
+
+def write_script(
+ scriptdir, rev_id, content, encoding="ascii", sourceless=False
+):
+ old = scriptdir.revision_map.get_revision(rev_id)
+ path = old.path
+
+ content = textwrap.dedent(content)
+ if encoding:
+ content = content.encode(encoding)
+ with open(path, "wb") as fp:
+ fp.write(content)
+ pyc_path = util.pyc_file_from_path(path)
+ if pyc_path:
+ os.unlink(pyc_path)
+ script = Script._from_path(scriptdir, path)
+ old = scriptdir.revision_map.get_revision(script.revision)
+ if old.down_revision != script.down_revision:
+ raise Exception("Can't change down_revision on a refresh operation.")
+ scriptdir.revision_map.add_revision(script, _replace=True)
+
+ if sourceless:
+ make_sourceless(
+ path, "pep3147" if sourceless == "pep3147_everything" else "simple"
+ )
+
+
+def make_sourceless(path, style):
+ import py_compile
+
+ py_compile.compile(path)
+
+ if style == "simple":
+ pyc_path = util.pyc_file_from_path(path)
+ suffix = importlib.machinery.BYTECODE_SUFFIXES[0]
+ filepath, ext = os.path.splitext(path)
+ simple_pyc_path = filepath + suffix
+ shutil.move(pyc_path, simple_pyc_path)
+ pyc_path = simple_pyc_path
+ else:
+ assert style in ("pep3147", "simple")
+ pyc_path = util.pyc_file_from_path(path)
+
+ assert os.access(pyc_path, os.F_OK)
+
+ os.unlink(path)
+
+
+def three_rev_fixture(cfg):
+ a = util.rev_id()
+ b = util.rev_id()
+ c = util.rev_id()
+
+ script = ScriptDirectory.from_config(cfg)
+ script.generate_revision(a, "revision a", refresh=True, head="base")
+ write_script(
+ script,
+ a,
+ f"""\
+"Rev A"
+revision = '{a}'
+down_revision = None
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 1")
+
+
+def downgrade():
+ op.execute("DROP STEP 1")
+
+""",
+ )
+
+ script.generate_revision(b, "revision b", refresh=True, head=a)
+ write_script(
+ script,
+ b,
+ f"""# coding: utf-8
+"Rev B, méil, %3"
+revision = '{b}'
+down_revision = '{a}'
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 2")
+
+
+def downgrade():
+ op.execute("DROP STEP 2")
+
+""",
+ encoding="utf-8",
+ )
+
+ script.generate_revision(c, "revision c", refresh=True, head=b)
+ write_script(
+ script,
+ c,
+ f"""\
+"Rev C"
+revision = '{c}'
+down_revision = '{b}'
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 3")
+
+
+def downgrade():
+ op.execute("DROP STEP 3")
+
+""",
+ )
+ return a, b, c
+
+
+def multi_heads_fixture(cfg, a, b, c):
+ """Create a multiple head fixture from the three-revs fixture"""
+
+ # a->b->c
+ # -> d -> e
+ # -> f
+ d = util.rev_id()
+ e = util.rev_id()
+ f = util.rev_id()
+
+ script = ScriptDirectory.from_config(cfg)
+ script.generate_revision(
+ d, "revision d from b", head=b, splice=True, refresh=True
+ )
+ write_script(
+ script,
+ d,
+ f"""\
+"Rev D"
+revision = '{d}'
+down_revision = '{b}'
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 4")
+
+
+def downgrade():
+ op.execute("DROP STEP 4")
+
+""",
+ )
+
+ script.generate_revision(
+ e, "revision e from d", head=d, splice=True, refresh=True
+ )
+ write_script(
+ script,
+ e,
+ f"""\
+"Rev E"
+revision = '{e}'
+down_revision = '{d}'
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 5")
+
+
+def downgrade():
+ op.execute("DROP STEP 5")
+
+""",
+ )
+
+ script.generate_revision(
+ f, "revision f from b", head=b, splice=True, refresh=True
+ )
+ write_script(
+ script,
+ f,
+ f"""\
+"Rev F"
+revision = '{f}'
+down_revision = '{b}'
+
+from alembic import op
+
+
+def upgrade():
+ op.execute("CREATE STEP 6")
+
+
+def downgrade():
+ op.execute("DROP STEP 6")
+
+""",
+ )
+
+ return d, e, f
+
+
+def _multidb_testing_config(engines):
+ """alembic.ini fixture to work exactly with the 'multidb' template"""
+
+ dir_ = _join_path(_get_staging_directory(), "scripts")
+
+ sqlalchemy_future = "future" in config.db.__class__.__module__
+
+ databases = ", ".join(engines.keys())
+ engines = "\n\n".join(
+ f"[{key}]\nsqlalchemy.url = {value.url}"
+ for key, value in engines.items()
+ )
+
+ return _write_config_file(
+ f"""
+[alembic]
+script_location = {dir_}
+sourceless = false
+sqlalchemy.future = {"true" if sqlalchemy_future else "false"}
+databases = {databases}
+
+{engines}
+[loggers]
+keys = root
+
+[handlers]
+keys = console
+
+[logger_root]
+level = WARNING
+handlers = console
+qualname =
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatters]
+keys = generic
+
+[formatter_generic]
+format = %%(levelname)-5.5s [%%(name)s] %%(message)s
+datefmt = %%H:%%M:%%S
+ """
+ )
+
+
+def _join_path(base: str, *more: str):
+ return str(Path(base).joinpath(*more).as_posix())