diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py | 213 |
1 files changed, 213 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py b/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py new file mode 100644 index 00000000..c4f0d715 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sqlalchemy/connectors/asyncio.py @@ -0,0 +1,213 @@ +# connectors/asyncio.py +# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +"""generic asyncio-adapted versions of DBAPI connection and cursor""" + +from __future__ import annotations + +import collections + +from ..engine import AdaptedConnection +from ..util.concurrency import asyncio +from ..util.concurrency import await_fallback +from ..util.concurrency import await_only + + +class AsyncAdapt_dbapi_cursor: + server_side = False + __slots__ = ( + "_adapt_connection", + "_connection", + "await_", + "_cursor", + "_rows", + ) + + def __init__(self, adapt_connection): + self._adapt_connection = adapt_connection + self._connection = adapt_connection._connection + self.await_ = adapt_connection.await_ + + cursor = self._connection.cursor() + self._cursor = self._aenter_cursor(cursor) + + if not self.server_side: + self._rows = collections.deque() + + def _aenter_cursor(self, cursor): + return self.await_(cursor.__aenter__()) + + @property + def description(self): + return self._cursor.description + + @property + def rowcount(self): + return self._cursor.rowcount + + @property + def arraysize(self): + return self._cursor.arraysize + + @arraysize.setter + def arraysize(self, value): + self._cursor.arraysize = value + + @property + def lastrowid(self): + return self._cursor.lastrowid + + def close(self): + # note we aren't actually closing the cursor here, + # we are just letting GC do it. see notes in aiomysql dialect + self._rows.clear() + + def execute(self, operation, parameters=None): + return self.await_(self._execute_async(operation, parameters)) + + def executemany(self, operation, seq_of_parameters): + return self.await_( + self._executemany_async(operation, seq_of_parameters) + ) + + async def _execute_async(self, operation, parameters): + async with self._adapt_connection._execute_mutex: + result = await self._cursor.execute(operation, parameters or ()) + + if self._cursor.description and not self.server_side: + self._rows = collections.deque(await self._cursor.fetchall()) + return result + + async def _executemany_async(self, operation, seq_of_parameters): + async with self._adapt_connection._execute_mutex: + return await self._cursor.executemany(operation, seq_of_parameters) + + def nextset(self): + self.await_(self._cursor.nextset()) + if self._cursor.description and not self.server_side: + self._rows = collections.deque( + self.await_(self._cursor.fetchall()) + ) + + def setinputsizes(self, *inputsizes): + # NOTE: this is overrridden in aioodbc due to + # see https://github.com/aio-libs/aioodbc/issues/451 + # right now + + return self.await_(self._cursor.setinputsizes(*inputsizes)) + + def __iter__(self): + while self._rows: + yield self._rows.popleft() + + def fetchone(self): + if self._rows: + return self._rows.popleft() + else: + return None + + def fetchmany(self, size=None): + if size is None: + size = self.arraysize + rr = self._rows + return [rr.popleft() for _ in range(min(size, len(rr)))] + + def fetchall(self): + retval = list(self._rows) + self._rows.clear() + return retval + + +class AsyncAdapt_dbapi_ss_cursor(AsyncAdapt_dbapi_cursor): + __slots__ = () + server_side = True + + def __init__(self, adapt_connection): + self._adapt_connection = adapt_connection + self._connection = adapt_connection._connection + self.await_ = adapt_connection.await_ + + cursor = self._connection.cursor() + + self._cursor = self.await_(cursor.__aenter__()) + + def close(self): + if self._cursor is not None: + self.await_(self._cursor.close()) + self._cursor = None + + def fetchone(self): + return self.await_(self._cursor.fetchone()) + + def fetchmany(self, size=None): + return self.await_(self._cursor.fetchmany(size=size)) + + def fetchall(self): + return self.await_(self._cursor.fetchall()) + + def __iter__(self): + iterator = self._cursor.__aiter__() + while True: + try: + yield self.await_(iterator.__anext__()) + except StopAsyncIteration: + break + + +class AsyncAdapt_dbapi_connection(AdaptedConnection): + _cursor_cls = AsyncAdapt_dbapi_cursor + _ss_cursor_cls = AsyncAdapt_dbapi_ss_cursor + + await_ = staticmethod(await_only) + __slots__ = ("dbapi", "_execute_mutex") + + def __init__(self, dbapi, connection): + self.dbapi = dbapi + self._connection = connection + self._execute_mutex = asyncio.Lock() + + def ping(self, reconnect): + return self.await_(self._connection.ping(reconnect)) + + def add_output_converter(self, *arg, **kw): + self._connection.add_output_converter(*arg, **kw) + + def character_set_name(self): + return self._connection.character_set_name() + + @property + def autocommit(self): + return self._connection.autocommit + + @autocommit.setter + def autocommit(self, value): + # https://github.com/aio-libs/aioodbc/issues/448 + # self._connection.autocommit = value + + self._connection._conn.autocommit = value + + def cursor(self, server_side=False): + if server_side: + return self._ss_cursor_cls(self) + else: + return self._cursor_cls(self) + + def rollback(self): + self.await_(self._connection.rollback()) + + def commit(self): + self.await_(self._connection.commit()) + + def close(self): + self.await_(self._connection.close()) + + +class AsyncAdaptFallback_dbapi_connection(AsyncAdapt_dbapi_connection): + __slots__ = () + + await_ = staticmethod(await_fallback) |