about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py213
1 files changed, 213 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py b/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py
new file mode 100644
index 00000000..c4f0d715
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py
@@ -0,0 +1,213 @@
+# connectors/asyncio.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+"""generic asyncio-adapted versions of DBAPI connection and cursor"""
+
+from __future__ import annotations
+
+import collections
+
+from ..engine import AdaptedConnection
+from ..util.concurrency import asyncio
+from ..util.concurrency import await_fallback
+from ..util.concurrency import await_only
+
+
+class AsyncAdapt_dbapi_cursor:
+    server_side = False
+    __slots__ = (
+        "_adapt_connection",
+        "_connection",
+        "await_",
+        "_cursor",
+        "_rows",
+    )
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        cursor = self._connection.cursor()
+        self._cursor = self._aenter_cursor(cursor)
+
+        if not self.server_side:
+            self._rows = collections.deque()
+
+    def _aenter_cursor(self, cursor):
+        return self.await_(cursor.__aenter__())
+
+    @property
+    def description(self):
+        return self._cursor.description
+
+    @property
+    def rowcount(self):
+        return self._cursor.rowcount
+
+    @property
+    def arraysize(self):
+        return self._cursor.arraysize
+
+    @arraysize.setter
+    def arraysize(self, value):
+        self._cursor.arraysize = value
+
+    @property
+    def lastrowid(self):
+        return self._cursor.lastrowid
+
+    def close(self):
+        # note we aren't actually closing the cursor here,
+        # we are just letting GC do it.  see notes in aiomysql dialect
+        self._rows.clear()
+
+    def execute(self, operation, parameters=None):
+        return self.await_(self._execute_async(operation, parameters))
+
+    def executemany(self, operation, seq_of_parameters):
+        return self.await_(
+            self._executemany_async(operation, seq_of_parameters)
+        )
+
+    async def _execute_async(self, operation, parameters):
+        async with self._adapt_connection._execute_mutex:
+            result = await self._cursor.execute(operation, parameters or ())
+
+            if self._cursor.description and not self.server_side:
+                self._rows = collections.deque(await self._cursor.fetchall())
+            return result
+
+    async def _executemany_async(self, operation, seq_of_parameters):
+        async with self._adapt_connection._execute_mutex:
+            return await self._cursor.executemany(operation, seq_of_parameters)
+
+    def nextset(self):
+        self.await_(self._cursor.nextset())
+        if self._cursor.description and not self.server_side:
+            self._rows = collections.deque(
+                self.await_(self._cursor.fetchall())
+            )
+
+    def setinputsizes(self, *inputsizes):
+        # NOTE: this is overrridden in aioodbc due to
+        # see https://github.com/aio-libs/aioodbc/issues/451
+        # right now
+
+        return self.await_(self._cursor.setinputsizes(*inputsizes))
+
+    def __iter__(self):
+        while self._rows:
+            yield self._rows.popleft()
+
+    def fetchone(self):
+        if self._rows:
+            return self._rows.popleft()
+        else:
+            return None
+
+    def fetchmany(self, size=None):
+        if size is None:
+            size = self.arraysize
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
+
+    def fetchall(self):
+        retval = list(self._rows)
+        self._rows.clear()
+        return retval
+
+
+class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor):
+    __slots__ = ()
+    server_side = True
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        cursor = self._connection.cursor()
+
+        self._cursor = self.await_(cursor.__aenter__())
+
+    def close(self):
+        if self._cursor is not None:
+            self.await_(self._cursor.close())
+            self._cursor = None
+
+    def fetchone(self):
+        return self.await_(self._cursor.fetchone())
+
+    def fetchmany(self, size=None):
+        return self.await_(self._cursor.fetchmany(size=size))
+
+    def fetchall(self):
+        return self.await_(self._cursor.fetchall())
+
+    def __iter__(self):
+        iterator = self._cursor.__aiter__()
+        while True:
+            try:
+                yield self.await_(iterator.__anext__())
+            except StopAsyncIteration:
+                break
+
+
+class AsyncAdapt_dbapi_connection(AdaptedConnection):
+    _cursor_cls = AsyncAdapt_dbapi_cursor
+    _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor
+
+    await_ = staticmethod(await_only)
+    __slots__ = ("dbapi", "_execute_mutex")
+
+    def __init__(self, dbapi, connection):
+        self.dbapi = dbapi
+        self._connection = connection
+        self._execute_mutex = asyncio.Lock()
+
+    def ping(self, reconnect):
+        return self.await_(self._connection.ping(reconnect))
+
+    def add_output_converter(self, *arg, **kw):
+        self._connection.add_output_converter(*arg, **kw)
+
+    def character_set_name(self):
+        return self._connection.character_set_name()
+
+    @property
+    def autocommit(self):
+        return self._connection.autocommit
+
+    @autocommit.setter
+    def autocommit(self, value):
+        # https://github.com/aio-libs/aioodbc/issues/448
+        # self._connection.autocommit = value
+
+        self._connection._conn.autocommit = value
+
+    def cursor(self, server_side=False):
+        if server_side:
+            return self._ss_cursor_cls(self)
+        else:
+            return self._cursor_cls(self)
+
+    def rollback(self):
+        self.await_(self._connection.rollback())
+
+    def commit(self):
+        self.await_(self._connection.commit())
+
+    def close(self):
+        self.await_(self._connection.close())
+
+
+class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection):
+    __slots__ = ()
+
+    await_ = staticmethod(await_fallback)