about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py364
1 files changed, 364 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py b/.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py
new file mode 100644
index 00000000..df2d9afb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/testing/suite/test_environment.py
@@ -0,0 +1,364 @@
+import io
+
+from ...migration import MigrationContext
+from ...testing import assert_raises
+from ...testing import config
+from ...testing import eq_
+from ...testing import is_
+from ...testing import is_false
+from ...testing import is_not_
+from ...testing import is_true
+from ...testing import ne_
+from ...testing.fixtures import TestBase
+
+
+class MigrationTransactionTest(TestBase):
+    __backend__ = True
+
+    conn = None
+
+    def _fixture(self, opts):
+        self.conn = conn = config.db.connect()
+
+        if opts.get("as_sql", False):
+            self.context = MigrationContext.configure(
+                dialect=conn.dialect, opts=opts
+            )
+            self.context.output_buffer = self.context.impl.output_buffer = (
+                io.StringIO()
+            )
+        else:
+            self.context = MigrationContext.configure(
+                connection=conn, opts=opts
+            )
+        return self.context
+
+    def teardown_method(self):
+        if self.conn:
+            self.conn.close()
+
+    def test_proxy_transaction_rollback(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+
+        is_false(self.conn.in_transaction())
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+        proxy.rollback()
+        is_false(self.conn.in_transaction())
+
+    def test_proxy_transaction_commit(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+        proxy.commit()
+        is_false(self.conn.in_transaction())
+
+    def test_proxy_transaction_contextmanager_commit(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+        with proxy:
+            pass
+        is_false(self.conn.in_transaction())
+
+    def test_proxy_transaction_contextmanager_rollback(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+
+        def go():
+            with proxy:
+                raise Exception("hi")
+
+        assert_raises(Exception, go)
+        is_false(self.conn.in_transaction())
+
+    def test_proxy_transaction_contextmanager_explicit_rollback(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+
+        with proxy:
+            is_true(self.conn.in_transaction())
+            proxy.rollback()
+            is_false(self.conn.in_transaction())
+
+        is_false(self.conn.in_transaction())
+
+    def test_proxy_transaction_contextmanager_explicit_commit(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+        proxy = context.begin_transaction(_per_migration=True)
+        is_true(self.conn.in_transaction())
+
+        with proxy:
+            is_true(self.conn.in_transaction())
+            proxy.commit()
+            is_false(self.conn.in_transaction())
+
+        is_false(self.conn.in_transaction())
+
+    def test_transaction_per_migration_transactional_ddl(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": True}
+        )
+
+        is_false(self.conn.in_transaction())
+
+        with context.begin_transaction():
+            is_false(self.conn.in_transaction())
+            with context.begin_transaction(_per_migration=True):
+                is_true(self.conn.in_transaction())
+
+            is_false(self.conn.in_transaction())
+        is_false(self.conn.in_transaction())
+
+    def test_transaction_per_migration_non_transactional_ddl(self):
+        context = self._fixture(
+            {"transaction_per_migration": True, "transactional_ddl": False}
+        )
+
+        is_false(self.conn.in_transaction())
+
+        with context.begin_transaction():
+            is_false(self.conn.in_transaction())
+            with context.begin_transaction(_per_migration=True):
+                is_true(self.conn.in_transaction())
+
+            is_false(self.conn.in_transaction())
+        is_false(self.conn.in_transaction())
+
+    def test_transaction_per_all_transactional_ddl(self):
+        context = self._fixture({"transactional_ddl": True})
+
+        is_false(self.conn.in_transaction())
+
+        with context.begin_transaction():
+            is_true(self.conn.in_transaction())
+            with context.begin_transaction(_per_migration=True):
+                is_true(self.conn.in_transaction())
+
+            is_true(self.conn.in_transaction())
+        is_false(self.conn.in_transaction())
+
+    def test_transaction_per_all_non_transactional_ddl(self):
+        context = self._fixture({"transactional_ddl": False})
+
+        is_false(self.conn.in_transaction())
+
+        with context.begin_transaction():
+            is_false(self.conn.in_transaction())
+            with context.begin_transaction(_per_migration=True):
+                is_true(self.conn.in_transaction())
+
+            is_false(self.conn.in_transaction())
+        is_false(self.conn.in_transaction())
+
+    def test_transaction_per_all_sqlmode(self):
+        context = self._fixture({"as_sql": True})
+
+        context.execute("step 1")
+        with context.begin_transaction():
+            context.execute("step 2")
+            with context.begin_transaction(_per_migration=True):
+                context.execute("step 3")
+
+            context.execute("step 4")
+        context.execute("step 5")
+
+        if context.impl.transactional_ddl:
+            self._assert_impl_steps(
+                "step 1",
+                "BEGIN",
+                "step 2",
+                "step 3",
+                "step 4",
+                "COMMIT",
+                "step 5",
+            )
+        else:
+            self._assert_impl_steps(
+                "step 1", "step 2", "step 3", "step 4", "step 5"
+            )
+
+    def test_transaction_per_migration_sqlmode(self):
+        context = self._fixture(
+            {"as_sql": True, "transaction_per_migration": True}
+        )
+
+        context.execute("step 1")
+        with context.begin_transaction():
+            context.execute("step 2")
+            with context.begin_transaction(_per_migration=True):
+                context.execute("step 3")
+
+            context.execute("step 4")
+        context.execute("step 5")
+
+        if context.impl.transactional_ddl:
+            self._assert_impl_steps(
+                "step 1",
+                "step 2",
+                "BEGIN",
+                "step 3",
+                "COMMIT",
+                "step 4",
+                "step 5",
+            )
+        else:
+            self._assert_impl_steps(
+                "step 1", "step 2", "step 3", "step 4", "step 5"
+            )
+
+    @config.requirements.autocommit_isolation
+    def test_autocommit_block(self):
+        context = self._fixture({"transaction_per_migration": True})
+
+        is_false(self.conn.in_transaction())
+
+        with context.begin_transaction():
+            is_false(self.conn.in_transaction())
+            with context.begin_transaction(_per_migration=True):
+                is_true(self.conn.in_transaction())
+
+                with context.autocommit_block():
+                    # in 1.x, self.conn is separate due to the
+                    # execution_options call.  however for future they are the
+                    # same connection and there is a "transaction" block
+                    # despite autocommit
+                    if self.is_sqlalchemy_future:
+                        is_(context.connection, self.conn)
+                    else:
+                        is_not_(context.connection, self.conn)
+                        is_false(self.conn.in_transaction())
+
+                    eq_(
+                        context.connection._execution_options[
+                            "isolation_level"
+                        ],
+                        "AUTOCOMMIT",
+                    )
+
+                ne_(
+                    context.connection._execution_options.get(
+                        "isolation_level", None
+                    ),
+                    "AUTOCOMMIT",
+                )
+                is_true(self.conn.in_transaction())
+
+            is_false(self.conn.in_transaction())
+        is_false(self.conn.in_transaction())
+
+    @config.requirements.autocommit_isolation
+    def test_autocommit_block_no_transaction(self):
+        context = self._fixture({"transaction_per_migration": True})
+
+        is_false(self.conn.in_transaction())
+
+        with context.autocommit_block():
+            is_true(context.connection.in_transaction())
+
+            # in 1.x, self.conn is separate due to the execution_options
+            # call.  however for future they are the same connection and there
+            # is a "transaction" block despite autocommit
+            if self.is_sqlalchemy_future:
+                is_(context.connection, self.conn)
+            else:
+                is_not_(context.connection, self.conn)
+                is_false(self.conn.in_transaction())
+
+            eq_(
+                context.connection._execution_options["isolation_level"],
+                "AUTOCOMMIT",
+            )
+
+        ne_(
+            context.connection._execution_options.get("isolation_level", None),
+            "AUTOCOMMIT",
+        )
+
+        is_false(self.conn.in_transaction())
+
+    def test_autocommit_block_transactional_ddl_sqlmode(self):
+        context = self._fixture(
+            {
+                "transaction_per_migration": True,
+                "transactional_ddl": True,
+                "as_sql": True,
+            }
+        )
+
+        with context.begin_transaction():
+            context.execute("step 1")
+            with context.begin_transaction(_per_migration=True):
+                context.execute("step 2")
+
+                with context.autocommit_block():
+                    context.execute("step 3")
+
+                context.execute("step 4")
+
+            context.execute("step 5")
+
+        self._assert_impl_steps(
+            "step 1",
+            "BEGIN",
+            "step 2",
+            "COMMIT",
+            "step 3",
+            "BEGIN",
+            "step 4",
+            "COMMIT",
+            "step 5",
+        )
+
+    def test_autocommit_block_nontransactional_ddl_sqlmode(self):
+        context = self._fixture(
+            {
+                "transaction_per_migration": True,
+                "transactional_ddl": False,
+                "as_sql": True,
+            }
+        )
+
+        with context.begin_transaction():
+            context.execute("step 1")
+            with context.begin_transaction(_per_migration=True):
+                context.execute("step 2")
+
+                with context.autocommit_block():
+                    context.execute("step 3")
+
+                context.execute("step 4")
+
+            context.execute("step 5")
+
+        self._assert_impl_steps(
+            "step 1", "step 2", "step 3", "step 4", "step 5"
+        )
+
+    def _assert_impl_steps(self, *steps):
+        to_check = self.context.output_buffer.getvalue()
+
+        self.context.impl.output_buffer = buf = io.StringIO()
+        for step in steps:
+            if step == "BEGIN":
+                self.context.impl.emit_begin()
+            elif step == "COMMIT":
+                self.context.impl.emit_commit()
+            else:
+                self.context.impl._exec(step)
+
+        eq_(to_check, buf.getvalue())