about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py339
1 files changed, 339 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py
new file mode 100644
index 00000000..9ec54e69
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/mysql/asyncmy.py
@@ -0,0 +1,339 @@
+# dialects/mysql/asyncmy.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
+# file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+r"""
+.. dialect:: mysql+asyncmy
+    :name: asyncmy
+    :dbapi: asyncmy
+    :connectstring: mysql+asyncmy://user:password@host:port/dbname[?key=value&key=value...]
+    :url: https://github.com/long2ice/asyncmy
+
+Using a special asyncio mediation layer, the asyncmy dialect is usable
+as the backend for the :ref:`SQLAlchemy asyncio <asyncio_toplevel>`
+extension package.
+
+This dialect should normally be used only with the
+:func:`_asyncio.create_async_engine` engine creation function::
+
+    from sqlalchemy.ext.asyncio import create_async_engine
+
+    engine = create_async_engine(
+        "mysql+asyncmy://user:pass@hostname/dbname?charset=utf8mb4"
+    )
+
+"""  # noqa
+from collections import deque
+from contextlib import asynccontextmanager
+
+from .pymysql import MySQLDialect_pymysql
+from ... import pool
+from ... import util
+from ...engine import AdaptedConnection
+from ...util.concurrency import asyncio
+from ...util.concurrency import await_fallback
+from ...util.concurrency import await_only
+
+
+class AsyncAdapt_asyncmy_cursor:
+    # TODO: base on connectors/asyncio.py
+    # see #10415
+    server_side = False
+    __slots__ = (
+        "_adapt_connection",
+        "_connection",
+        "await_",
+        "_cursor",
+        "_rows",
+    )
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        cursor = self._connection.cursor()
+
+        self._cursor = self.await_(cursor.__aenter__())
+        self._rows = deque()
+
+    @property
+    def description(self):
+        return self._cursor.description
+
+    @property
+    def rowcount(self):
+        return self._cursor.rowcount
+
+    @property
+    def arraysize(self):
+        return self._cursor.arraysize
+
+    @arraysize.setter
+    def arraysize(self, value):
+        self._cursor.arraysize = value
+
+    @property
+    def lastrowid(self):
+        return self._cursor.lastrowid
+
+    def close(self):
+        # note we aren't actually closing the cursor here,
+        # we are just letting GC do it.   to allow this to be async
+        # we would need the Result to change how it does "Safe close cursor".
+        # MySQL "cursors" don't actually have state to be "closed" besides
+        # exhausting rows, which we already have done for sync cursor.
+        # another option would be to emulate aiosqlite dialect and assign
+        # cursor only if we are doing server side cursor operation.
+        self._rows.clear()
+
+    def execute(self, operation, parameters=None):
+        return self.await_(self._execute_async(operation, parameters))
+
+    def executemany(self, operation, seq_of_parameters):
+        return self.await_(
+            self._executemany_async(operation, seq_of_parameters)
+        )
+
+    async def _execute_async(self, operation, parameters):
+        async with self._adapt_connection._mutex_and_adapt_errors():
+            if parameters is None:
+                result = await self._cursor.execute(operation)
+            else:
+                result = await self._cursor.execute(operation, parameters)
+
+            if not self.server_side:
+                # asyncmy has a "fake" async result, so we have to pull it out
+                # of that here since our default result is not async.
+                # we could just as easily grab "_rows" here and be done with it
+                # but this is safer.
+                self._rows = deque(await self._cursor.fetchall())
+            return result
+
+    async def _executemany_async(self, operation, seq_of_parameters):
+        async with self._adapt_connection._mutex_and_adapt_errors():
+            return await self._cursor.executemany(operation, seq_of_parameters)
+
+    def setinputsizes(self, *inputsizes):
+        pass
+
+    def __iter__(self):
+        while self._rows:
+            yield self._rows.popleft()
+
+    def fetchone(self):
+        if self._rows:
+            return self._rows.popleft()
+        else:
+            return None
+
+    def fetchmany(self, size=None):
+        if size is None:
+            size = self.arraysize
+
+        rr = self._rows
+        return [rr.popleft() for _ in range(min(size, len(rr)))]
+
+    def fetchall(self):
+        retval = list(self._rows)
+        self._rows.clear()
+        return retval
+
+
+class AsyncAdapt_asyncmy_ss_cursor(AsyncAdapt_asyncmy_cursor):
+    # TODO: base on connectors/asyncio.py
+    # see #10415
+    __slots__ = ()
+    server_side = True
+
+    def __init__(self, adapt_connection):
+        self._adapt_connection = adapt_connection
+        self._connection = adapt_connection._connection
+        self.await_ = adapt_connection.await_
+
+        cursor = self._connection.cursor(
+            adapt_connection.dbapi.asyncmy.cursors.SSCursor
+        )
+
+        self._cursor = self.await_(cursor.__aenter__())
+
+    def close(self):
+        if self._cursor is not None:
+            self.await_(self._cursor.close())
+            self._cursor = None
+
+    def fetchone(self):
+        return self.await_(self._cursor.fetchone())
+
+    def fetchmany(self, size=None):
+        return self.await_(self._cursor.fetchmany(size=size))
+
+    def fetchall(self):
+        return self.await_(self._cursor.fetchall())
+
+
+class AsyncAdapt_asyncmy_connection(AdaptedConnection):
+    # TODO: base on connectors/asyncio.py
+    # see #10415
+    await_ = staticmethod(await_only)
+    __slots__ = ("dbapi", "_execute_mutex")
+
+    def __init__(self, dbapi, connection):
+        self.dbapi = dbapi
+        self._connection = connection
+        self._execute_mutex = asyncio.Lock()
+
+    @asynccontextmanager
+    async def _mutex_and_adapt_errors(self):
+        async with self._execute_mutex:
+            try:
+                yield
+            except AttributeError:
+                raise self.dbapi.InternalError(
+                    "network operation failed due to asyncmy attribute error"
+                )
+
+    def ping(self, reconnect):
+        assert not reconnect
+        return self.await_(self._do_ping())
+
+    async def _do_ping(self):
+        async with self._mutex_and_adapt_errors():
+            return await self._connection.ping(False)
+
+    def character_set_name(self):
+        return self._connection.character_set_name()
+
+    def autocommit(self, value):
+        self.await_(self._connection.autocommit(value))
+
+    def cursor(self, server_side=False):
+        if server_side:
+            return AsyncAdapt_asyncmy_ss_cursor(self)
+        else:
+            return AsyncAdapt_asyncmy_cursor(self)
+
+    def rollback(self):
+        self.await_(self._connection.rollback())
+
+    def commit(self):
+        self.await_(self._connection.commit())
+
+    def terminate(self):
+        # it's not awaitable.
+        self._connection.close()
+
+    def close(self) -> None:
+        self.await_(self._connection.ensure_closed())
+
+
+class AsyncAdaptFallback_asyncmy_connection(AsyncAdapt_asyncmy_connection):
+    __slots__ = ()
+
+    await_ = staticmethod(await_fallback)
+
+
+def _Binary(x):
+    """Return x as a binary type."""
+    return bytes(x)
+
+
+class AsyncAdapt_asyncmy_dbapi:
+    def __init__(self, asyncmy):
+        self.asyncmy = asyncmy
+        self.paramstyle = "format"
+        self._init_dbapi_attributes()
+
+    def _init_dbapi_attributes(self):
+        for name in (
+            "Warning",
+            "Error",
+            "InterfaceError",
+            "DataError",
+            "DatabaseError",
+            "OperationalError",
+            "InterfaceError",
+            "IntegrityError",
+            "ProgrammingError",
+            "InternalError",
+            "NotSupportedError",
+        ):
+            setattr(self, name, getattr(self.asyncmy.errors, name))
+
+    STRING = util.symbol("STRING")
+    NUMBER = util.symbol("NUMBER")
+    BINARY = util.symbol("BINARY")
+    DATETIME = util.symbol("DATETIME")
+    TIMESTAMP = util.symbol("TIMESTAMP")
+    Binary = staticmethod(_Binary)
+
+    def connect(self, *arg, **kw):
+        async_fallback = kw.pop("async_fallback", False)
+        creator_fn = kw.pop("async_creator_fn", self.asyncmy.connect)
+
+        if util.asbool(async_fallback):
+            return AsyncAdaptFallback_asyncmy_connection(
+                self,
+                await_fallback(creator_fn(*arg, **kw)),
+            )
+        else:
+            return AsyncAdapt_asyncmy_connection(
+                self,
+                await_only(creator_fn(*arg, **kw)),
+            )
+
+
+class MySQLDialect_asyncmy(MySQLDialect_pymysql):
+    driver = "asyncmy"
+    supports_statement_cache = True
+
+    supports_server_side_cursors = True
+    _sscursor = AsyncAdapt_asyncmy_ss_cursor
+
+    is_async = True
+    has_terminate = True
+
+    @classmethod
+    def import_dbapi(cls):
+        return AsyncAdapt_asyncmy_dbapi(__import__("asyncmy"))
+
+    @classmethod
+    def get_pool_class(cls, url):
+        async_fallback = url.query.get("async_fallback", False)
+
+        if util.asbool(async_fallback):
+            return pool.FallbackAsyncAdaptedQueuePool
+        else:
+            return pool.AsyncAdaptedQueuePool
+
+    def do_terminate(self, dbapi_connection) -> None:
+        dbapi_connection.terminate()
+
+    def create_connect_args(self, url):
+        return super().create_connect_args(
+            url, _translate_args=dict(username="user", database="db")
+        )
+
+    def is_disconnect(self, e, connection, cursor):
+        if super().is_disconnect(e, connection, cursor):
+            return True
+        else:
+            str_e = str(e).lower()
+            return (
+                "not connected" in str_e or "network operation failed" in str_e
+            )
+
+    def _found_rows_client_flag(self):
+        from asyncmy.constants import CLIENT
+
+        return CLIENT.FOUND_ROWS
+
+    def get_driver_connection(self, connection):
+        return connection._connection
+
+
+dialect = MySQLDialect_asyncmy