diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/oracledb.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/oracledb.py | 947 |
1 files changed, 947 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/oracledb.py b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/oracledb.py new file mode 100644 index 00000000..c09d2bae --- /dev/null +++ b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/oracledb.py @@ -0,0 +1,947 @@ +# dialects/oracle/oracledb.py +# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors +# <see AUTHORS file> +# +# This module is part of SQLAlchemy and is released under +# the MIT License: https://www.opensource.org/licenses/mit-license.php +# mypy: ignore-errors + +r""".. dialect:: oracle+oracledb + :name: python-oracledb + :dbapi: oracledb + :connectstring: oracle+oracledb://user:pass@hostname:port[/dbname][?service_name=<service>[&key=value&key=value...]] + :url: https://oracle.github.io/python-oracledb/ + +Description +----------- + +Python-oracledb is the Oracle Database driver for Python. It features a default +"thin" client mode that requires no dependencies, and an optional "thick" mode +that uses Oracle Client libraries. It supports SQLAlchemy features including +two phase transactions and Asyncio. + +Python-oracle is the renamed, updated cx_Oracle driver. Oracle is no longer +doing any releases in the cx_Oracle namespace. + +The SQLAlchemy ``oracledb`` dialect provides both a sync and an async +implementation under the same dialect name. The proper version is +selected depending on how the engine is created: + +* calling :func:`_sa.create_engine` with ``oracle+oracledb://...`` will + automatically select the sync version:: + + from sqlalchemy import create_engine + + sync_engine = create_engine( + "oracle+oracledb://scott:tiger@localhost?service_name=FREEPDB1" + ) + +* calling :func:`_asyncio.create_async_engine` with ``oracle+oracledb://...`` + will automatically select the async version:: + + from sqlalchemy.ext.asyncio import create_async_engine + + asyncio_engine = create_async_engine( + "oracle+oracledb://scott:tiger@localhost?service_name=FREEPDB1" + ) + + The asyncio version of the dialect may also be specified explicitly using the + ``oracledb_async`` suffix:: + + from sqlalchemy.ext.asyncio import create_async_engine + + asyncio_engine = create_async_engine( + "oracle+oracledb_async://scott:tiger@localhost?service_name=FREEPDB1" + ) + +.. versionadded:: 2.0.25 added support for the async version of oracledb. + +Thick mode support +------------------ + +By default, the python-oracledb driver runs in a "thin" mode that does not +require Oracle Client libraries to be installed. The driver also supports a +"thick" mode that uses Oracle Client libraries to get functionality such as +Oracle Application Continuity. + +To enable thick mode, call `oracledb.init_oracle_client() +<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.init_oracle_client>`_ +explicitly, or pass the parameter ``thick_mode=True`` to +:func:`_sa.create_engine`. To pass custom arguments to +``init_oracle_client()``, like the ``lib_dir`` path, a dict may be passed, for +example:: + + engine = sa.create_engine( + "oracle+oracledb://...", + thick_mode={ + "lib_dir": "/path/to/oracle/client/lib", + "config_dir": "/path/to/network_config_file_directory", + "driver_name": "my-app : 1.0.0", + }, + ) + +Note that passing a ``lib_dir`` path should only be done on macOS or +Windows. On Linux it does not behave as you might expect. + +.. seealso:: + + python-oracledb documentation `Enabling python-oracledb Thick mode + <https://python-oracledb.readthedocs.io/en/latest/user_guide/initialization.html#enabling-python-oracledb-thick-mode>`_ + +Connecting to Oracle Database +----------------------------- + +python-oracledb provides several methods of indicating the target database. +The dialect translates from a series of different URL forms. + +Given the hostname, port and service name of the target database, you can +connect in SQLAlchemy using the ``service_name`` query string parameter:: + + engine = create_engine( + "oracle+oracledb://scott:tiger@hostname:port?service_name=myservice" + ) + +Connecting with Easy Connect strings +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +You can pass any valid python-oracledb connection string as the ``dsn`` key +value in a :paramref:`_sa.create_engine.connect_args` dictionary. See +python-oracledb documentation `Oracle Net Services Connection Strings +<https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html#oracle-net-services-connection-strings>`_. + +For example to use an `Easy Connect string +<https://download.oracle.com/ocomdocs/global/Oracle-Net-Easy-Connect-Plus.pdf>`_ +with a timeout to prevent connection establishment from hanging if the network +transport to the database cannot be establishd in 30 seconds, and also setting +a keep-alive time of 60 seconds to stop idle network connections from being +terminated by a firewall:: + + e = create_engine( + "oracle+oracledb://@", + connect_args={ + "user": "scott", + "password": "tiger", + "dsn": "hostname:port/myservice?transport_connect_timeout=30&expire_time=60", + }, + ) + +The Easy Connect syntax has been enhanced during the life of Oracle Database. +Review the documentation for your database version. The current documentation +is at `Understanding the Easy Connect Naming Method +<https://www.oracle.com/pls/topic/lookup?ctx=dblatest&id=GUID-B0437826-43C1-49EC-A94D-B650B6A4A6EE>`_. + +The general syntax is similar to: + +.. sourcecode:: text + + [[protocol:]//]host[:port][/[service_name]][?parameter_name=value{¶meter_name=value}] + +Note that although the SQLAlchemy URL syntax ``hostname:port/dbname`` looks +like Oracle's Easy Connect syntax, it is different. SQLAlchemy's URL requires a +system identifier (SID) for the ``dbname`` component:: + + engine = create_engine("oracle+oracledb://scott:tiger@hostname:port/sid") + +Easy Connect syntax does not support SIDs. It uses services names, which are +the preferred choice for connecting to Oracle Database. + +Passing python-oracledb connect arguments +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Other python-oracledb driver `connection options +<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.connect>`_ +can be passed in ``connect_args``. For example:: + + e = create_engine( + "oracle+oracledb://@", + connect_args={ + "user": "scott", + "password": "tiger", + "dsn": "hostname:port/myservice", + "events": True, + "mode": oracledb.AUTH_MODE_SYSDBA, + }, + ) + +Connecting with tnsnames.ora TNS aliases +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +If no port, database name, or service name is provided, the dialect will use an +Oracle Database DSN "connection string". This takes the "hostname" portion of +the URL as the data source name. For example, if the ``tnsnames.ora`` file +contains a `TNS Alias +<https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html#tns-aliases-for-connection-strings>`_ +of ``myalias`` as below: + +.. sourcecode:: text + + myalias = + (DESCRIPTION = + (ADDRESS = (PROTOCOL = TCP)(HOST = mymachine.example.com)(PORT = 1521)) + (CONNECT_DATA = + (SERVER = DEDICATED) + (SERVICE_NAME = orclpdb1) + ) + ) + +The python-oracledb dialect connects to this database service when ``myalias`` is the +hostname portion of the URL, without specifying a port, database name or +``service_name``:: + + engine = create_engine("oracle+oracledb://scott:tiger@myalias") + +Connecting to Oracle Autonomous Database +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Users of Oracle Autonomous Database should use either use the TNS Alias URL +shown above, or pass the TNS Alias as the ``dsn`` key value in a +:paramref:`_sa.create_engine.connect_args` dictionary. + +If Oracle Autonomous Database is configured for mutual TLS ("mTLS") +connections, then additional configuration is required as shown in `Connecting +to Oracle Cloud Autonomous Databases +<https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html#connecting-to-oracle-cloud-autonomous-databases>`_. In +summary, Thick mode users should configure file locations and set the wallet +path in ``sqlnet.ora`` appropriately:: + + e = create_engine( + "oracle+oracledb://@", + thick_mode={ + # directory containing tnsnames.ora and cwallet.so + "config_dir": "/opt/oracle/wallet_dir", + }, + connect_args={ + "user": "scott", + "password": "tiger", + "dsn": "mydb_high", + }, + ) + +Thin mode users of mTLS should pass the appropriate directories and PEM wallet +password when creating the engine, similar to:: + + e = create_engine( + "oracle+oracledb://@", + connect_args={ + "user": "scott", + "password": "tiger", + "dsn": "mydb_high", + "config_dir": "/opt/oracle/wallet_dir", # directory containing tnsnames.ora + "wallet_location": "/opt/oracle/wallet_dir", # directory containing ewallet.pem + "wallet_password": "top secret", # password for the PEM file + }, + ) + +Typically ``config_dir`` and ``wallet_location`` are the same directory, which +is where the Oracle Autonomous Database wallet zip file was extracted. Note +this directory should be protected. + +Connection Pooling +------------------ + +Applications with multiple concurrent users should use connection pooling. A +minimal sized connection pool is also beneficial for long-running, single-user +applications that do not frequently use a connection. + +The python-oracledb driver provides its own connection pool implementation that +may be used in place of SQLAlchemy's pooling functionality. The driver pool +gives support for high availability features such as dead connection detection, +connection draining for planned database downtime, support for Oracle +Application Continuity and Transparent Application Continuity, and gives +support for `Database Resident Connection Pooling (DRCP) +<https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_. + +To take advantage of python-oracledb's pool, use the +:paramref:`_sa.create_engine.creator` parameter to provide a function that +returns a new connection, along with setting +:paramref:`_sa.create_engine.pool_class` to ``NullPool`` to disable +SQLAlchemy's pooling:: + + import oracledb + from sqlalchemy import create_engine + from sqlalchemy import text + from sqlalchemy.pool import NullPool + + # Uncomment to use the optional python-oracledb Thick mode. + # Review the python-oracledb doc for the appropriate parameters + # oracledb.init_oracle_client(<your parameters>) + + pool = oracledb.create_pool( + user="scott", + password="tiger", + dsn="localhost:1521/freepdb1", + min=1, + max=4, + increment=1, + ) + engine = create_engine( + "oracle+oracledb://", creator=pool.acquire, poolclass=NullPool + ) + +The above engine may then be used normally. Internally, python-oracledb handles +connection pooling:: + + with engine.connect() as conn: + print(conn.scalar(text("select 1 from dual"))) + +Refer to the python-oracledb documentation for `oracledb.create_pool() +<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#oracledb.create_pool>`_ +for the arguments that can be used when creating a connection pool. + +.. _drcp: + +Using Oracle Database Resident Connection Pooling (DRCP) +-------------------------------------------------------- + +When using Oracle Database's Database Resident Connection Pooling (DRCP), the +best practice is to specify a connection class and "purity". Refer to the +`python-oracledb documentation on DRCP +<https://python-oracledb.readthedocs.io/en/latest/user_guide/connection_handling.html#database-resident-connection-pooling-drcp>`_. +For example:: + + import oracledb + from sqlalchemy import create_engine + from sqlalchemy import text + from sqlalchemy.pool import NullPool + + # Uncomment to use the optional python-oracledb Thick mode. + # Review the python-oracledb doc for the appropriate parameters + # oracledb.init_oracle_client(<your parameters>) + + pool = oracledb.create_pool( + user="scott", + password="tiger", + dsn="localhost:1521/freepdb1", + min=1, + max=4, + increment=1, + cclass="MYCLASS", + purity=oracledb.PURITY_SELF, + ) + engine = create_engine( + "oracle+oracledb://", creator=pool.acquire, poolclass=NullPool + ) + +The above engine may then be used normally where python-oracledb handles +application connection pooling and Oracle Database additionally uses DRCP:: + + with engine.connect() as conn: + print(conn.scalar(text("select 1 from dual"))) + +If you wish to use different connection classes or purities for different +connections, then wrap ``pool.acquire()``:: + + import oracledb + from sqlalchemy import create_engine + from sqlalchemy import text + from sqlalchemy.pool import NullPool + + # Uncomment to use python-oracledb Thick mode. + # Review the python-oracledb doc for the appropriate parameters + # oracledb.init_oracle_client(<your parameters>) + + pool = oracledb.create_pool( + user="scott", + password="tiger", + dsn="localhost:1521/freepdb1", + min=1, + max=4, + increment=1, + cclass="MYCLASS", + purity=oracledb.PURITY_SELF, + ) + + + def creator(): + return pool.acquire(cclass="MYOTHERCLASS", purity=oracledb.PURITY_NEW) + + + engine = create_engine( + "oracle+oracledb://", creator=creator, poolclass=NullPool + ) + +Engine Options consumed by the SQLAlchemy oracledb dialect outside of the driver +-------------------------------------------------------------------------------- + +There are also options that are consumed by the SQLAlchemy oracledb dialect +itself. These options are always passed directly to :func:`_sa.create_engine`, +such as:: + + e = create_engine("oracle+oracledb://user:pass@tnsalias", arraysize=500) + +The parameters accepted by the oracledb dialect are as follows: + +* ``arraysize`` - set the driver cursor.arraysize value. It defaults to + ``None``, indicating that the driver default value of 100 should be used. + This setting controls how many rows are buffered when fetching rows, and can + have a significant effect on performance if increased for queries that return + large numbers of rows. + + .. versionchanged:: 2.0.26 - changed the default value from 50 to None, + to use the default value of the driver itself. + +* ``auto_convert_lobs`` - defaults to True; See :ref:`oracledb_lob`. + +* ``coerce_to_decimal`` - see :ref:`oracledb_numeric` for detail. + +* ``encoding_errors`` - see :ref:`oracledb_unicode_encoding_errors` for detail. + +.. _oracledb_unicode: + +Unicode +------- + +As is the case for all DBAPIs under Python 3, all strings are inherently +Unicode strings. + +Ensuring the Correct Client Encoding +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In python-oracledb, the encoding used for all character data is "UTF-8". + +Unicode-specific Column datatypes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The Core expression language handles unicode data by use of the +:class:`.Unicode` and :class:`.UnicodeText` datatypes. These types correspond +to the VARCHAR2 and CLOB Oracle Database datatypes by default. When using +these datatypes with Unicode data, it is expected that the database is +configured with a Unicode-aware character set so that the VARCHAR2 and CLOB +datatypes can accommodate the data. + +In the case that Oracle Database is not configured with a Unicode character +set, the two options are to use the :class:`_types.NCHAR` and +:class:`_oracle.NCLOB` datatypes explicitly, or to pass the flag +``use_nchar_for_unicode=True`` to :func:`_sa.create_engine`, which will cause +the SQLAlchemy dialect to use NCHAR/NCLOB for the :class:`.Unicode` / +:class:`.UnicodeText` datatypes instead of VARCHAR/CLOB. + +.. versionchanged:: 1.3 The :class:`.Unicode` and :class:`.UnicodeText` + datatypes now correspond to the ``VARCHAR2`` and ``CLOB`` Oracle Database + datatypes unless the ``use_nchar_for_unicode=True`` is passed to the dialect + when :func:`_sa.create_engine` is called. + + +.. _oracledb_unicode_encoding_errors: + +Encoding Errors +^^^^^^^^^^^^^^^ + +For the unusual case that data in Oracle Database is present with a broken +encoding, the dialect accepts a parameter ``encoding_errors`` which will be +passed to Unicode decoding functions in order to affect how decoding errors are +handled. The value is ultimately consumed by the Python `decode +<https://docs.python.org/3/library/stdtypes.html#bytes.decode>`_ function, and +is passed both via python-oracledb's ``encodingErrors`` parameter consumed by +``Cursor.var()``, as well as SQLAlchemy's own decoding function, as the +python-oracledb dialect makes use of both under different circumstances. + +.. versionadded:: 1.3.11 + + +.. _oracledb_setinputsizes: + +Fine grained control over python-oracledb data binding with setinputsizes +------------------------------------------------------------------------- + +The python-oracle DBAPI has a deep and fundamental reliance upon the usage of +the DBAPI ``setinputsizes()`` call. The purpose of this call is to establish +the datatypes that are bound to a SQL statement for Python values being passed +as parameters. While virtually no other DBAPI assigns any use to the +``setinputsizes()`` call, the python-oracledb DBAPI relies upon it heavily in +its interactions with the Oracle Database, and in some scenarios it is not +possible for SQLAlchemy to know exactly how data should be bound, as some +settings can cause profoundly different performance characteristics, while +altering the type coercion behavior at the same time. + +Users of the oracledb dialect are **strongly encouraged** to read through +python-oracledb's list of built-in datatype symbols at `Database Types +<https://python-oracledb.readthedocs.io/en/latest/api_manual/module.html#database-types>`_ +Note that in some cases, significant performance degradation can occur when +using these types vs. not. + +On the SQLAlchemy side, the :meth:`.DialectEvents.do_setinputsizes` event can +be used both for runtime visibility (e.g. logging) of the setinputsizes step as +well as to fully control how ``setinputsizes()`` is used on a per-statement +basis. + +.. versionadded:: 1.2.9 Added :meth:`.DialectEvents.setinputsizes` + + +Example 1 - logging all setinputsizes calls +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +The following example illustrates how to log the intermediary values from a +SQLAlchemy perspective before they are converted to the raw ``setinputsizes()`` +parameter dictionary. The keys of the dictionary are :class:`.BindParameter` +objects which have a ``.key`` and a ``.type`` attribute:: + + from sqlalchemy import create_engine, event + + engine = create_engine( + "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1" + ) + + + @event.listens_for(engine, "do_setinputsizes") + def _log_setinputsizes(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in inputsizes.items(): + log.info( + "Bound parameter name: %s SQLAlchemy type: %r DBAPI object: %s", + bindparam.key, + bindparam.type, + dbapitype, + ) + +Example 2 - remove all bindings to CLOB +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +For performance, fetching LOB datatypes from Oracle Database is set by default +for the ``Text`` type within SQLAlchemy. This setting can be modified as +follows:: + + + from sqlalchemy import create_engine, event + from oracledb import CLOB + + engine = create_engine( + "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1" + ) + + + @event.listens_for(engine, "do_setinputsizes") + def _remove_clob(inputsizes, cursor, statement, parameters, context): + for bindparam, dbapitype in list(inputsizes.items()): + if dbapitype is CLOB: + del inputsizes[bindparam] + +.. _oracledb_lob: + +LOB Datatypes +-------------- + +LOB datatypes refer to the "large object" datatypes such as CLOB, NCLOB and +BLOB. Oracle Database can efficiently return these datatypes as a single +buffer. SQLAlchemy makes use of type handlers to do this by default. + +To disable the use of the type handlers and deliver LOB objects as classic +buffered objects with a ``read()`` method, the parameter +``auto_convert_lobs=False`` may be passed to :func:`_sa.create_engine`. + +.. _oracledb_returning: + +RETURNING Support +----------------- + +The oracledb dialect implements RETURNING using OUT parameters. The dialect +supports RETURNING fully. + +Two Phase Transaction Support +----------------------------- + +Two phase transactions are fully supported with python-oracledb. (Thin mode +requires python-oracledb 2.3). APIs for two phase transactions are provided at +the Core level via :meth:`_engine.Connection.begin_twophase` and +:paramref:`_orm.Session.twophase` for transparent ORM use. + +.. versionchanged:: 2.0.32 added support for two phase transactions + +.. _oracledb_numeric: + +Precision Numerics +------------------ + +SQLAlchemy's numeric types can handle receiving and returning values as Python +``Decimal`` objects or float objects. When a :class:`.Numeric` object, or a +subclass such as :class:`.Float`, :class:`_oracle.DOUBLE_PRECISION` etc. is in +use, the :paramref:`.Numeric.asdecimal` flag determines if values should be +coerced to ``Decimal`` upon return, or returned as float objects. To make +matters more complicated under Oracle Database, the ``NUMBER`` type can also +represent integer values if the "scale" is zero, so the Oracle +Database-specific :class:`_oracle.NUMBER` type takes this into account as well. + +The oracledb dialect makes extensive use of connection- and cursor-level +"outputtypehandler" callables in order to coerce numeric values as requested. +These callables are specific to the specific flavor of :class:`.Numeric` in +use, as well as if no SQLAlchemy typing objects are present. There are +observed scenarios where Oracle Database may send incomplete or ambiguous +information about the numeric types being returned, such as a query where the +numeric types are buried under multiple levels of subquery. The type handlers +do their best to make the right decision in all cases, deferring to the +underlying python-oracledb DBAPI for all those cases where the driver can make +the best decision. + +When no typing objects are present, as when executing plain SQL strings, a +default "outputtypehandler" is present which will generally return numeric +values which specify precision and scale as Python ``Decimal`` objects. To +disable this coercion to decimal for performance reasons, pass the flag +``coerce_to_decimal=False`` to :func:`_sa.create_engine`:: + + engine = create_engine( + "oracle+oracledb://scott:tiger@tnsalias", coerce_to_decimal=False + ) + +The ``coerce_to_decimal`` flag only impacts the results of plain string +SQL statements that are not otherwise associated with a :class:`.Numeric` +SQLAlchemy type (or a subclass of such). + +.. versionchanged:: 1.2 The numeric handling system for the oracle dialects has + been reworked to take advantage of newer driver features as well as better + integration of outputtypehandlers. + +.. versionadded:: 2.0.0 added support for the python-oracledb driver. + +""" # noqa +from __future__ import annotations + +import collections +import re +from typing import Any +from typing import TYPE_CHECKING + +from . import cx_oracle as _cx_oracle +from ... import exc +from ... import pool +from ...connectors.asyncio import AsyncAdapt_dbapi_connection +from ...connectors.asyncio import AsyncAdapt_dbapi_cursor +from ...connectors.asyncio import AsyncAdapt_dbapi_ss_cursor +from ...connectors.asyncio import AsyncAdaptFallback_dbapi_connection +from ...engine import default +from ...util import asbool +from ...util import await_fallback +from ...util import await_only + +if TYPE_CHECKING: + from oracledb import AsyncConnection + from oracledb import AsyncCursor + + +class OracleExecutionContext_oracledb( + _cx_oracle.OracleExecutionContext_cx_oracle +): + pass + + +class OracleDialect_oracledb(_cx_oracle.OracleDialect_cx_oracle): + supports_statement_cache = True + execution_ctx_cls = OracleExecutionContext_oracledb + + driver = "oracledb" + _min_version = (1,) + + def __init__( + self, + auto_convert_lobs=True, + coerce_to_decimal=True, + arraysize=None, + encoding_errors=None, + thick_mode=None, + **kwargs, + ): + super().__init__( + auto_convert_lobs, + coerce_to_decimal, + arraysize, + encoding_errors, + **kwargs, + ) + + if self.dbapi is not None and ( + thick_mode or isinstance(thick_mode, dict) + ): + kw = thick_mode if isinstance(thick_mode, dict) else {} + self.dbapi.init_oracle_client(**kw) + + @classmethod + def import_dbapi(cls): + import oracledb + + return oracledb + + @classmethod + def is_thin_mode(cls, connection): + return connection.connection.dbapi_connection.thin + + @classmethod + def get_async_dialect_cls(cls, url): + return OracleDialectAsync_oracledb + + def _load_version(self, dbapi_module): + version = (0, 0, 0) + if dbapi_module is not None: + m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", dbapi_module.version) + if m: + version = tuple( + int(x) for x in m.group(1, 2, 3) if x is not None + ) + self.oracledb_ver = version + if ( + self.oracledb_ver > (0, 0, 0) + and self.oracledb_ver < self._min_version + ): + raise exc.InvalidRequestError( + f"oracledb version {self._min_version} and above are supported" + ) + + def do_begin_twophase(self, connection, xid): + conn_xis = connection.connection.xid(*xid) + connection.connection.tpc_begin(conn_xis) + connection.connection.info["oracledb_xid"] = conn_xis + + def do_prepare_twophase(self, connection, xid): + should_commit = connection.connection.tpc_prepare() + connection.info["oracledb_should_commit"] = should_commit + + def do_rollback_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + if recover: + conn_xid = connection.connection.xid(*xid) + else: + conn_xid = None + connection.connection.tpc_rollback(conn_xid) + + def do_commit_twophase( + self, connection, xid, is_prepared=True, recover=False + ): + conn_xid = None + if not is_prepared: + should_commit = connection.connection.tpc_prepare() + elif recover: + conn_xid = connection.connection.xid(*xid) + should_commit = True + else: + should_commit = connection.info["oracledb_should_commit"] + if should_commit: + connection.connection.tpc_commit(conn_xid) + + def do_recover_twophase(self, connection): + return [ + # oracledb seems to return bytes + ( + fi, + gti.decode() if isinstance(gti, bytes) else gti, + bq.decode() if isinstance(bq, bytes) else bq, + ) + for fi, gti, bq in connection.connection.tpc_recover() + ] + + def _check_max_identifier_length(self, connection): + if self.oracledb_ver >= (2, 5): + max_len = connection.connection.max_identifier_length + if max_len is not None: + return max_len + return super()._check_max_identifier_length(connection) + + +class AsyncAdapt_oracledb_cursor(AsyncAdapt_dbapi_cursor): + _cursor: AsyncCursor + __slots__ = () + + @property + def outputtypehandler(self): + return self._cursor.outputtypehandler + + @outputtypehandler.setter + def outputtypehandler(self, value): + self._cursor.outputtypehandler = value + + def var(self, *args, **kwargs): + return self._cursor.var(*args, **kwargs) + + def close(self): + self._rows.clear() + self._cursor.close() + + def setinputsizes(self, *args: Any, **kwargs: Any) -> Any: + return self._cursor.setinputsizes(*args, **kwargs) + + def _aenter_cursor(self, cursor: AsyncCursor) -> AsyncCursor: + try: + return cursor.__enter__() + except Exception as error: + self._adapt_connection._handle_exception(error) + + async def _execute_async(self, operation, parameters): + # override to not use mutex, oracledb already has a mutex + + if parameters is None: + result = await self._cursor.execute(operation) + else: + result = await self._cursor.execute(operation, parameters) + + if self._cursor.description and not self.server_side: + self._rows = collections.deque(await self._cursor.fetchall()) + return result + + async def _executemany_async( + self, + operation, + seq_of_parameters, + ): + # override to not use mutex, oracledb already has a mutex + return await self._cursor.executemany(operation, seq_of_parameters) + + def __enter__(self): + return self + + def __exit__(self, type_: Any, value: Any, traceback: Any) -> None: + self.close() + + +class AsyncAdapt_oracledb_ss_cursor( + AsyncAdapt_dbapi_ss_cursor, AsyncAdapt_oracledb_cursor +): + __slots__ = () + + def close(self) -> None: + if self._cursor is not None: + self._cursor.close() + self._cursor = None # type: ignore + + +class AsyncAdapt_oracledb_connection(AsyncAdapt_dbapi_connection): + _connection: AsyncConnection + __slots__ = () + + thin = True + + _cursor_cls = AsyncAdapt_oracledb_cursor + _ss_cursor_cls = None + + @property + def autocommit(self): + return self._connection.autocommit + + @autocommit.setter + def autocommit(self, value): + self._connection.autocommit = value + + @property + def outputtypehandler(self): + return self._connection.outputtypehandler + + @outputtypehandler.setter + def outputtypehandler(self, value): + self._connection.outputtypehandler = value + + @property + def version(self): + return self._connection.version + + @property + def stmtcachesize(self): + return self._connection.stmtcachesize + + @stmtcachesize.setter + def stmtcachesize(self, value): + self._connection.stmtcachesize = value + + @property + def max_identifier_length(self): + return self._connection.max_identifier_length + + def cursor(self): + return AsyncAdapt_oracledb_cursor(self) + + def ss_cursor(self): + return AsyncAdapt_oracledb_ss_cursor(self) + + def xid(self, *args: Any, **kwargs: Any) -> Any: + return self._connection.xid(*args, **kwargs) + + def tpc_begin(self, *args: Any, **kwargs: Any) -> Any: + return self.await_(self._connection.tpc_begin(*args, **kwargs)) + + def tpc_commit(self, *args: Any, **kwargs: Any) -> Any: + return self.await_(self._connection.tpc_commit(*args, **kwargs)) + + def tpc_prepare(self, *args: Any, **kwargs: Any) -> Any: + return self.await_(self._connection.tpc_prepare(*args, **kwargs)) + + def tpc_recover(self, *args: Any, **kwargs: Any) -> Any: + return self.await_(self._connection.tpc_recover(*args, **kwargs)) + + def tpc_rollback(self, *args: Any, **kwargs: Any) -> Any: + return self.await_(self._connection.tpc_rollback(*args, **kwargs)) + + +class AsyncAdaptFallback_oracledb_connection( + AsyncAdaptFallback_dbapi_connection, AsyncAdapt_oracledb_connection +): + __slots__ = () + + +class OracledbAdaptDBAPI: + def __init__(self, oracledb) -> None: + self.oracledb = oracledb + + for k, v in self.oracledb.__dict__.items(): + if k != "connect": + self.__dict__[k] = v + + def connect(self, *arg, **kw): + async_fallback = kw.pop("async_fallback", False) + creator_fn = kw.pop("async_creator_fn", self.oracledb.connect_async) + + if asbool(async_fallback): + return AsyncAdaptFallback_oracledb_connection( + self, await_fallback(creator_fn(*arg, **kw)) + ) + + else: + return AsyncAdapt_oracledb_connection( + self, await_only(creator_fn(*arg, **kw)) + ) + + +class OracleExecutionContextAsync_oracledb(OracleExecutionContext_oracledb): + # restore default create cursor + create_cursor = default.DefaultExecutionContext.create_cursor + + def create_default_cursor(self): + # copy of OracleExecutionContext_cx_oracle.create_cursor + c = self._dbapi_connection.cursor() + if self.dialect.arraysize: + c.arraysize = self.dialect.arraysize + + return c + + def create_server_side_cursor(self): + c = self._dbapi_connection.ss_cursor() + if self.dialect.arraysize: + c.arraysize = self.dialect.arraysize + + return c + + +class OracleDialectAsync_oracledb(OracleDialect_oracledb): + is_async = True + supports_server_side_cursors = True + supports_statement_cache = True + execution_ctx_cls = OracleExecutionContextAsync_oracledb + + _min_version = (2,) + + # thick_mode mode is not supported by asyncio, oracledb will raise + @classmethod + def import_dbapi(cls): + import oracledb + + return OracledbAdaptDBAPI(oracledb) + + @classmethod + def get_pool_class(cls, url): + async_fallback = url.query.get("async_fallback", False) + + if asbool(async_fallback): + return pool.FallbackAsyncAdaptedQueuePool + else: + return pool.AsyncAdaptedQueuePool + + def get_driver_connection(self, connection): + return connection._connection + + +dialect = OracleDialect_oracledb +dialect_async = OracleDialectAsync_oracledb |