about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py')
-rw-r--r--.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py452
1 files changed, 452 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py b/.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py
new file mode 100644
index 00000000..f42106c7
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/aiosqlite/tests/smoke.py
@@ -0,0 +1,452 @@
+# Copyright 2022 Amethyst Reese
+# Licensed under the MIT license
+import asyncio
+import sqlite3
+from pathlib import Path
+from sqlite3 import OperationalError
+from threading import Thread
+from unittest import IsolatedAsyncioTestCase as TestCase, SkipTest
+
+import aiosqlite
+from .helpers import setup_logger
+
+TEST_DB = Path("test.db")
+
+# pypy uses non-standard text factory for low-level sqlite implementation
+try:
+    from _sqlite3 import _unicode_text_factory as default_text_factory
+except ImportError:
+    default_text_factory = str
+
+
+class SmokeTest(TestCase):
+    @classmethod
+    def setUpClass(cls):
+        setup_logger()
+
+    def setUp(self):
+        if TEST_DB.exists():
+            TEST_DB.unlink()
+
+    def tearDown(self):
+        if TEST_DB.exists():
+            TEST_DB.unlink()
+
+    async def test_connection_await(self):
+        db = await aiosqlite.connect(TEST_DB)
+        self.assertIsInstance(db, aiosqlite.Connection)
+
+        async with db.execute("select 1, 2") as cursor:
+            rows = await cursor.fetchall()
+            self.assertEqual(rows, [(1, 2)])
+
+        await db.close()
+
+    async def test_connection_context(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            self.assertIsInstance(db, aiosqlite.Connection)
+
+            async with db.execute("select 1, 2") as cursor:
+                rows = await cursor.fetchall()
+                self.assertEqual(rows, [(1, 2)])
+
+    async def test_connection_locations(self):
+        class Fake:  # pylint: disable=too-few-public-methods
+            def __str__(self):
+                return str(TEST_DB)
+
+        locs = ("test.db", b"test.db", Path("test.db"), Fake())
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.execute("create table foo (i integer, k integer)")
+            await db.execute("insert into foo (i, k) values (1, 5)")
+            await db.commit()
+
+            cursor = await db.execute("select * from foo")
+            rows = await cursor.fetchall()
+
+        for loc in locs:
+            async with aiosqlite.connect(loc) as db:
+                cursor = await db.execute("select * from foo")
+                self.assertEqual(await cursor.fetchall(), rows)
+
+    async def test_multiple_connections(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.execute(
+                "create table multiple_connections "
+                "(i integer primary key asc, k integer)"
+            )
+
+        async def do_one_conn(i):
+            async with aiosqlite.connect(TEST_DB) as db:
+                await db.execute("insert into multiple_connections (k) values (?)", [i])
+                await db.commit()
+
+        await asyncio.gather(*[do_one_conn(i) for i in range(10)])
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.execute("select * from multiple_connections")
+            rows = await cursor.fetchall()
+
+        assert len(rows) == 10
+
+    async def test_multiple_queries(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.execute(
+                "create table multiple_queries "
+                "(i integer primary key asc, k integer)"
+            )
+
+            await asyncio.gather(
+                *[
+                    db.execute("insert into multiple_queries (k) values (?)", [i])
+                    for i in range(10)
+                ]
+            )
+
+            await db.commit()
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.execute("select * from multiple_queries")
+            rows = await cursor.fetchall()
+
+        assert len(rows) == 10
+
+    async def test_iterable_cursor(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.cursor()
+            await cursor.execute(
+                "create table iterable_cursor " "(i integer primary key asc, k integer)"
+            )
+            await cursor.executemany(
+                "insert into iterable_cursor (k) values (?)", [[i] for i in range(10)]
+            )
+            await db.commit()
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.execute("select * from iterable_cursor")
+            rows = []
+            async for row in cursor:
+                rows.append(row)
+
+        assert len(rows) == 10
+
+    async def test_multi_loop_usage(self):
+        results = {}
+
+        def runner(k, conn):
+            async def query():
+                async with conn.execute("select * from foo") as cursor:
+                    rows = await cursor.fetchall()
+                    self.assertEqual(len(rows), 2)
+                    return rows
+
+            with self.subTest(k):
+                loop = asyncio.new_event_loop()
+                rows = loop.run_until_complete(query())
+                loop.close()
+                results[k] = rows
+
+        async with aiosqlite.connect(":memory:") as db:
+            await db.execute("create table foo (id int, name varchar)")
+            await db.execute(
+                "insert into foo values (?, ?), (?, ?)", (1, "Sally", 2, "Janet")
+            )
+            await db.commit()
+
+            threads = [Thread(target=runner, args=(k, db)) for k in range(4)]
+            for thread in threads:
+                thread.start()
+            for thread in threads:
+                thread.join()
+
+        self.assertEqual(len(results), 4)
+        for rows in results.values():
+            self.assertEqual(len(rows), 2)
+
+    async def test_context_cursor(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            async with db.cursor() as cursor:
+                await cursor.execute(
+                    "create table context_cursor "
+                    "(i integer primary key asc, k integer)"
+                )
+                await cursor.executemany(
+                    "insert into context_cursor (k) values (?)",
+                    [[i] for i in range(10)],
+                )
+                await db.commit()
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            async with db.execute("select * from context_cursor") as cursor:
+                rows = []
+                async for row in cursor:
+                    rows.append(row)
+
+        assert len(rows) == 10
+
+    async def test_cursor_return_self(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.cursor()
+
+            result = await cursor.execute(
+                "create table test_cursor_return_self (i integer, k integer)"
+            )
+            self.assertEqual(result, cursor, "cursor execute returns itself")
+
+            result = await cursor.executemany(
+                "insert into test_cursor_return_self values (?, ?)", [(1, 1), (2, 2)]
+            )
+            self.assertEqual(result, cursor)
+
+            result = await cursor.executescript(
+                "insert into test_cursor_return_self values (3, 3);"
+                "insert into test_cursor_return_self values (4, 4);"
+                "insert into test_cursor_return_self values (5, 5);"
+            )
+            self.assertEqual(result, cursor)
+
+    async def test_connection_properties(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            self.assertEqual(db.total_changes, 0)
+
+            async with db.cursor() as cursor:
+                self.assertFalse(db.in_transaction)
+                await cursor.execute(
+                    "create table test_properties "
+                    "(i integer primary key asc, k integer, d text)"
+                )
+                await cursor.execute(
+                    "insert into test_properties (k, d) values (1, 'hi')"
+                )
+                self.assertTrue(db.in_transaction)
+                await db.commit()
+                self.assertFalse(db.in_transaction)
+
+            self.assertEqual(db.total_changes, 1)
+
+            self.assertIsNone(db.row_factory)
+            self.assertEqual(db.text_factory, default_text_factory)
+
+            async with db.cursor() as cursor:
+                await cursor.execute("select * from test_properties")
+                row = await cursor.fetchone()
+                self.assertIsInstance(row, tuple)
+                self.assertEqual(row, (1, 1, "hi"))
+                with self.assertRaises(TypeError):
+                    _ = row["k"]
+
+            async with db.cursor() as cursor:
+                cursor.row_factory = aiosqlite.Row
+                self.assertEqual(cursor.row_factory, aiosqlite.Row)
+                await cursor.execute("select * from test_properties")
+                row = await cursor.fetchone()
+                self.assertIsInstance(row, aiosqlite.Row)
+                self.assertEqual(row[1], 1)
+                self.assertEqual(row[2], "hi")
+                self.assertEqual(row["k"], 1)
+                self.assertEqual(row["d"], "hi")
+
+            db.row_factory = aiosqlite.Row
+            db.text_factory = bytes
+            self.assertEqual(db.row_factory, aiosqlite.Row)
+            self.assertEqual(db.text_factory, bytes)
+
+            async with db.cursor() as cursor:
+                await cursor.execute("select * from test_properties")
+                row = await cursor.fetchone()
+                self.assertIsInstance(row, aiosqlite.Row)
+                self.assertEqual(row[1], 1)
+                self.assertEqual(row[2], b"hi")
+                self.assertEqual(row["k"], 1)
+                self.assertEqual(row["d"], b"hi")
+
+    async def test_fetch_all(self):
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.execute(
+                "create table test_fetch_all (i integer primary key asc, k integer)"
+            )
+            await db.execute(
+                "insert into test_fetch_all (k) values (10), (24), (16), (32)"
+            )
+            await db.commit()
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            cursor = await db.execute("select k from test_fetch_all where k < 30")
+            rows = await cursor.fetchall()
+            self.assertEqual(rows, [(10,), (24,), (16,)])
+
+    async def test_enable_load_extension(self):
+        """Assert that after enabling extension loading, they can be loaded"""
+        async with aiosqlite.connect(TEST_DB) as db:
+            try:
+                await db.enable_load_extension(True)
+                await db.load_extension("test")
+            except OperationalError as e:
+                assert "not authorized" not in e.args
+            except AttributeError as e:
+                raise SkipTest(
+                    "python was not compiled with sqlite3 "
+                    "extension support, so we can't test it"
+                ) from e
+
+    async def test_set_progress_handler(self):
+        """
+        Assert that after setting a progress handler returning 1, DB operations are aborted
+        """
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.set_progress_handler(lambda: 1, 1)
+            with self.assertRaises(OperationalError):
+                await db.execute(
+                    "create table test_progress_handler (i integer primary key asc, k integer)"
+                )
+
+    async def test_create_function(self):
+        """Assert that after creating a custom function, it can be used"""
+
+        def no_arg():
+            return "no arg"
+
+        def one_arg(num):
+            return num * 2
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.create_function("no_arg", 0, no_arg)
+            await db.create_function("one_arg", 1, one_arg)
+
+            async with db.execute("SELECT no_arg();") as res:
+                row = await res.fetchone()
+                self.assertEqual(row[0], "no arg")
+
+            async with db.execute("SELECT one_arg(10);") as res:
+                row = await res.fetchone()
+                self.assertEqual(row[0], 20)
+
+    async def test_create_function_deterministic(self):
+        """Assert that after creating a deterministic custom function, it can be used.
+
+        https://sqlite.org/deterministic.html
+        """
+
+        def one_arg(num):
+            return num * 2
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.create_function("one_arg", 1, one_arg, deterministic=True)
+            await db.execute("create table foo (id int, bar int)")
+
+            # Non-deterministic functions cannot be used in indexes
+            await db.execute("create index t on foo(one_arg(bar))")
+
+    async def test_set_trace_callback(self):
+        statements = []
+
+        def callback(statement: str):
+            statements.append(statement)
+
+        async with aiosqlite.connect(TEST_DB) as db:
+            await db.set_trace_callback(callback)
+
+            await db.execute("select 10")
+            self.assertIn("select 10", statements)
+
+    async def test_connect_error(self):
+        bad_db = Path("/something/that/shouldnt/exist.db")
+        with self.assertRaisesRegex(OperationalError, "unable to open database"):
+            async with aiosqlite.connect(bad_db) as db:
+                self.assertIsNone(db)  # should never be reached
+
+        with self.assertRaisesRegex(OperationalError, "unable to open database"):
+            await aiosqlite.connect(bad_db)
+
+    async def test_iterdump(self):
+        async with aiosqlite.connect(":memory:") as db:
+            await db.execute("create table foo (i integer, k charvar(250))")
+            await db.executemany(
+                "insert into foo values (?, ?)", [(1, "hello"), (2, "world")]
+            )
+
+            lines = [line async for line in db.iterdump()]
+            self.assertEqual(
+                lines,
+                [
+                    "BEGIN TRANSACTION;",
+                    "CREATE TABLE foo (i integer, k charvar(250));",
+                    "INSERT INTO \"foo\" VALUES(1,'hello');",
+                    "INSERT INTO \"foo\" VALUES(2,'world');",
+                    "COMMIT;",
+                ],
+            )
+
+    async def test_cursor_on_closed_connection(self):
+        db = await aiosqlite.connect(TEST_DB)
+
+        cursor = await db.execute("select 1, 2")
+        await db.close()
+        with self.assertRaisesRegex(ValueError, "Connection closed"):
+            await cursor.fetchall()
+        with self.assertRaisesRegex(ValueError, "Connection closed"):
+            await cursor.fetchall()
+
+    async def test_cursor_on_closed_connection_loop(self):
+        db = await aiosqlite.connect(TEST_DB)
+
+        cursor = await db.execute("select 1, 2")
+        tasks = []
+        for i in range(100):
+            if i == 50:
+                tasks.append(asyncio.ensure_future(db.close()))
+            tasks.append(asyncio.ensure_future(cursor.fetchall()))
+        for task in tasks:
+            try:
+                await task
+            except sqlite3.ProgrammingError:
+                pass
+
+    async def test_close_twice(self):
+        db = await aiosqlite.connect(TEST_DB)
+
+        await db.close()
+
+        # no error
+        await db.close()
+
+    async def test_backup_aiosqlite(self):
+        def progress(a, b, c):
+            print(a, b, c)
+
+        async with aiosqlite.connect(":memory:") as db1, aiosqlite.connect(
+            ":memory:"
+        ) as db2:
+            await db1.execute("create table foo (i integer, k charvar(250))")
+            await db1.executemany(
+                "insert into foo values (?, ?)", [(1, "hello"), (2, "world")]
+            )
+            await db1.commit()
+
+            with self.assertRaisesRegex(OperationalError, "no such table: foo"):
+                await db2.execute("select * from foo")
+
+            await db1.backup(db2, progress=progress)
+
+            async with db2.execute("select * from foo") as cursor:
+                rows = await cursor.fetchall()
+                self.assertEqual(rows, [(1, "hello"), (2, "world")])
+
+    async def test_backup_sqlite(self):
+        async with aiosqlite.connect(":memory:") as db1:
+            with sqlite3.connect(":memory:") as db2:
+                await db1.execute("create table foo (i integer, k charvar(250))")
+                await db1.executemany(
+                    "insert into foo values (?, ?)", [(1, "hello"), (2, "world")]
+                )
+                await db1.commit()
+
+                with self.assertRaisesRegex(OperationalError, "no such table: foo"):
+                    db2.execute("select * from foo")
+
+                await db1.backup(db2)
+
+                cursor = db2.execute("select * from foo")
+                rows = cursor.fetchall()
+                self.assertEqual(rows, [(1, "hello"), (2, "world")])