about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py3484
1 files changed, 3484 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py
new file mode 100644
index 00000000..02aa4d53
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/dialects/oracle/base.py
@@ -0,0 +1,3484 @@
+# dialects/oracle/base.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: ignore-errors
+
+
+r"""
+.. dialect:: oracle
+    :name: Oracle Database
+    :normal_support: 11+
+    :best_effort: 9+
+
+
+Auto Increment Behavior
+-----------------------
+
+SQLAlchemy Table objects which include integer primary keys are usually assumed
+to have "autoincrementing" behavior, meaning they can generate their own
+primary key values upon INSERT. For use within Oracle Database, two options are
+available, which are the use of IDENTITY columns (Oracle Database 12 and above
+only) or the association of a SEQUENCE with the column.
+
+Specifying GENERATED AS IDENTITY (Oracle Database 12 and above)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Starting from version 12, Oracle Database can make use of identity columns
+using the :class:`_sql.Identity` to specify the autoincrementing behavior::
+
+    t = Table(
+        "mytable",
+        metadata,
+        Column("id", Integer, Identity(start=3), primary_key=True),
+        Column(...),
+        ...,
+    )
+
+The CREATE TABLE for the above :class:`_schema.Table` object would be:
+
+.. sourcecode:: sql
+
+    CREATE TABLE mytable (
+        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 3),
+        ...,
+        PRIMARY KEY (id)
+    )
+
+The :class:`_schema.Identity` object support many options to control the
+"autoincrementing" behavior of the column, like the starting value, the
+incrementing value, etc.  In addition to the standard options, Oracle Database
+supports setting :paramref:`_schema.Identity.always` to ``None`` to use the
+default generated mode, rendering GENERATED AS IDENTITY in the DDL. It also supports
+setting :paramref:`_schema.Identity.on_null` to ``True`` to specify ON NULL
+in conjunction with a 'BY DEFAULT' identity column.
+
+Using a SEQUENCE (all Oracle Database versions)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Older version of Oracle Database had no "autoincrement" feature: SQLAlchemy
+relies upon sequences to produce these values.  With the older Oracle Database
+versions, *a sequence must always be explicitly specified to enable
+autoincrement*.  This is divergent with the majority of documentation examples
+which assume the usage of an autoincrement-capable database.  To specify
+sequences, use the sqlalchemy.schema.Sequence object which is passed to a
+Column construct::
+
+  t = Table(
+      "mytable",
+      metadata,
+      Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
+      Column(...),
+      ...,
+  )
+
+This step is also required when using table reflection, i.e. autoload_with=engine::
+
+  t = Table(
+      "mytable",
+      metadata,
+      Column("id", Integer, Sequence("id_seq", start=1), primary_key=True),
+      autoload_with=engine,
+  )
+
+.. versionchanged::  1.4   Added :class:`_schema.Identity` construct
+   in a :class:`_schema.Column` to specify the option of an autoincrementing
+   column.
+
+.. _oracle_isolation_level:
+
+Transaction Isolation Level / Autocommit
+----------------------------------------
+
+Oracle Database supports "READ COMMITTED" and "SERIALIZABLE" modes of
+isolation. The AUTOCOMMIT isolation level is also supported by the
+python-oracledb and cx_Oracle dialects.
+
+To set using per-connection execution options::
+
+    connection = engine.connect()
+    connection = connection.execution_options(isolation_level="AUTOCOMMIT")
+
+For ``READ COMMITTED`` and ``SERIALIZABLE``, the Oracle Database dialects sets
+the level at the session level using ``ALTER SESSION``, which is reverted back
+to its default setting when the connection is returned to the connection pool.
+
+Valid values for ``isolation_level`` include:
+
+* ``READ COMMITTED``
+* ``AUTOCOMMIT``
+* ``SERIALIZABLE``
+
+.. note:: The implementation for the
+   :meth:`_engine.Connection.get_isolation_level` method as implemented by the
+   Oracle Database dialects necessarily force the start of a transaction using the
+   Oracle Database DBMS_TRANSACTION.LOCAL_TRANSACTION_ID function; otherwise no
+   level is normally readable.
+
+   Additionally, the :meth:`_engine.Connection.get_isolation_level` method will
+   raise an exception if the ``v$transaction`` view is not available due to
+   permissions or other reasons, which is a common occurrence in Oracle Database
+   installations.
+
+   The python-oracledb and cx_Oracle dialects attempt to call the
+   :meth:`_engine.Connection.get_isolation_level` method when the dialect makes
+   its first connection to the database in order to acquire the
+   "default"isolation level.  This default level is necessary so that the level
+   can be reset on a connection after it has been temporarily modified using
+   :meth:`_engine.Connection.execution_options` method.  In the common event
+   that the :meth:`_engine.Connection.get_isolation_level` method raises an
+   exception due to ``v$transaction`` not being readable as well as any other
+   database-related failure, the level is assumed to be "READ COMMITTED".  No
+   warning is emitted for this initial first-connect condition as it is
+   expected to be a common restriction on Oracle databases.
+
+.. versionadded:: 1.3.16 added support for AUTOCOMMIT to the cx_Oracle dialect
+   as well as the notion of a default isolation level
+
+.. versionadded:: 1.3.21 Added support for SERIALIZABLE as well as live
+   reading of the isolation level.
+
+.. versionchanged:: 1.3.22 In the event that the default isolation
+   level cannot be read due to permissions on the v$transaction view as
+   is common in Oracle installations, the default isolation level is hardcoded
+   to "READ COMMITTED" which was the behavior prior to 1.3.21.
+
+.. seealso::
+
+    :ref:`dbapi_autocommit`
+
+Identifier Casing
+-----------------
+
+In Oracle Database, the data dictionary represents all case insensitive
+identifier names using UPPERCASE text.  This is in contradiction to the
+expectations of SQLAlchemy, which assume a case insensitive name is represented
+as lowercase text.
+
+As an example of case insensitive identifier names, consider the following table:
+
+.. sourcecode:: sql
+
+    CREATE TABLE MyTable (Identifier INTEGER PRIMARY KEY)
+
+If you were to ask Oracle Database for information about this table, the
+table name would be reported as ``MYTABLE`` and the column name would
+be reported as ``IDENTIFIER``.    Compare to most other databases such as
+PostgreSQL and MySQL which would report these names as ``mytable`` and
+``identifier``.   The names are **not quoted, therefore are case insensitive**.
+The special casing of ``MyTable`` and ``Identifier`` would only be maintained
+if they were quoted in the table definition:
+
+.. sourcecode:: sql
+
+    CREATE TABLE "MyTable" ("Identifier" INTEGER PRIMARY KEY)
+
+When constructing a SQLAlchemy :class:`.Table` object, **an all lowercase name
+is considered to be case insensitive**.   So the following table assumes
+case insensitive names::
+
+    Table("mytable", metadata, Column("identifier", Integer, primary_key=True))
+
+Whereas when mixed case or UPPERCASE names are used, case sensitivity is
+assumed::
+
+    Table("MyTable", metadata, Column("Identifier", Integer, primary_key=True))
+
+A similar situation occurs at the database driver level when emitting a
+textual SQL SELECT statement and looking at column names in the DBAPI
+``cursor.description`` attribute.  A database like PostgreSQL will normalize
+case insensitive names to be lowercase::
+
+    >>> pg_engine = create_engine("postgresql://scott:tiger@localhost/test")
+    >>> pg_connection = pg_engine.connect()
+    >>> result = pg_connection.exec_driver_sql("SELECT 1 AS SomeName")
+    >>> result.cursor.description
+    (Column(name='somename', type_code=23),)
+
+Whereas Oracle normalizes them to UPPERCASE::
+
+    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
+    >>> oracle_connection = oracle_engine.connect()
+    >>> result = oracle_connection.exec_driver_sql(
+    ...     "SELECT 1 AS SomeName FROM DUAL"
+    ... )
+    >>> result.cursor.description
+    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
+
+In order to achieve cross-database parity for the two cases of a. table
+reflection and b. textual-only SQL statement round trips, SQLAlchemy performs a step
+called **name normalization** when using the Oracle dialect.  This process may
+also apply to other third party dialects that have similar UPPERCASE handling
+of case insensitive names.
+
+When using name normalization, SQLAlchemy attempts to detect if a name is
+case insensitive by checking if all characters are UPPERCASE letters only;
+if so, then it assumes this is a case insensitive name and is delivered as
+a lowercase name.
+
+For table reflection, a tablename that is seen represented as all UPPERCASE
+in Oracle Database's catalog tables will be assumed to have a case insensitive
+name.  This is what allows the ``Table`` definition to use lower case names
+and be equally compatible from a reflection point of view on Oracle Database
+and all other databases such as PostgreSQL and MySQL::
+
+    # matches a table created with CREATE TABLE mytable
+    Table("mytable", metadata, autoload_with=some_engine)
+
+Above, the all lowercase name ``"mytable"`` is case insensitive; it will match
+a table reported by PostgreSQL as ``"mytable"`` and a table reported by
+Oracle as ``"MYTABLE"``.  If name normalization were not present, it would
+not be possible for the above :class:`.Table` definition to be introspectable
+in a cross-database way, since we are dealing with a case insensitive name
+that is not reported by each database in the same way.
+
+Case sensitivity can be forced on in this case, such as if we wanted to represent
+the quoted tablename ``"MYTABLE"`` with that exact casing, most simply by using
+that casing directly, which will be seen as a case sensitive name::
+
+    # matches a table created with CREATE TABLE "MYTABLE"
+    Table("MYTABLE", metadata, autoload_with=some_engine)
+
+For the unusual case of a quoted all-lowercase name, the :class:`.quoted_name`
+construct may be used::
+
+    from sqlalchemy import quoted_name
+
+    # matches a table created with CREATE TABLE "mytable"
+    Table(
+        quoted_name("mytable", quote=True), metadata, autoload_with=some_engine
+    )
+
+Name normalization also takes place when handling result sets from **purely
+textual SQL strings**, that have no other :class:`.Table` or :class:`.Column`
+metadata associated with them. This includes SQL strings executed using
+:meth:`.Connection.exec_driver_sql` and SQL strings executed using the
+:func:`.text` construct which do not include :class:`.Column` metadata.
+
+Returning to the Oracle Database SELECT statement, we see that even though
+``cursor.description`` reports the column name as ``SOMENAME``, SQLAlchemy
+name normalizes this to ``somename``::
+
+    >>> oracle_engine = create_engine("oracle+oracledb://scott:tiger@oracle18c/xe")
+    >>> oracle_connection = oracle_engine.connect()
+    >>> result = oracle_connection.exec_driver_sql(
+    ...     "SELECT 1 AS SomeName FROM DUAL"
+    ... )
+    >>> result.cursor.description
+    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
+    >>> result.keys()
+    RMKeyView(['somename'])
+
+The single scenario where the above behavior produces inaccurate results
+is when using an all-uppercase, quoted name.  SQLAlchemy has no way to determine
+that a particular name in ``cursor.description`` was quoted, and is therefore
+case sensitive, or was not quoted, and should be name normalized::
+
+    >>> result = oracle_connection.exec_driver_sql(
+    ...     'SELECT 1 AS "SOMENAME" FROM DUAL'
+    ... )
+    >>> result.cursor.description
+    [('SOMENAME', <DbType DB_TYPE_NUMBER>, 127, None, 0, -127, True)]
+    >>> result.keys()
+    RMKeyView(['somename'])
+
+For this case, a new feature will be available in SQLAlchemy 2.1 to disable
+the name normalization behavior in specific cases.
+
+
+.. _oracle_max_identifier_lengths:
+
+Maximum Identifier Lengths
+--------------------------
+
+SQLAlchemy is sensitive to the maximum identifier length supported by Oracle
+Database. This affects generated SQL label names as well as the generation of
+constraint names, particularly in the case where the constraint naming
+convention feature described at :ref:`constraint_naming_conventions` is being
+used.
+
+Oracle Database 12.2 increased the default maximum identifier length from 30 to
+128. As of SQLAlchemy 1.4, the default maximum identifier length for the Oracle
+dialects is 128 characters.  Upon first connection, the maximum length actually
+supported by the database is obtained. In all cases, setting the
+:paramref:`_sa.create_engine.max_identifier_length` parameter will bypass this
+change and the value given will be used as is::
+
+    engine = create_engine(
+        "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1",
+        max_identifier_length=30,
+    )
+
+If :paramref:`_sa.create_engine.max_identifier_length` is not set, the oracledb
+dialect internally uses the ``max_identifier_length`` attribute available on
+driver connections since python-oracledb version 2.5. When using an older
+driver version, or using the cx_Oracle dialect, SQLAlchemy will instead attempt
+to use the query ``SELECT value FROM v$parameter WHERE name = 'compatible'``
+upon first connect in order to determine the effective compatibility version of
+the database. The "compatibility" version is a version number that is
+independent of the actual database version. It is used to assist database
+migration. It is configured by an Oracle Database initialization parameter. The
+compatibility version then determines the maximum allowed identifier length for
+the database. If the V$ view is not available, the database version information
+is used instead.
+
+The maximum identifier length comes into play both when generating anonymized
+SQL labels in SELECT statements, but more crucially when generating constraint
+names from a naming convention.  It is this area that has created the need for
+SQLAlchemy to change this default conservatively.  For example, the following
+naming convention produces two very different constraint names based on the
+identifier length::
+
+    from sqlalchemy import Column
+    from sqlalchemy import Index
+    from sqlalchemy import Integer
+    from sqlalchemy import MetaData
+    from sqlalchemy import Table
+    from sqlalchemy.dialects import oracle
+    from sqlalchemy.schema import CreateIndex
+
+    m = MetaData(naming_convention={"ix": "ix_%(column_0N_name)s"})
+
+    t = Table(
+        "t",
+        m,
+        Column("some_column_name_1", Integer),
+        Column("some_column_name_2", Integer),
+        Column("some_column_name_3", Integer),
+    )
+
+    ix = Index(
+        None,
+        t.c.some_column_name_1,
+        t.c.some_column_name_2,
+        t.c.some_column_name_3,
+    )
+
+    oracle_dialect = oracle.dialect(max_identifier_length=30)
+    print(CreateIndex(ix).compile(dialect=oracle_dialect))
+
+With an identifier length of 30, the above CREATE INDEX looks like:
+
+.. sourcecode:: sql
+
+    CREATE INDEX ix_some_column_name_1s_70cd ON t
+    (some_column_name_1, some_column_name_2, some_column_name_3)
+
+However with length of 128, it becomes::
+
+.. sourcecode:: sql
+
+    CREATE INDEX ix_some_column_name_1some_column_name_2some_column_name_3 ON t
+    (some_column_name_1, some_column_name_2, some_column_name_3)
+
+Applications which have run versions of SQLAlchemy prior to 1.4 on Oracle
+Database version 12.2 or greater are therefore subject to the scenario of a
+database migration that wishes to "DROP CONSTRAINT" on a name that was
+previously generated with the shorter length.  This migration will fail when
+the identifier length is changed without the name of the index or constraint
+first being adjusted.  Such applications are strongly advised to make use of
+:paramref:`_sa.create_engine.max_identifier_length` in order to maintain
+control of the generation of truncated names, and to fully review and test all
+database migrations in a staging environment when changing this value to ensure
+that the impact of this change has been mitigated.
+
+.. versionchanged:: 1.4 the default max_identifier_length for Oracle Database
+   is 128 characters, which is adjusted down to 30 upon first connect if the
+   Oracle Database, or its compatibility setting, are lower than version 12.2.
+
+
+LIMIT/OFFSET/FETCH Support
+--------------------------
+
+Methods like :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset` make use
+of ``FETCH FIRST N ROW / OFFSET N ROWS`` syntax assuming Oracle Database 12c or
+above, and assuming the SELECT statement is not embedded within a compound
+statement like UNION.  This syntax is also available directly by using the
+:meth:`_sql.Select.fetch` method.
+
+.. versionchanged:: 2.0 the Oracle Database dialects now use ``FETCH FIRST N
+   ROW / OFFSET N ROWS`` for all :meth:`_sql.Select.limit` and
+   :meth:`_sql.Select.offset` usage including within the ORM and legacy
+   :class:`_orm.Query`.  To force the legacy behavior using window functions,
+   specify the ``enable_offset_fetch=False`` dialect parameter to
+   :func:`_sa.create_engine`.
+
+The use of ``FETCH FIRST / OFFSET`` may be disabled on any Oracle Database
+version by passing ``enable_offset_fetch=False`` to :func:`_sa.create_engine`,
+which will force the use of "legacy" mode that makes use of window functions.
+This mode is also selected automatically when using a version of Oracle
+Database prior to 12c.
+
+When using legacy mode, or when a :class:`.Select` statement with limit/offset
+is embedded in a compound statement, an emulated approach for LIMIT / OFFSET
+based on window functions is used, which involves creation of a subquery using
+``ROW_NUMBER`` that is prone to performance issues as well as SQL construction
+issues for complex statements. However, this approach is supported by all
+Oracle Database versions. See notes below.
+
+Notes on LIMIT / OFFSET emulation (when fetch() method cannot be used)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If using :meth:`_sql.Select.limit` and :meth:`_sql.Select.offset`, or with the
+ORM the :meth:`_orm.Query.limit` and :meth:`_orm.Query.offset` methods on an
+Oracle Database version prior to 12c, the following notes apply:
+
+* SQLAlchemy currently makes use of ROWNUM to achieve
+  LIMIT/OFFSET; the exact methodology is taken from
+  https://blogs.oracle.com/oraclemagazine/on-rownum-and-limiting-results .
+
+* the "FIRST_ROWS()" optimization keyword is not used by default.  To enable
+  the usage of this optimization directive, specify ``optimize_limits=True``
+  to :func:`_sa.create_engine`.
+
+  .. versionchanged:: 1.4
+
+      The Oracle Database dialect renders limit/offset integer values using a
+      "post compile" scheme which renders the integer directly before passing
+      the statement to the cursor for execution.  The ``use_binds_for_limits``
+      flag no longer has an effect.
+
+      .. seealso::
+
+          :ref:`change_4808`.
+
+.. _oracle_returning:
+
+RETURNING Support
+-----------------
+
+Oracle Database supports RETURNING fully for INSERT, UPDATE and DELETE
+statements that are invoked with a single collection of bound parameters (that
+is, a ``cursor.execute()`` style statement; SQLAlchemy does not generally
+support RETURNING with :term:`executemany` statements).  Multiple rows may be
+returned as well.
+
+.. versionchanged:: 2.0 the Oracle Database backend has full support for
+   RETURNING on parity with other backends.
+
+
+ON UPDATE CASCADE
+-----------------
+
+Oracle Database doesn't have native ON UPDATE CASCADE functionality.  A trigger
+based solution is available at
+https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
+
+When using the SQLAlchemy ORM, the ORM has limited ability to manually issue
+cascading updates - specify ForeignKey objects using the
+"deferrable=True, initially='deferred'" keyword arguments,
+and specify "passive_updates=False" on each relationship().
+
+Oracle Database 8 Compatibility
+-------------------------------
+
+.. warning:: The status of Oracle Database 8 compatibility is not known for
+   SQLAlchemy 2.0.
+
+When Oracle Database 8 is detected, the dialect internally configures itself to
+the following behaviors:
+
+* the use_ansi flag is set to False.  This has the effect of converting all
+  JOIN phrases into the WHERE clause, and in the case of LEFT OUTER JOIN
+  makes use of Oracle's (+) operator.
+
+* the NVARCHAR2 and NCLOB datatypes are no longer generated as DDL when
+  the :class:`~sqlalchemy.types.Unicode` is used - VARCHAR2 and CLOB are issued
+  instead. This because these types don't seem to work correctly on Oracle 8
+  even though they are available. The :class:`~sqlalchemy.types.NVARCHAR` and
+  :class:`~sqlalchemy.dialects.oracle.NCLOB` types will always generate
+  NVARCHAR2 and NCLOB.
+
+
+Synonym/DBLINK Reflection
+-------------------------
+
+When using reflection with Table objects, the dialect can optionally search
+for tables indicated by synonyms, either in local or remote schemas or
+accessed over DBLINK, by passing the flag ``oracle_resolve_synonyms=True`` as
+a keyword argument to the :class:`_schema.Table` construct::
+
+    some_table = Table(
+        "some_table", autoload_with=some_engine, oracle_resolve_synonyms=True
+    )
+
+When this flag is set, the given name (such as ``some_table`` above) will be
+searched not just in the ``ALL_TABLES`` view, but also within the
+``ALL_SYNONYMS`` view to see if this name is actually a synonym to another
+name.  If the synonym is located and refers to a DBLINK, the Oracle Database
+dialects know how to locate the table's information using DBLINK syntax(e.g.
+``@dblink``).
+
+``oracle_resolve_synonyms`` is accepted wherever reflection arguments are
+accepted, including methods such as :meth:`_schema.MetaData.reflect` and
+:meth:`_reflection.Inspector.get_columns`.
+
+If synonyms are not in use, this flag should be left disabled.
+
+.. _oracle_constraint_reflection:
+
+Constraint Reflection
+---------------------
+
+The Oracle Database dialects can return information about foreign key, unique,
+and CHECK constraints, as well as indexes on tables.
+
+Raw information regarding these constraints can be acquired using
+:meth:`_reflection.Inspector.get_foreign_keys`,
+:meth:`_reflection.Inspector.get_unique_constraints`,
+:meth:`_reflection.Inspector.get_check_constraints`, and
+:meth:`_reflection.Inspector.get_indexes`.
+
+.. versionchanged:: 1.2 The Oracle Database dialect can now reflect UNIQUE and
+   CHECK constraints.
+
+When using reflection at the :class:`_schema.Table` level, the
+:class:`_schema.Table`
+will also include these constraints.
+
+Note the following caveats:
+
+* When using the :meth:`_reflection.Inspector.get_check_constraints` method,
+  Oracle Database builds a special "IS NOT NULL" constraint for columns that
+  specify "NOT NULL".  This constraint is **not** returned by default; to
+  include the "IS NOT NULL" constraints, pass the flag ``include_all=True``::
+
+      from sqlalchemy import create_engine, inspect
+
+      engine = create_engine(
+          "oracle+oracledb://scott:tiger@localhost:1521?service_name=freepdb1"
+      )
+      inspector = inspect(engine)
+      all_check_constraints = inspector.get_check_constraints(
+          "some_table", include_all=True
+      )
+
+* in most cases, when reflecting a :class:`_schema.Table`, a UNIQUE constraint
+  will **not** be available as a :class:`.UniqueConstraint` object, as Oracle
+  Database mirrors unique constraints with a UNIQUE index in most cases (the
+  exception seems to be when two or more unique constraints represent the same
+  columns); the :class:`_schema.Table` will instead represent these using
+  :class:`.Index` with the ``unique=True`` flag set.
+
+* Oracle Database creates an implicit index for the primary key of a table;
+  this index is **excluded** from all index results.
+
+* the list of columns reflected for an index will not include column names
+  that start with SYS_NC.
+
+Table names with SYSTEM/SYSAUX tablespaces
+-------------------------------------------
+
+The :meth:`_reflection.Inspector.get_table_names` and
+:meth:`_reflection.Inspector.get_temp_table_names`
+methods each return a list of table names for the current engine. These methods
+are also part of the reflection which occurs within an operation such as
+:meth:`_schema.MetaData.reflect`.  By default,
+these operations exclude the ``SYSTEM``
+and ``SYSAUX`` tablespaces from the operation.   In order to change this, the
+default list of tablespaces excluded can be changed at the engine level using
+the ``exclude_tablespaces`` parameter::
+
+    # exclude SYSAUX and SOME_TABLESPACE, but not SYSTEM
+    e = create_engine(
+        "oracle+oracledb://scott:tiger@localhost:1521/?service_name=freepdb1",
+        exclude_tablespaces=["SYSAUX", "SOME_TABLESPACE"],
+    )
+
+.. _oracle_float_support:
+
+FLOAT / DOUBLE Support and Behaviors
+------------------------------------
+
+The SQLAlchemy :class:`.Float` and :class:`.Double` datatypes are generic
+datatypes that resolve to the "least surprising" datatype for a given backend.
+For Oracle Database, this means they resolve to the ``FLOAT`` and ``DOUBLE``
+types::
+
+    >>> from sqlalchemy import cast, literal, Float
+    >>> from sqlalchemy.dialects import oracle
+    >>> float_datatype = Float()
+    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
+    CAST(:param_1 AS FLOAT)
+
+Oracle's ``FLOAT`` / ``DOUBLE`` datatypes are aliases for ``NUMBER``.   Oracle
+Database stores ``NUMBER`` values with full precision, not floating point
+precision, which means that ``FLOAT`` / ``DOUBLE`` do not actually behave like
+native FP values. Oracle Database instead offers special datatypes
+``BINARY_FLOAT`` and ``BINARY_DOUBLE`` to deliver real 4- and 8- byte FP
+values.
+
+SQLAlchemy supports these datatypes directly using :class:`.BINARY_FLOAT` and
+:class:`.BINARY_DOUBLE`.   To use the :class:`.Float` or :class:`.Double`
+datatypes in a database agnostic way, while allowing Oracle backends to utilize
+one of these types, use the :meth:`.TypeEngine.with_variant` method to set up a
+variant::
+
+    >>> from sqlalchemy import cast, literal, Float
+    >>> from sqlalchemy.dialects import oracle
+    >>> float_datatype = Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
+    >>> print(cast(literal(5.0), float_datatype).compile(dialect=oracle.dialect()))
+    CAST(:param_1 AS BINARY_FLOAT)
+
+E.g. to use this datatype in a :class:`.Table` definition::
+
+    my_table = Table(
+        "my_table",
+        metadata,
+        Column(
+            "fp_data", Float().with_variant(oracle.BINARY_FLOAT(), "oracle")
+        ),
+    )
+
+DateTime Compatibility
+----------------------
+
+Oracle Database has no datatype known as ``DATETIME``, it instead has only
+``DATE``, which can actually store a date and time value.  For this reason, the
+Oracle Database dialects provide a type :class:`_oracle.DATE` which is a
+subclass of :class:`.DateTime`.  This type has no special behavior, and is only
+present as a "marker" for this type; additionally, when a database column is
+reflected and the type is reported as ``DATE``, the time-supporting
+:class:`_oracle.DATE` type is used.
+
+.. _oracle_table_options:
+
+Oracle Database Table Options
+-----------------------------
+
+The CREATE TABLE phrase supports the following options with Oracle Database
+dialects in conjunction with the :class:`_schema.Table` construct:
+
+
+* ``ON COMMIT``::
+
+    Table(
+        "some_table",
+        metadata,
+        ...,
+        prefixes=["GLOBAL TEMPORARY"],
+        oracle_on_commit="PRESERVE ROWS",
+    )
+
+*
+  ``COMPRESS``::
+
+     Table(
+         "mytable", metadata, Column("data", String(32)), oracle_compress=True
+     )
+
+     Table("mytable", metadata, Column("data", String(32)), oracle_compress=6)
+
+  The ``oracle_compress`` parameter accepts either an integer compression
+  level, or ``True`` to use the default compression level.
+
+*
+  ``TABLESPACE``::
+
+     Table("mytable", metadata, ..., oracle_tablespace="EXAMPLE_TABLESPACE")
+
+  The ``oracle_tablespace`` parameter specifies the tablespace in which the
+  table is to be created. This is useful when you want to create a table in a
+  tablespace other than the default tablespace of the user.
+
+  .. versionadded:: 2.0.37
+
+.. _oracle_index_options:
+
+Oracle Database Specific Index Options
+--------------------------------------
+
+Bitmap Indexes
+~~~~~~~~~~~~~~
+
+You can specify the ``oracle_bitmap`` parameter to create a bitmap index
+instead of a B-tree index::
+
+    Index("my_index", my_table.c.data, oracle_bitmap=True)
+
+Bitmap indexes cannot be unique and cannot be compressed. SQLAlchemy will not
+check for such limitations, only the database will.
+
+Index compression
+~~~~~~~~~~~~~~~~~
+
+Oracle Database has a more efficient storage mode for indexes containing lots
+of repeated values. Use the ``oracle_compress`` parameter to turn on key
+compression::
+
+    Index("my_index", my_table.c.data, oracle_compress=True)
+
+    Index(
+        "my_index",
+        my_table.c.data1,
+        my_table.c.data2,
+        unique=True,
+        oracle_compress=1,
+    )
+
+The ``oracle_compress`` parameter accepts either an integer specifying the
+number of prefix columns to compress, or ``True`` to use the default (all
+columns for non-unique indexes, all but the last column for unique indexes).
+
+"""  # noqa
+
+from __future__ import annotations
+
+from collections import defaultdict
+from functools import lru_cache
+from functools import wraps
+import re
+
+from . import dictionary
+from .types import _OracleBoolean
+from .types import _OracleDate
+from .types import BFILE
+from .types import BINARY_DOUBLE
+from .types import BINARY_FLOAT
+from .types import DATE
+from .types import FLOAT
+from .types import INTERVAL
+from .types import LONG
+from .types import NCLOB
+from .types import NUMBER
+from .types import NVARCHAR2  # noqa
+from .types import OracleRaw  # noqa
+from .types import RAW
+from .types import ROWID  # noqa
+from .types import TIMESTAMP
+from .types import VARCHAR2  # noqa
+from ... import Computed
+from ... import exc
+from ... import schema as sa_schema
+from ... import sql
+from ... import util
+from ...engine import default
+from ...engine import ObjectKind
+from ...engine import ObjectScope
+from ...engine import reflection
+from ...engine.reflection import ReflectionDefaults
+from ...sql import and_
+from ...sql import bindparam
+from ...sql import compiler
+from ...sql import expression
+from ...sql import func
+from ...sql import null
+from ...sql import or_
+from ...sql import select
+from ...sql import sqltypes
+from ...sql import util as sql_util
+from ...sql import visitors
+from ...sql.visitors import InternalTraversal
+from ...types import BLOB
+from ...types import CHAR
+from ...types import CLOB
+from ...types import DOUBLE_PRECISION
+from ...types import INTEGER
+from ...types import NCHAR
+from ...types import NVARCHAR
+from ...types import REAL
+from ...types import VARCHAR
+
+RESERVED_WORDS = set(
+    "SHARE RAW DROP BETWEEN FROM DESC OPTION PRIOR LONG THEN "
+    "DEFAULT ALTER IS INTO MINUS INTEGER NUMBER GRANT IDENTIFIED "
+    "ALL TO ORDER ON FLOAT DATE HAVING CLUSTER NOWAIT RESOURCE "
+    "ANY TABLE INDEX FOR UPDATE WHERE CHECK SMALLINT WITH DELETE "
+    "BY ASC REVOKE LIKE SIZE RENAME NOCOMPRESS NULL GROUP VALUES "
+    "AS IN VIEW EXCLUSIVE COMPRESS SYNONYM SELECT INSERT EXISTS "
+    "NOT TRIGGER ELSE CREATE INTERSECT PCTFREE DISTINCT USER "
+    "CONNECT SET MODE OF UNIQUE VARCHAR2 VARCHAR LOCK OR CHAR "
+    "DECIMAL UNION PUBLIC AND START UID COMMENT CURRENT LEVEL".split()
+)
+
+NO_ARG_FNS = set(
+    "UID CURRENT_DATE SYSDATE USER CURRENT_TIME CURRENT_TIMESTAMP".split()
+)
+
+
+colspecs = {
+    sqltypes.Boolean: _OracleBoolean,
+    sqltypes.Interval: INTERVAL,
+    sqltypes.DateTime: DATE,
+    sqltypes.Date: _OracleDate,
+}
+
+ischema_names = {
+    "VARCHAR2": VARCHAR,
+    "NVARCHAR2": NVARCHAR,
+    "CHAR": CHAR,
+    "NCHAR": NCHAR,
+    "DATE": DATE,
+    "NUMBER": NUMBER,
+    "BLOB": BLOB,
+    "BFILE": BFILE,
+    "CLOB": CLOB,
+    "NCLOB": NCLOB,
+    "TIMESTAMP": TIMESTAMP,
+    "TIMESTAMP WITH TIME ZONE": TIMESTAMP,
+    "TIMESTAMP WITH LOCAL TIME ZONE": TIMESTAMP,
+    "INTERVAL DAY TO SECOND": INTERVAL,
+    "RAW": RAW,
+    "FLOAT": FLOAT,
+    "DOUBLE PRECISION": DOUBLE_PRECISION,
+    "REAL": REAL,
+    "LONG": LONG,
+    "BINARY_DOUBLE": BINARY_DOUBLE,
+    "BINARY_FLOAT": BINARY_FLOAT,
+    "ROWID": ROWID,
+}
+
+
+class OracleTypeCompiler(compiler.GenericTypeCompiler):
+    # Note:
+    # Oracle DATE == DATETIME
+    # Oracle does not allow milliseconds in DATE
+    # Oracle does not support TIME columns
+
+    def visit_datetime(self, type_, **kw):
+        return self.visit_DATE(type_, **kw)
+
+    def visit_float(self, type_, **kw):
+        return self.visit_FLOAT(type_, **kw)
+
+    def visit_double(self, type_, **kw):
+        return self.visit_DOUBLE_PRECISION(type_, **kw)
+
+    def visit_unicode(self, type_, **kw):
+        if self.dialect._use_nchar_for_unicode:
+            return self.visit_NVARCHAR2(type_, **kw)
+        else:
+            return self.visit_VARCHAR2(type_, **kw)
+
+    def visit_INTERVAL(self, type_, **kw):
+        return "INTERVAL DAY%s TO SECOND%s" % (
+            type_.day_precision is not None
+            and "(%d)" % type_.day_precision
+            or "",
+            type_.second_precision is not None
+            and "(%d)" % type_.second_precision
+            or "",
+        )
+
+    def visit_LONG(self, type_, **kw):
+        return "LONG"
+
+    def visit_TIMESTAMP(self, type_, **kw):
+        if getattr(type_, "local_timezone", False):
+            return "TIMESTAMP WITH LOCAL TIME ZONE"
+        elif type_.timezone:
+            return "TIMESTAMP WITH TIME ZONE"
+        else:
+            return "TIMESTAMP"
+
+    def visit_DOUBLE_PRECISION(self, type_, **kw):
+        return self._generate_numeric(type_, "DOUBLE PRECISION", **kw)
+
+    def visit_BINARY_DOUBLE(self, type_, **kw):
+        return self._generate_numeric(type_, "BINARY_DOUBLE", **kw)
+
+    def visit_BINARY_FLOAT(self, type_, **kw):
+        return self._generate_numeric(type_, "BINARY_FLOAT", **kw)
+
+    def visit_FLOAT(self, type_, **kw):
+        kw["_requires_binary_precision"] = True
+        return self._generate_numeric(type_, "FLOAT", **kw)
+
+    def visit_NUMBER(self, type_, **kw):
+        return self._generate_numeric(type_, "NUMBER", **kw)
+
+    def _generate_numeric(
+        self,
+        type_,
+        name,
+        precision=None,
+        scale=None,
+        _requires_binary_precision=False,
+        **kw,
+    ):
+        if precision is None:
+            precision = getattr(type_, "precision", None)
+
+        if _requires_binary_precision:
+            binary_precision = getattr(type_, "binary_precision", None)
+
+            if precision and binary_precision is None:
+                # https://www.oracletutorial.com/oracle-basics/oracle-float/
+                estimated_binary_precision = int(precision / 0.30103)
+                raise exc.ArgumentError(
+                    "Oracle Database FLOAT types use 'binary precision', "
+                    "which does not convert cleanly from decimal "
+                    "'precision'.  Please specify "
+                    "this type with a separate Oracle Database variant, such "
+                    f"as {type_.__class__.__name__}(precision={precision})."
+                    f"with_variant(oracle.FLOAT"
+                    f"(binary_precision="
+                    f"{estimated_binary_precision}), 'oracle'), so that the "
+                    "Oracle Database specific 'binary_precision' may be "
+                    "specified accurately."
+                )
+            else:
+                precision = binary_precision
+
+        if scale is None:
+            scale = getattr(type_, "scale", None)
+
+        if precision is None:
+            return name
+        elif scale is None:
+            n = "%(name)s(%(precision)s)"
+            return n % {"name": name, "precision": precision}
+        else:
+            n = "%(name)s(%(precision)s, %(scale)s)"
+            return n % {"name": name, "precision": precision, "scale": scale}
+
+    def visit_string(self, type_, **kw):
+        return self.visit_VARCHAR2(type_, **kw)
+
+    def visit_VARCHAR2(self, type_, **kw):
+        return self._visit_varchar(type_, "", "2")
+
+    def visit_NVARCHAR2(self, type_, **kw):
+        return self._visit_varchar(type_, "N", "2")
+
+    visit_NVARCHAR = visit_NVARCHAR2
+
+    def visit_VARCHAR(self, type_, **kw):
+        return self._visit_varchar(type_, "", "")
+
+    def _visit_varchar(self, type_, n, num):
+        if not type_.length:
+            return "%(n)sVARCHAR%(two)s" % {"two": num, "n": n}
+        elif not n and self.dialect._supports_char_length:
+            varchar = "VARCHAR%(two)s(%(length)s CHAR)"
+            return varchar % {"length": type_.length, "two": num}
+        else:
+            varchar = "%(n)sVARCHAR%(two)s(%(length)s)"
+            return varchar % {"length": type_.length, "two": num, "n": n}
+
+    def visit_text(self, type_, **kw):
+        return self.visit_CLOB(type_, **kw)
+
+    def visit_unicode_text(self, type_, **kw):
+        if self.dialect._use_nchar_for_unicode:
+            return self.visit_NCLOB(type_, **kw)
+        else:
+            return self.visit_CLOB(type_, **kw)
+
+    def visit_large_binary(self, type_, **kw):
+        return self.visit_BLOB(type_, **kw)
+
+    def visit_big_integer(self, type_, **kw):
+        return self.visit_NUMBER(type_, precision=19, **kw)
+
+    def visit_boolean(self, type_, **kw):
+        return self.visit_SMALLINT(type_, **kw)
+
+    def visit_RAW(self, type_, **kw):
+        if type_.length:
+            return "RAW(%(length)s)" % {"length": type_.length}
+        else:
+            return "RAW"
+
+    def visit_ROWID(self, type_, **kw):
+        return "ROWID"
+
+
+class OracleCompiler(compiler.SQLCompiler):
+    """Oracle compiler modifies the lexical structure of Select
+    statements to work under non-ANSI configured Oracle databases, if
+    the use_ansi flag is False.
+    """
+
+    compound_keywords = util.update_copy(
+        compiler.SQLCompiler.compound_keywords,
+        {expression.CompoundSelect.EXCEPT: "MINUS"},
+    )
+
+    def __init__(self, *args, **kwargs):
+        self.__wheres = {}
+        super().__init__(*args, **kwargs)
+
+    def visit_mod_binary(self, binary, operator, **kw):
+        return "mod(%s, %s)" % (
+            self.process(binary.left, **kw),
+            self.process(binary.right, **kw),
+        )
+
+    def visit_now_func(self, fn, **kw):
+        return "CURRENT_TIMESTAMP"
+
+    def visit_char_length_func(self, fn, **kw):
+        return "LENGTH" + self.function_argspec(fn, **kw)
+
+    def visit_match_op_binary(self, binary, operator, **kw):
+        return "CONTAINS (%s, %s)" % (
+            self.process(binary.left),
+            self.process(binary.right),
+        )
+
+    def visit_true(self, expr, **kw):
+        return "1"
+
+    def visit_false(self, expr, **kw):
+        return "0"
+
+    def get_cte_preamble(self, recursive):
+        return "WITH"
+
+    def get_select_hint_text(self, byfroms):
+        return " ".join("/*+ %s */" % text for table, text in byfroms.items())
+
+    def function_argspec(self, fn, **kw):
+        if len(fn.clauses) > 0 or fn.name.upper() not in NO_ARG_FNS:
+            return compiler.SQLCompiler.function_argspec(self, fn, **kw)
+        else:
+            return ""
+
+    def visit_function(self, func, **kw):
+        text = super().visit_function(func, **kw)
+        if kw.get("asfrom", False) and func.name.lower() != "table":
+            text = "TABLE (%s)" % text
+        return text
+
+    def visit_table_valued_column(self, element, **kw):
+        text = super().visit_table_valued_column(element, **kw)
+        text = text + ".COLUMN_VALUE"
+        return text
+
+    def default_from(self):
+        """Called when a ``SELECT`` statement has no froms,
+        and no ``FROM`` clause is to be appended.
+
+        The Oracle compiler tacks a "FROM DUAL" to the statement.
+        """
+
+        return " FROM DUAL"
+
+    def visit_join(self, join, from_linter=None, **kwargs):
+        if self.dialect.use_ansi:
+            return compiler.SQLCompiler.visit_join(
+                self, join, from_linter=from_linter, **kwargs
+            )
+        else:
+            if from_linter:
+                from_linter.edges.add((join.left, join.right))
+
+            kwargs["asfrom"] = True
+            if isinstance(join.right, expression.FromGrouping):
+                right = join.right.element
+            else:
+                right = join.right
+            return (
+                self.process(join.left, from_linter=from_linter, **kwargs)
+                + ", "
+                + self.process(right, from_linter=from_linter, **kwargs)
+            )
+
+    def _get_nonansi_join_whereclause(self, froms):
+        clauses = []
+
+        def visit_join(join):
+            if join.isouter:
+                # https://docs.oracle.com/database/121/SQLRF/queries006.htm#SQLRF52354
+                # "apply the outer join operator (+) to all columns of B in
+                # the join condition in the WHERE clause" - that is,
+                # unconditionally regardless of operator or the other side
+                def visit_binary(binary):
+                    if isinstance(
+                        binary.left, expression.ColumnClause
+                    ) and join.right.is_derived_from(binary.left.table):
+                        binary.left = _OuterJoinColumn(binary.left)
+                    elif isinstance(
+                        binary.right, expression.ColumnClause
+                    ) and join.right.is_derived_from(binary.right.table):
+                        binary.right = _OuterJoinColumn(binary.right)
+
+                clauses.append(
+                    visitors.cloned_traverse(
+                        join.onclause, {}, {"binary": visit_binary}
+                    )
+                )
+            else:
+                clauses.append(join.onclause)
+
+            for j in join.left, join.right:
+                if isinstance(j, expression.Join):
+                    visit_join(j)
+                elif isinstance(j, expression.FromGrouping):
+                    visit_join(j.element)
+
+        for f in froms:
+            if isinstance(f, expression.Join):
+                visit_join(f)
+
+        if not clauses:
+            return None
+        else:
+            return sql.and_(*clauses)
+
+    def visit_outer_join_column(self, vc, **kw):
+        return self.process(vc.column, **kw) + "(+)"
+
+    def visit_sequence(self, seq, **kw):
+        return self.preparer.format_sequence(seq) + ".nextval"
+
+    def get_render_as_alias_suffix(self, alias_name_text):
+        """Oracle doesn't like ``FROM table AS alias``"""
+
+        return " " + alias_name_text
+
+    def returning_clause(
+        self, stmt, returning_cols, *, populate_result_map, **kw
+    ):
+        columns = []
+        binds = []
+
+        for i, column in enumerate(
+            expression._select_iterables(returning_cols)
+        ):
+            if (
+                self.isupdate
+                and isinstance(column, sa_schema.Column)
+                and isinstance(column.server_default, Computed)
+                and not self.dialect._supports_update_returning_computed_cols
+            ):
+                util.warn(
+                    "Computed columns don't work with Oracle Database UPDATE "
+                    "statements that use RETURNING; the value of the column "
+                    "*before* the UPDATE takes place is returned.   It is "
+                    "advised to not use RETURNING with an Oracle Database "
+                    "computed column.  Consider setting implicit_returning "
+                    "to False on the Table object in order to avoid implicit "
+                    "RETURNING clauses from being generated for this Table."
+                )
+            if column.type._has_column_expression:
+                col_expr = column.type.column_expression(column)
+            else:
+                col_expr = column
+
+            outparam = sql.outparam("ret_%d" % i, type_=column.type)
+            self.binds[outparam.key] = outparam
+            binds.append(
+                self.bindparam_string(self._truncate_bindparam(outparam))
+            )
+
+            # has_out_parameters would in a normal case be set to True
+            # as a result of the compiler visiting an outparam() object.
+            # in this case, the above outparam() objects are not being
+            # visited.   Ensure the statement itself didn't have other
+            # outparam() objects independently.
+            # technically, this could be supported, but as it would be
+            # a very strange use case without a clear rationale, disallow it
+            if self.has_out_parameters:
+                raise exc.InvalidRequestError(
+                    "Using explicit outparam() objects with "
+                    "UpdateBase.returning() in the same Core DML statement "
+                    "is not supported in the Oracle Database dialects."
+                )
+
+            self._oracle_returning = True
+
+            columns.append(self.process(col_expr, within_columns_clause=False))
+            if populate_result_map:
+                self._add_to_result_map(
+                    getattr(col_expr, "name", col_expr._anon_name_label),
+                    getattr(col_expr, "name", col_expr._anon_name_label),
+                    (
+                        column,
+                        getattr(column, "name", None),
+                        getattr(column, "key", None),
+                    ),
+                    column.type,
+                )
+
+        return "RETURNING " + ", ".join(columns) + " INTO " + ", ".join(binds)
+
+    def _row_limit_clause(self, select, **kw):
+        """Oracle Database 12c supports OFFSET/FETCH operators
+        Use it instead subquery with row_number
+
+        """
+
+        if (
+            select._fetch_clause is not None
+            or not self.dialect._supports_offset_fetch
+        ):
+            return super()._row_limit_clause(
+                select, use_literal_execute_for_simple_int=True, **kw
+            )
+        else:
+            return self.fetch_clause(
+                select,
+                fetch_clause=self._get_limit_or_fetch(select),
+                use_literal_execute_for_simple_int=True,
+                **kw,
+            )
+
+    def _get_limit_or_fetch(self, select):
+        if select._fetch_clause is None:
+            return select._limit_clause
+        else:
+            return select._fetch_clause
+
+    def translate_select_structure(self, select_stmt, **kwargs):
+        select = select_stmt
+
+        if not getattr(select, "_oracle_visit", None):
+            if not self.dialect.use_ansi:
+                froms = self._display_froms_for_select(
+                    select, kwargs.get("asfrom", False)
+                )
+                whereclause = self._get_nonansi_join_whereclause(froms)
+                if whereclause is not None:
+                    select = select.where(whereclause)
+                    select._oracle_visit = True
+
+            # if fetch is used this is not needed
+            if (
+                select._has_row_limiting_clause
+                and not self.dialect._supports_offset_fetch
+                and select._fetch_clause is None
+            ):
+                limit_clause = select._limit_clause
+                offset_clause = select._offset_clause
+
+                if select._simple_int_clause(limit_clause):
+                    limit_clause = limit_clause.render_literal_execute()
+
+                if select._simple_int_clause(offset_clause):
+                    offset_clause = offset_clause.render_literal_execute()
+
+                # currently using form at:
+                # https://blogs.oracle.com/oraclemagazine/\
+                # on-rownum-and-limiting-results
+
+                orig_select = select
+                select = select._generate()
+                select._oracle_visit = True
+
+                # add expressions to accommodate FOR UPDATE OF
+                for_update = select._for_update_arg
+                if for_update is not None and for_update.of:
+                    for_update = for_update._clone()
+                    for_update._copy_internals()
+
+                    for elem in for_update.of:
+                        if not select.selected_columns.contains_column(elem):
+                            select = select.add_columns(elem)
+
+                # Wrap the middle select and add the hint
+                inner_subquery = select.alias()
+                limitselect = sql.select(
+                    *[
+                        c
+                        for c in inner_subquery.c
+                        if orig_select.selected_columns.corresponding_column(c)
+                        is not None
+                    ]
+                )
+
+                if (
+                    limit_clause is not None
+                    and self.dialect.optimize_limits
+                    and select._simple_int_clause(limit_clause)
+                ):
+                    limitselect = limitselect.prefix_with(
+                        expression.text(
+                            "/*+ FIRST_ROWS(%s) */"
+                            % self.process(limit_clause, **kwargs)
+                        )
+                    )
+
+                limitselect._oracle_visit = True
+                limitselect._is_wrapper = True
+
+                # add expressions to accommodate FOR UPDATE OF
+                if for_update is not None and for_update.of:
+                    adapter = sql_util.ClauseAdapter(inner_subquery)
+                    for_update.of = [
+                        adapter.traverse(elem) for elem in for_update.of
+                    ]
+
+                # If needed, add the limiting clause
+                if limit_clause is not None:
+                    if select._simple_int_clause(limit_clause) and (
+                        offset_clause is None
+                        or select._simple_int_clause(offset_clause)
+                    ):
+                        max_row = limit_clause
+
+                        if offset_clause is not None:
+                            max_row = max_row + offset_clause
+
+                    else:
+                        max_row = limit_clause
+
+                        if offset_clause is not None:
+                            max_row = max_row + offset_clause
+                    limitselect = limitselect.where(
+                        sql.literal_column("ROWNUM") <= max_row
+                    )
+
+                # If needed, add the ora_rn, and wrap again with offset.
+                if offset_clause is None:
+                    limitselect._for_update_arg = for_update
+                    select = limitselect
+                else:
+                    limitselect = limitselect.add_columns(
+                        sql.literal_column("ROWNUM").label("ora_rn")
+                    )
+                    limitselect._oracle_visit = True
+                    limitselect._is_wrapper = True
+
+                    if for_update is not None and for_update.of:
+                        limitselect_cols = limitselect.selected_columns
+                        for elem in for_update.of:
+                            if (
+                                limitselect_cols.corresponding_column(elem)
+                                is None
+                            ):
+                                limitselect = limitselect.add_columns(elem)
+
+                    limit_subquery = limitselect.alias()
+                    origselect_cols = orig_select.selected_columns
+                    offsetselect = sql.select(
+                        *[
+                            c
+                            for c in limit_subquery.c
+                            if origselect_cols.corresponding_column(c)
+                            is not None
+                        ]
+                    )
+
+                    offsetselect._oracle_visit = True
+                    offsetselect._is_wrapper = True
+
+                    if for_update is not None and for_update.of:
+                        adapter = sql_util.ClauseAdapter(limit_subquery)
+                        for_update.of = [
+                            adapter.traverse(elem) for elem in for_update.of
+                        ]
+
+                    offsetselect = offsetselect.where(
+                        sql.literal_column("ora_rn") > offset_clause
+                    )
+
+                    offsetselect._for_update_arg = for_update
+                    select = offsetselect
+
+        return select
+
+    def limit_clause(self, select, **kw):
+        return ""
+
+    def visit_empty_set_expr(self, type_, **kw):
+        return "SELECT 1 FROM DUAL WHERE 1!=1"
+
+    def for_update_clause(self, select, **kw):
+        if self.is_subquery():
+            return ""
+
+        tmp = " FOR UPDATE"
+
+        if select._for_update_arg.of:
+            tmp += " OF " + ", ".join(
+                self.process(elem, **kw) for elem in select._for_update_arg.of
+            )
+
+        if select._for_update_arg.nowait:
+            tmp += " NOWAIT"
+        if select._for_update_arg.skip_locked:
+            tmp += " SKIP LOCKED"
+
+        return tmp
+
+    def visit_is_distinct_from_binary(self, binary, operator, **kw):
+        return "DECODE(%s, %s, 0, 1) = 1" % (
+            self.process(binary.left),
+            self.process(binary.right),
+        )
+
+    def visit_is_not_distinct_from_binary(self, binary, operator, **kw):
+        return "DECODE(%s, %s, 0, 1) = 0" % (
+            self.process(binary.left),
+            self.process(binary.right),
+        )
+
+    def visit_regexp_match_op_binary(self, binary, operator, **kw):
+        string = self.process(binary.left, **kw)
+        pattern = self.process(binary.right, **kw)
+        flags = binary.modifiers["flags"]
+        if flags is None:
+            return "REGEXP_LIKE(%s, %s)" % (string, pattern)
+        else:
+            return "REGEXP_LIKE(%s, %s, %s)" % (
+                string,
+                pattern,
+                self.render_literal_value(flags, sqltypes.STRINGTYPE),
+            )
+
+    def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
+        return "NOT %s" % self.visit_regexp_match_op_binary(
+            binary, operator, **kw
+        )
+
+    def visit_regexp_replace_op_binary(self, binary, operator, **kw):
+        string = self.process(binary.left, **kw)
+        pattern_replace = self.process(binary.right, **kw)
+        flags = binary.modifiers["flags"]
+        if flags is None:
+            return "REGEXP_REPLACE(%s, %s)" % (
+                string,
+                pattern_replace,
+            )
+        else:
+            return "REGEXP_REPLACE(%s, %s, %s)" % (
+                string,
+                pattern_replace,
+                self.render_literal_value(flags, sqltypes.STRINGTYPE),
+            )
+
+    def visit_aggregate_strings_func(self, fn, **kw):
+        return "LISTAGG%s" % self.function_argspec(fn, **kw)
+
+    def _visit_bitwise(self, binary, fn_name, custom_right=None, **kw):
+        left = self.process(binary.left, **kw)
+        right = self.process(
+            custom_right if custom_right is not None else binary.right, **kw
+        )
+        return f"{fn_name}({left}, {right})"
+
+    def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
+        return self._visit_bitwise(binary, "BITXOR", **kw)
+
+    def visit_bitwise_or_op_binary(self, binary, operator, **kw):
+        return self._visit_bitwise(binary, "BITOR", **kw)
+
+    def visit_bitwise_and_op_binary(self, binary, operator, **kw):
+        return self._visit_bitwise(binary, "BITAND", **kw)
+
+    def visit_bitwise_rshift_op_binary(self, binary, operator, **kw):
+        raise exc.CompileError("Cannot compile bitwise_rshift in oracle")
+
+    def visit_bitwise_lshift_op_binary(self, binary, operator, **kw):
+        raise exc.CompileError("Cannot compile bitwise_lshift in oracle")
+
+    def visit_bitwise_not_op_unary_operator(self, element, operator, **kw):
+        raise exc.CompileError("Cannot compile bitwise_not in oracle")
+
+
+class OracleDDLCompiler(compiler.DDLCompiler):
+    def define_constraint_cascades(self, constraint):
+        text = ""
+        if constraint.ondelete is not None:
+            text += " ON DELETE %s" % constraint.ondelete
+
+        # oracle has no ON UPDATE CASCADE -
+        # its only available via triggers
+        # https://web.archive.org/web/20090317041251/https://asktom.oracle.com/tkyte/update_cascade/index.html
+        if constraint.onupdate is not None:
+            util.warn(
+                "Oracle Database does not contain native UPDATE CASCADE "
+                "functionality - onupdates will not be rendered for foreign "
+                "keys.  Consider using deferrable=True, initially='deferred' "
+                "or triggers."
+            )
+
+        return text
+
+    def visit_drop_table_comment(self, drop, **kw):
+        return "COMMENT ON TABLE %s IS ''" % self.preparer.format_table(
+            drop.element
+        )
+
+    def visit_create_index(self, create, **kw):
+        index = create.element
+        self._verify_index_table(index)
+        preparer = self.preparer
+        text = "CREATE "
+        if index.unique:
+            text += "UNIQUE "
+        if index.dialect_options["oracle"]["bitmap"]:
+            text += "BITMAP "
+        text += "INDEX %s ON %s (%s)" % (
+            self._prepared_index_name(index, include_schema=True),
+            preparer.format_table(index.table, use_schema=True),
+            ", ".join(
+                self.sql_compiler.process(
+                    expr, include_table=False, literal_binds=True
+                )
+                for expr in index.expressions
+            ),
+        )
+        if index.dialect_options["oracle"]["compress"] is not False:
+            if index.dialect_options["oracle"]["compress"] is True:
+                text += " COMPRESS"
+            else:
+                text += " COMPRESS %d" % (
+                    index.dialect_options["oracle"]["compress"]
+                )
+        return text
+
+    def post_create_table(self, table):
+        table_opts = []
+        opts = table.dialect_options["oracle"]
+
+        if opts["on_commit"]:
+            on_commit_options = opts["on_commit"].replace("_", " ").upper()
+            table_opts.append("\n ON COMMIT %s" % on_commit_options)
+
+        if opts["compress"]:
+            if opts["compress"] is True:
+                table_opts.append("\n COMPRESS")
+            else:
+                table_opts.append("\n COMPRESS FOR %s" % (opts["compress"]))
+        if opts["tablespace"]:
+            table_opts.append(
+                "\n TABLESPACE %s" % self.preparer.quote(opts["tablespace"])
+            )
+        return "".join(table_opts)
+
+    def get_identity_options(self, identity_options):
+        text = super().get_identity_options(identity_options)
+        text = text.replace("NO MINVALUE", "NOMINVALUE")
+        text = text.replace("NO MAXVALUE", "NOMAXVALUE")
+        text = text.replace("NO CYCLE", "NOCYCLE")
+        if identity_options.order is not None:
+            text += " ORDER" if identity_options.order else " NOORDER"
+        return text.strip()
+
+    def visit_computed_column(self, generated, **kw):
+        text = "GENERATED ALWAYS AS (%s)" % self.sql_compiler.process(
+            generated.sqltext, include_table=False, literal_binds=True
+        )
+        if generated.persisted is True:
+            raise exc.CompileError(
+                "Oracle Database computed columns do not support 'stored' "
+                "persistence; set the 'persisted' flag to None or False for "
+                "Oracle Database support."
+            )
+        elif generated.persisted is False:
+            text += " VIRTUAL"
+        return text
+
+    def visit_identity_column(self, identity, **kw):
+        if identity.always is None:
+            kind = ""
+        else:
+            kind = "ALWAYS" if identity.always else "BY DEFAULT"
+        text = "GENERATED %s" % kind
+        if identity.on_null:
+            text += " ON NULL"
+        text += " AS IDENTITY"
+        options = self.get_identity_options(identity)
+        if options:
+            text += " (%s)" % options
+        return text
+
+
+class OracleIdentifierPreparer(compiler.IdentifierPreparer):
+    reserved_words = {x.lower() for x in RESERVED_WORDS}
+    illegal_initial_characters = {str(dig) for dig in range(0, 10)}.union(
+        ["_", "$"]
+    )
+
+    def _bindparam_requires_quotes(self, value):
+        """Return True if the given identifier requires quoting."""
+        lc_value = value.lower()
+        return (
+            lc_value in self.reserved_words
+            or value[0] in self.illegal_initial_characters
+            or not self.legal_characters.match(str(value))
+        )
+
+    def format_savepoint(self, savepoint):
+        name = savepoint.ident.lstrip("_")
+        return super().format_savepoint(savepoint, name)
+
+
+class OracleExecutionContext(default.DefaultExecutionContext):
+    def fire_sequence(self, seq, type_):
+        return self._execute_scalar(
+            "SELECT "
+            + self.identifier_preparer.format_sequence(seq)
+            + ".nextval FROM DUAL",
+            type_,
+        )
+
+    def pre_exec(self):
+        if self.statement and "_oracle_dblink" in self.execution_options:
+            self.statement = self.statement.replace(
+                dictionary.DB_LINK_PLACEHOLDER,
+                self.execution_options["_oracle_dblink"],
+            )
+
+
+class OracleDialect(default.DefaultDialect):
+    name = "oracle"
+    supports_statement_cache = True
+    supports_alter = True
+    max_identifier_length = 128
+
+    _supports_offset_fetch = True
+
+    insert_returning = True
+    update_returning = True
+    delete_returning = True
+
+    div_is_floordiv = False
+
+    supports_simple_order_by_label = False
+    cte_follows_insert = True
+    returns_native_bytes = True
+
+    supports_sequences = True
+    sequences_optional = False
+    postfetch_lastrowid = False
+
+    default_paramstyle = "named"
+    colspecs = colspecs
+    ischema_names = ischema_names
+    requires_name_normalize = True
+
+    supports_comments = True
+
+    supports_default_values = False
+    supports_default_metavalue = True
+    supports_empty_insert = False
+    supports_identity_columns = True
+
+    statement_compiler = OracleCompiler
+    ddl_compiler = OracleDDLCompiler
+    type_compiler_cls = OracleTypeCompiler
+    preparer = OracleIdentifierPreparer
+    execution_ctx_cls = OracleExecutionContext
+
+    reflection_options = ("oracle_resolve_synonyms",)
+
+    _use_nchar_for_unicode = False
+
+    construct_arguments = [
+        (
+            sa_schema.Table,
+            {
+                "resolve_synonyms": False,
+                "on_commit": None,
+                "compress": False,
+                "tablespace": None,
+            },
+        ),
+        (sa_schema.Index, {"bitmap": False, "compress": False}),
+    ]
+
+    @util.deprecated_params(
+        use_binds_for_limits=(
+            "1.4",
+            "The ``use_binds_for_limits`` Oracle Database dialect parameter "
+            "is deprecated. The dialect now renders LIMIT / OFFSET integers "
+            "inline in all cases using a post-compilation hook, so that the "
+            "value is still represented by a 'bound parameter' on the Core "
+            "Expression side.",
+        )
+    )
+    def __init__(
+        self,
+        use_ansi=True,
+        optimize_limits=False,
+        use_binds_for_limits=None,
+        use_nchar_for_unicode=False,
+        exclude_tablespaces=("SYSTEM", "SYSAUX"),
+        enable_offset_fetch=True,
+        **kwargs,
+    ):
+        default.DefaultDialect.__init__(self, **kwargs)
+        self._use_nchar_for_unicode = use_nchar_for_unicode
+        self.use_ansi = use_ansi
+        self.optimize_limits = optimize_limits
+        self.exclude_tablespaces = exclude_tablespaces
+        self.enable_offset_fetch = self._supports_offset_fetch = (
+            enable_offset_fetch
+        )
+
+    def initialize(self, connection):
+        super().initialize(connection)
+
+        # Oracle 8i has RETURNING:
+        # https://docs.oracle.com/cd/A87860_01/doc/index.htm
+
+        # so does Oracle8:
+        # https://docs.oracle.com/cd/A64702_01/doc/index.htm
+
+        if self._is_oracle_8:
+            self.colspecs = self.colspecs.copy()
+            self.colspecs.pop(sqltypes.Interval)
+            self.use_ansi = False
+
+        self.supports_identity_columns = self.server_version_info >= (12,)
+        self._supports_offset_fetch = (
+            self.enable_offset_fetch and self.server_version_info >= (12,)
+        )
+
+    def _get_effective_compat_server_version_info(self, connection):
+        # dialect does not need compat levels below 12.2, so don't query
+        # in those cases
+
+        if self.server_version_info < (12, 2):
+            return self.server_version_info
+        try:
+            compat = connection.exec_driver_sql(
+                "SELECT value FROM v$parameter WHERE name = 'compatible'"
+            ).scalar()
+        except exc.DBAPIError:
+            compat = None
+
+        if compat:
+            try:
+                return tuple(int(x) for x in compat.split("."))
+            except:
+                return self.server_version_info
+        else:
+            return self.server_version_info
+
+    @property
+    def _is_oracle_8(self):
+        return self.server_version_info and self.server_version_info < (9,)
+
+    @property
+    def _supports_table_compression(self):
+        return self.server_version_info and self.server_version_info >= (10, 1)
+
+    @property
+    def _supports_table_compress_for(self):
+        return self.server_version_info and self.server_version_info >= (11,)
+
+    @property
+    def _supports_char_length(self):
+        return not self._is_oracle_8
+
+    @property
+    def _supports_update_returning_computed_cols(self):
+        # on version 18 this error is no longet present while it happens on 11
+        # it may work also on versions before the 18
+        return self.server_version_info and self.server_version_info >= (18,)
+
+    @property
+    def _supports_except_all(self):
+        return self.server_version_info and self.server_version_info >= (21,)
+
+    def do_release_savepoint(self, connection, name):
+        # Oracle does not support RELEASE SAVEPOINT
+        pass
+
+    def _check_max_identifier_length(self, connection):
+        if self._get_effective_compat_server_version_info(connection) < (
+            12,
+            2,
+        ):
+            return 30
+        else:
+            # use the default
+            return None
+
+    def get_isolation_level_values(self, dbapi_connection):
+        return ["READ COMMITTED", "SERIALIZABLE"]
+
+    def get_default_isolation_level(self, dbapi_conn):
+        try:
+            return self.get_isolation_level(dbapi_conn)
+        except NotImplementedError:
+            raise
+        except:
+            return "READ COMMITTED"
+
+    def _execute_reflection(
+        self, connection, query, dblink, returns_long, params=None
+    ):
+        if dblink and not dblink.startswith("@"):
+            dblink = f"@{dblink}"
+        execution_options = {
+            # handle db links
+            "_oracle_dblink": dblink or "",
+            # override any schema translate map
+            "schema_translate_map": None,
+        }
+
+        if dblink and returns_long:
+            # Oracle seems to error with
+            # "ORA-00997: illegal use of LONG datatype" when returning
+            # LONG columns via a dblink in a query with bind params
+            # This type seems to be very hard to cast into something else
+            # so it seems easier to just use bind param in this case
+            def visit_bindparam(bindparam):
+                bindparam.literal_execute = True
+
+            query = visitors.cloned_traverse(
+                query, {}, {"bindparam": visit_bindparam}
+            )
+        return connection.execute(
+            query, params, execution_options=execution_options
+        )
+
+    @util.memoized_property
+    def _has_table_query(self):
+        # materialized views are returned by all_tables
+        tables = (
+            select(
+                dictionary.all_tables.c.table_name,
+                dictionary.all_tables.c.owner,
+            )
+            .union_all(
+                select(
+                    dictionary.all_views.c.view_name.label("table_name"),
+                    dictionary.all_views.c.owner,
+                )
+            )
+            .subquery("tables_and_views")
+        )
+
+        query = select(tables.c.table_name).where(
+            tables.c.table_name == bindparam("table_name"),
+            tables.c.owner == bindparam("owner"),
+        )
+        return query
+
+    @reflection.cache
+    def has_table(
+        self, connection, table_name, schema=None, dblink=None, **kw
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        self._ensure_has_table_connection(connection)
+
+        if not schema:
+            schema = self.default_schema_name
+
+        params = {
+            "table_name": self.denormalize_name(table_name),
+            "owner": self.denormalize_schema_name(schema),
+        }
+        cursor = self._execute_reflection(
+            connection,
+            self._has_table_query,
+            dblink,
+            returns_long=False,
+            params=params,
+        )
+        return bool(cursor.scalar())
+
+    @reflection.cache
+    def has_sequence(
+        self, connection, sequence_name, schema=None, dblink=None, **kw
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        if not schema:
+            schema = self.default_schema_name
+
+        query = select(dictionary.all_sequences.c.sequence_name).where(
+            dictionary.all_sequences.c.sequence_name
+            == self.denormalize_schema_name(sequence_name),
+            dictionary.all_sequences.c.sequence_owner
+            == self.denormalize_schema_name(schema),
+        )
+
+        cursor = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        )
+        return bool(cursor.scalar())
+
+    def _get_default_schema_name(self, connection):
+        return self.normalize_name(
+            connection.exec_driver_sql(
+                "select sys_context( 'userenv', 'current_schema' ) from dual"
+            ).scalar()
+        )
+
+    def denormalize_schema_name(self, name):
+        # look for quoted_name
+        force = getattr(name, "quote", None)
+        if force is None and name == "public":
+            # look for case insensitive, no quoting specified, "public"
+            return "PUBLIC"
+        return super().denormalize_name(name)
+
+    @reflection.flexi_cache(
+        ("schema", InternalTraversal.dp_string),
+        ("filter_names", InternalTraversal.dp_string_list),
+        ("dblink", InternalTraversal.dp_string),
+    )
+    def _get_synonyms(self, connection, schema, filter_names, dblink, **kw):
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+
+        has_filter_names, params = self._prepare_filter_names(filter_names)
+        query = select(
+            dictionary.all_synonyms.c.synonym_name,
+            dictionary.all_synonyms.c.table_name,
+            dictionary.all_synonyms.c.table_owner,
+            dictionary.all_synonyms.c.db_link,
+        ).where(dictionary.all_synonyms.c.owner == owner)
+        if has_filter_names:
+            query = query.where(
+                dictionary.all_synonyms.c.synonym_name.in_(
+                    params["filter_names"]
+                )
+            )
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).mappings()
+        return result.all()
+
+    @lru_cache()
+    def _all_objects_query(
+        self, owner, scope, kind, has_filter_names, has_mat_views
+    ):
+        query = (
+            select(dictionary.all_objects.c.object_name)
+            .select_from(dictionary.all_objects)
+            .where(dictionary.all_objects.c.owner == owner)
+        )
+
+        # NOTE: materialized views are listed in all_objects twice;
+        # once as MATERIALIZE VIEW and once as TABLE
+        if kind is ObjectKind.ANY:
+            # materilaized view are listed also as tables so there is no
+            # need to add them to the in_.
+            query = query.where(
+                dictionary.all_objects.c.object_type.in_(("TABLE", "VIEW"))
+            )
+        else:
+            object_type = []
+            if ObjectKind.VIEW in kind:
+                object_type.append("VIEW")
+            if (
+                ObjectKind.MATERIALIZED_VIEW in kind
+                and ObjectKind.TABLE not in kind
+            ):
+                # materilaized view are listed also as tables so there is no
+                # need to add them to the in_ if also selecting tables.
+                object_type.append("MATERIALIZED VIEW")
+            if ObjectKind.TABLE in kind:
+                object_type.append("TABLE")
+                if has_mat_views and ObjectKind.MATERIALIZED_VIEW not in kind:
+                    # materialized view are listed also as tables,
+                    # so they need to be filtered out
+                    # EXCEPT ALL / MINUS profiles as faster than using
+                    # NOT EXISTS or NOT IN with a subquery, but it's in
+                    # general faster to get the mat view names and exclude
+                    # them only when needed
+                    query = query.where(
+                        dictionary.all_objects.c.object_name.not_in(
+                            bindparam("mat_views")
+                        )
+                    )
+            query = query.where(
+                dictionary.all_objects.c.object_type.in_(object_type)
+            )
+
+        # handles scope
+        if scope is ObjectScope.DEFAULT:
+            query = query.where(dictionary.all_objects.c.temporary == "N")
+        elif scope is ObjectScope.TEMPORARY:
+            query = query.where(dictionary.all_objects.c.temporary == "Y")
+
+        if has_filter_names:
+            query = query.where(
+                dictionary.all_objects.c.object_name.in_(
+                    bindparam("filter_names")
+                )
+            )
+        return query
+
+    @reflection.flexi_cache(
+        ("schema", InternalTraversal.dp_string),
+        ("scope", InternalTraversal.dp_plain_obj),
+        ("kind", InternalTraversal.dp_plain_obj),
+        ("filter_names", InternalTraversal.dp_string_list),
+        ("dblink", InternalTraversal.dp_string),
+    )
+    def _get_all_objects(
+        self, connection, schema, scope, kind, filter_names, dblink, **kw
+    ):
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+
+        has_filter_names, params = self._prepare_filter_names(filter_names)
+        has_mat_views = False
+        if (
+            ObjectKind.TABLE in kind
+            and ObjectKind.MATERIALIZED_VIEW not in kind
+        ):
+            # see note in _all_objects_query
+            mat_views = self.get_materialized_view_names(
+                connection, schema, dblink, _normalize=False, **kw
+            )
+            if mat_views:
+                params["mat_views"] = mat_views
+                has_mat_views = True
+
+        query = self._all_objects_query(
+            owner, scope, kind, has_filter_names, has_mat_views
+        )
+
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False, params=params
+        ).scalars()
+
+        return result.all()
+
+    def _handle_synonyms_decorator(fn):
+        @wraps(fn)
+        def wrapper(self, *args, **kwargs):
+            return self._handle_synonyms(fn, *args, **kwargs)
+
+        return wrapper
+
+    def _handle_synonyms(self, fn, connection, *args, **kwargs):
+        if not kwargs.get("oracle_resolve_synonyms", False):
+            return fn(self, connection, *args, **kwargs)
+
+        original_kw = kwargs.copy()
+        schema = kwargs.pop("schema", None)
+        result = self._get_synonyms(
+            connection,
+            schema=schema,
+            filter_names=kwargs.pop("filter_names", None),
+            dblink=kwargs.pop("dblink", None),
+            info_cache=kwargs.get("info_cache", None),
+        )
+
+        dblinks_owners = defaultdict(dict)
+        for row in result:
+            key = row["db_link"], row["table_owner"]
+            tn = self.normalize_name(row["table_name"])
+            dblinks_owners[key][tn] = row["synonym_name"]
+
+        if not dblinks_owners:
+            # No synonym, do the plain thing
+            return fn(self, connection, *args, **original_kw)
+
+        data = {}
+        for (dblink, table_owner), mapping in dblinks_owners.items():
+            call_kw = {
+                **original_kw,
+                "schema": table_owner,
+                "dblink": self.normalize_name(dblink),
+                "filter_names": mapping.keys(),
+            }
+            call_result = fn(self, connection, *args, **call_kw)
+            for (_, tn), value in call_result:
+                synonym_name = self.normalize_name(mapping[tn])
+                data[(schema, synonym_name)] = value
+        return data.items()
+
+    @reflection.cache
+    def get_schema_names(self, connection, dblink=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        query = select(dictionary.all_users.c.username).order_by(
+            dictionary.all_users.c.username
+        )
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(row) for row in result]
+
+    @reflection.cache
+    def get_table_names(self, connection, schema=None, dblink=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        # note that table_names() isn't loading DBLINKed or synonym'ed tables
+        if schema is None:
+            schema = self.default_schema_name
+
+        den_schema = self.denormalize_schema_name(schema)
+        if kw.get("oracle_resolve_synonyms", False):
+            tables = (
+                select(
+                    dictionary.all_tables.c.table_name,
+                    dictionary.all_tables.c.owner,
+                    dictionary.all_tables.c.iot_name,
+                    dictionary.all_tables.c.duration,
+                    dictionary.all_tables.c.tablespace_name,
+                )
+                .union_all(
+                    select(
+                        dictionary.all_synonyms.c.synonym_name.label(
+                            "table_name"
+                        ),
+                        dictionary.all_synonyms.c.owner,
+                        dictionary.all_tables.c.iot_name,
+                        dictionary.all_tables.c.duration,
+                        dictionary.all_tables.c.tablespace_name,
+                    )
+                    .select_from(dictionary.all_tables)
+                    .join(
+                        dictionary.all_synonyms,
+                        and_(
+                            dictionary.all_tables.c.table_name
+                            == dictionary.all_synonyms.c.table_name,
+                            dictionary.all_tables.c.owner
+                            == func.coalesce(
+                                dictionary.all_synonyms.c.table_owner,
+                                dictionary.all_synonyms.c.owner,
+                            ),
+                        ),
+                    )
+                )
+                .subquery("available_tables")
+            )
+        else:
+            tables = dictionary.all_tables
+
+        query = select(tables.c.table_name)
+        if self.exclude_tablespaces:
+            query = query.where(
+                func.coalesce(
+                    tables.c.tablespace_name, "no tablespace"
+                ).not_in(self.exclude_tablespaces)
+            )
+        query = query.where(
+            tables.c.owner == den_schema,
+            tables.c.iot_name.is_(null()),
+            tables.c.duration.is_(null()),
+        )
+
+        # remove materialized views
+        mat_query = select(
+            dictionary.all_mviews.c.mview_name.label("table_name")
+        ).where(dictionary.all_mviews.c.owner == den_schema)
+
+        query = (
+            query.except_all(mat_query)
+            if self._supports_except_all
+            else query.except_(mat_query)
+        )
+
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(row) for row in result]
+
+    @reflection.cache
+    def get_temp_table_names(self, connection, dblink=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        schema = self.denormalize_schema_name(self.default_schema_name)
+
+        query = select(dictionary.all_tables.c.table_name)
+        if self.exclude_tablespaces:
+            query = query.where(
+                func.coalesce(
+                    dictionary.all_tables.c.tablespace_name, "no tablespace"
+                ).not_in(self.exclude_tablespaces)
+            )
+        query = query.where(
+            dictionary.all_tables.c.owner == schema,
+            dictionary.all_tables.c.iot_name.is_(null()),
+            dictionary.all_tables.c.duration.is_not(null()),
+        )
+
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(row) for row in result]
+
+    @reflection.cache
+    def get_materialized_view_names(
+        self, connection, schema=None, dblink=None, _normalize=True, **kw
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        if not schema:
+            schema = self.default_schema_name
+
+        query = select(dictionary.all_mviews.c.mview_name).where(
+            dictionary.all_mviews.c.owner
+            == self.denormalize_schema_name(schema)
+        )
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        if _normalize:
+            return [self.normalize_name(row) for row in result]
+        else:
+            return result.all()
+
+    @reflection.cache
+    def get_view_names(self, connection, schema=None, dblink=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        if not schema:
+            schema = self.default_schema_name
+
+        query = select(dictionary.all_views.c.view_name).where(
+            dictionary.all_views.c.owner
+            == self.denormalize_schema_name(schema)
+        )
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(row) for row in result]
+
+    @reflection.cache
+    def get_sequence_names(self, connection, schema=None, dblink=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link."""
+        if not schema:
+            schema = self.default_schema_name
+        query = select(dictionary.all_sequences.c.sequence_name).where(
+            dictionary.all_sequences.c.sequence_owner
+            == self.denormalize_schema_name(schema)
+        )
+
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(row) for row in result]
+
+    def _value_or_raise(self, data, table, schema):
+        table = self.normalize_name(str(table))
+        try:
+            return dict(data)[(schema, table)]
+        except KeyError:
+            raise exc.NoSuchTableError(
+                f"{schema}.{table}" if schema else table
+            ) from None
+
+    def _prepare_filter_names(self, filter_names):
+        if filter_names:
+            fn = [self.denormalize_name(name) for name in filter_names]
+            return True, {"filter_names": fn}
+        else:
+            return False, {}
+
+    @reflection.cache
+    def get_table_options(self, connection, table_name, schema=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_table_options(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @lru_cache()
+    def _table_options_query(
+        self, owner, scope, kind, has_filter_names, has_mat_views
+    ):
+        query = select(
+            dictionary.all_tables.c.table_name,
+            (
+                dictionary.all_tables.c.compression
+                if self._supports_table_compression
+                else sql.null().label("compression")
+            ),
+            (
+                dictionary.all_tables.c.compress_for
+                if self._supports_table_compress_for
+                else sql.null().label("compress_for")
+            ),
+            dictionary.all_tables.c.tablespace_name,
+        ).where(dictionary.all_tables.c.owner == owner)
+        if has_filter_names:
+            query = query.where(
+                dictionary.all_tables.c.table_name.in_(
+                    bindparam("filter_names")
+                )
+            )
+        if scope is ObjectScope.DEFAULT:
+            query = query.where(dictionary.all_tables.c.duration.is_(null()))
+        elif scope is ObjectScope.TEMPORARY:
+            query = query.where(
+                dictionary.all_tables.c.duration.is_not(null())
+            )
+
+        if (
+            has_mat_views
+            and ObjectKind.TABLE in kind
+            and ObjectKind.MATERIALIZED_VIEW not in kind
+        ):
+            # cant use EXCEPT ALL / MINUS here because we don't have an
+            # excludable row vs. the query above
+            # outerjoin + where null works better on oracle 21 but 11 does
+            # not like it at all. this is the next best thing
+
+            query = query.where(
+                dictionary.all_tables.c.table_name.not_in(
+                    bindparam("mat_views")
+                )
+            )
+        elif (
+            ObjectKind.TABLE not in kind
+            and ObjectKind.MATERIALIZED_VIEW in kind
+        ):
+            query = query.where(
+                dictionary.all_tables.c.table_name.in_(bindparam("mat_views"))
+            )
+        return query
+
+    @_handle_synonyms_decorator
+    def get_multi_table_options(
+        self,
+        connection,
+        *,
+        schema,
+        filter_names,
+        scope,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+
+        has_filter_names, params = self._prepare_filter_names(filter_names)
+        has_mat_views = False
+
+        if (
+            ObjectKind.TABLE in kind
+            and ObjectKind.MATERIALIZED_VIEW not in kind
+        ):
+            # see note in _table_options_query
+            mat_views = self.get_materialized_view_names(
+                connection, schema, dblink, _normalize=False, **kw
+            )
+            if mat_views:
+                params["mat_views"] = mat_views
+                has_mat_views = True
+        elif (
+            ObjectKind.TABLE not in kind
+            and ObjectKind.MATERIALIZED_VIEW in kind
+        ):
+            mat_views = self.get_materialized_view_names(
+                connection, schema, dblink, _normalize=False, **kw
+            )
+            params["mat_views"] = mat_views
+
+        options = {}
+        default = ReflectionDefaults.table_options
+
+        if ObjectKind.TABLE in kind or ObjectKind.MATERIALIZED_VIEW in kind:
+            query = self._table_options_query(
+                owner, scope, kind, has_filter_names, has_mat_views
+            )
+            result = self._execute_reflection(
+                connection, query, dblink, returns_long=False, params=params
+            )
+
+            for table, compression, compress_for, tablespace in result:
+                data = default()
+                if compression == "ENABLED":
+                    data["oracle_compress"] = compress_for
+                if tablespace:
+                    data["oracle_tablespace"] = tablespace
+                options[(schema, self.normalize_name(table))] = data
+        if ObjectKind.VIEW in kind and ObjectScope.DEFAULT in scope:
+            # add the views (no temporary views)
+            for view in self.get_view_names(connection, schema, dblink, **kw):
+                if not filter_names or view in filter_names:
+                    options[(schema, view)] = default()
+
+        return options.items()
+
+    @reflection.cache
+    def get_columns(self, connection, table_name, schema=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+
+        data = self.get_multi_columns(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    def _run_batches(
+        self, connection, query, dblink, returns_long, mappings, all_objects
+    ):
+        each_batch = 500
+        batches = list(all_objects)
+        while batches:
+            batch = batches[0:each_batch]
+            batches[0:each_batch] = []
+
+            result = self._execute_reflection(
+                connection,
+                query,
+                dblink,
+                returns_long=returns_long,
+                params={"all_objects": batch},
+            )
+            if mappings:
+                yield from result.mappings()
+            else:
+                yield from result
+
+    @lru_cache()
+    def _column_query(self, owner):
+        all_cols = dictionary.all_tab_cols
+        all_comments = dictionary.all_col_comments
+        all_ids = dictionary.all_tab_identity_cols
+
+        if self.server_version_info >= (12,):
+            add_cols = (
+                all_cols.c.default_on_null,
+                sql.case(
+                    (all_ids.c.table_name.is_(None), sql.null()),
+                    else_=all_ids.c.generation_type
+                    + ","
+                    + all_ids.c.identity_options,
+                ).label("identity_options"),
+            )
+            join_identity_cols = True
+        else:
+            add_cols = (
+                sql.null().label("default_on_null"),
+                sql.null().label("identity_options"),
+            )
+            join_identity_cols = False
+
+        # NOTE: on oracle cannot create tables/views without columns and
+        # a table cannot have all column hidden:
+        # ORA-54039: table must have at least one column that is not invisible
+        # all_tab_cols returns data for tables/views/mat-views.
+        # all_tab_cols does not return recycled tables
+
+        query = (
+            select(
+                all_cols.c.table_name,
+                all_cols.c.column_name,
+                all_cols.c.data_type,
+                all_cols.c.char_length,
+                all_cols.c.data_precision,
+                all_cols.c.data_scale,
+                all_cols.c.nullable,
+                all_cols.c.data_default,
+                all_comments.c.comments,
+                all_cols.c.virtual_column,
+                *add_cols,
+            ).select_from(all_cols)
+            # NOTE: all_col_comments has a row for each column even if no
+            # comment is present, so a join could be performed, but there
+            # seems to be no difference compared to an outer join
+            .outerjoin(
+                all_comments,
+                and_(
+                    all_cols.c.table_name == all_comments.c.table_name,
+                    all_cols.c.column_name == all_comments.c.column_name,
+                    all_cols.c.owner == all_comments.c.owner,
+                ),
+            )
+        )
+        if join_identity_cols:
+            query = query.outerjoin(
+                all_ids,
+                and_(
+                    all_cols.c.table_name == all_ids.c.table_name,
+                    all_cols.c.column_name == all_ids.c.column_name,
+                    all_cols.c.owner == all_ids.c.owner,
+                ),
+            )
+
+        query = query.where(
+            all_cols.c.table_name.in_(bindparam("all_objects")),
+            all_cols.c.hidden_column == "NO",
+            all_cols.c.owner == owner,
+        ).order_by(all_cols.c.table_name, all_cols.c.column_id)
+        return query
+
+    @_handle_synonyms_decorator
+    def get_multi_columns(
+        self,
+        connection,
+        *,
+        schema,
+        filter_names,
+        scope,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+        query = self._column_query(owner)
+
+        if (
+            filter_names
+            and kind is ObjectKind.ANY
+            and scope is ObjectScope.ANY
+        ):
+            all_objects = [self.denormalize_name(n) for n in filter_names]
+        else:
+            all_objects = self._get_all_objects(
+                connection, schema, scope, kind, filter_names, dblink, **kw
+            )
+
+        columns = defaultdict(list)
+
+        # all_tab_cols.data_default is LONG
+        result = self._run_batches(
+            connection,
+            query,
+            dblink,
+            returns_long=True,
+            mappings=True,
+            all_objects=all_objects,
+        )
+
+        def maybe_int(value):
+            if isinstance(value, float) and value.is_integer():
+                return int(value)
+            else:
+                return value
+
+        remove_size = re.compile(r"\(\d+\)")
+
+        for row_dict in result:
+            table_name = self.normalize_name(row_dict["table_name"])
+            orig_colname = row_dict["column_name"]
+            colname = self.normalize_name(orig_colname)
+            coltype = row_dict["data_type"]
+            precision = maybe_int(row_dict["data_precision"])
+
+            if coltype == "NUMBER":
+                scale = maybe_int(row_dict["data_scale"])
+                if precision is None and scale == 0:
+                    coltype = INTEGER()
+                else:
+                    coltype = NUMBER(precision, scale)
+            elif coltype == "FLOAT":
+                # https://docs.oracle.com/cd/B14117_01/server.101/b10758/sqlqr06.htm
+                if precision == 126:
+                    # The DOUBLE PRECISION datatype is a floating-point
+                    # number with binary precision 126.
+                    coltype = DOUBLE_PRECISION()
+                elif precision == 63:
+                    # The REAL datatype is a floating-point number with a
+                    # binary precision of 63, or 18 decimal.
+                    coltype = REAL()
+                else:
+                    # non standard precision
+                    coltype = FLOAT(binary_precision=precision)
+
+            elif coltype in ("VARCHAR2", "NVARCHAR2", "CHAR", "NCHAR"):
+                char_length = maybe_int(row_dict["char_length"])
+                coltype = self.ischema_names.get(coltype)(char_length)
+            elif "WITH TIME ZONE" in coltype:
+                coltype = TIMESTAMP(timezone=True)
+            elif "WITH LOCAL TIME ZONE" in coltype:
+                coltype = TIMESTAMP(local_timezone=True)
+            else:
+                coltype = re.sub(remove_size, "", coltype)
+                try:
+                    coltype = self.ischema_names[coltype]
+                except KeyError:
+                    util.warn(
+                        "Did not recognize type '%s' of column '%s'"
+                        % (coltype, colname)
+                    )
+                    coltype = sqltypes.NULLTYPE
+
+            default = row_dict["data_default"]
+            if row_dict["virtual_column"] == "YES":
+                computed = dict(sqltext=default)
+                default = None
+            else:
+                computed = None
+
+            identity_options = row_dict["identity_options"]
+            if identity_options is not None:
+                identity = self._parse_identity_options(
+                    identity_options, row_dict["default_on_null"]
+                )
+                default = None
+            else:
+                identity = None
+
+            cdict = {
+                "name": colname,
+                "type": coltype,
+                "nullable": row_dict["nullable"] == "Y",
+                "default": default,
+                "comment": row_dict["comments"],
+            }
+            if orig_colname.lower() == orig_colname:
+                cdict["quote"] = True
+            if computed is not None:
+                cdict["computed"] = computed
+            if identity is not None:
+                cdict["identity"] = identity
+
+            columns[(schema, table_name)].append(cdict)
+
+        # NOTE: default not needed since all tables have columns
+        # default = ReflectionDefaults.columns
+        # return (
+        #     (key, value if value else default())
+        #     for key, value in columns.items()
+        # )
+        return columns.items()
+
+    def _parse_identity_options(self, identity_options, default_on_null):
+        # identity_options is a string that starts with 'ALWAYS,' or
+        # 'BY DEFAULT,' and continues with
+        # START WITH: 1, INCREMENT BY: 1, MAX_VALUE: 123, MIN_VALUE: 1,
+        # CYCLE_FLAG: N, CACHE_SIZE: 1, ORDER_FLAG: N, SCALE_FLAG: N,
+        # EXTEND_FLAG: N, SESSION_FLAG: N, KEEP_VALUE: N
+        parts = [p.strip() for p in identity_options.split(",")]
+        identity = {
+            "always": parts[0] == "ALWAYS",
+            "on_null": default_on_null == "YES",
+        }
+
+        for part in parts[1:]:
+            option, value = part.split(":")
+            value = value.strip()
+
+            if "START WITH" in option:
+                identity["start"] = int(value)
+            elif "INCREMENT BY" in option:
+                identity["increment"] = int(value)
+            elif "MAX_VALUE" in option:
+                identity["maxvalue"] = int(value)
+            elif "MIN_VALUE" in option:
+                identity["minvalue"] = int(value)
+            elif "CYCLE_FLAG" in option:
+                identity["cycle"] = value == "Y"
+            elif "CACHE_SIZE" in option:
+                identity["cache"] = int(value)
+            elif "ORDER_FLAG" in option:
+                identity["order"] = value == "Y"
+        return identity
+
+    @reflection.cache
+    def get_table_comment(self, connection, table_name, schema=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_table_comment(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @lru_cache()
+    def _comment_query(self, owner, scope, kind, has_filter_names):
+        # NOTE: all_tab_comments / all_mview_comments have a row for all
+        # object even if they don't have comments
+        queries = []
+        if ObjectKind.TABLE in kind or ObjectKind.VIEW in kind:
+            # all_tab_comments returns also plain views
+            tbl_view = select(
+                dictionary.all_tab_comments.c.table_name,
+                dictionary.all_tab_comments.c.comments,
+            ).where(
+                dictionary.all_tab_comments.c.owner == owner,
+                dictionary.all_tab_comments.c.table_name.not_like("BIN$%"),
+            )
+            if ObjectKind.VIEW not in kind:
+                tbl_view = tbl_view.where(
+                    dictionary.all_tab_comments.c.table_type == "TABLE"
+                )
+            elif ObjectKind.TABLE not in kind:
+                tbl_view = tbl_view.where(
+                    dictionary.all_tab_comments.c.table_type == "VIEW"
+                )
+            queries.append(tbl_view)
+        if ObjectKind.MATERIALIZED_VIEW in kind:
+            mat_view = select(
+                dictionary.all_mview_comments.c.mview_name.label("table_name"),
+                dictionary.all_mview_comments.c.comments,
+            ).where(
+                dictionary.all_mview_comments.c.owner == owner,
+                dictionary.all_mview_comments.c.mview_name.not_like("BIN$%"),
+            )
+            queries.append(mat_view)
+        if len(queries) == 1:
+            query = queries[0]
+        else:
+            union = sql.union_all(*queries).subquery("tables_and_views")
+            query = select(union.c.table_name, union.c.comments)
+
+        name_col = query.selected_columns.table_name
+
+        if scope in (ObjectScope.DEFAULT, ObjectScope.TEMPORARY):
+            temp = "Y" if scope is ObjectScope.TEMPORARY else "N"
+            # need distinct since materialized view are listed also
+            # as tables in all_objects
+            query = query.distinct().join(
+                dictionary.all_objects,
+                and_(
+                    dictionary.all_objects.c.owner == owner,
+                    dictionary.all_objects.c.object_name == name_col,
+                    dictionary.all_objects.c.temporary == temp,
+                ),
+            )
+        if has_filter_names:
+            query = query.where(name_col.in_(bindparam("filter_names")))
+        return query
+
+    @_handle_synonyms_decorator
+    def get_multi_table_comment(
+        self,
+        connection,
+        *,
+        schema,
+        filter_names,
+        scope,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+        has_filter_names, params = self._prepare_filter_names(filter_names)
+        query = self._comment_query(owner, scope, kind, has_filter_names)
+
+        result = self._execute_reflection(
+            connection, query, dblink, returns_long=False, params=params
+        )
+        default = ReflectionDefaults.table_comment
+        # materialized views by default seem to have a comment like
+        # "snapshot table for snapshot owner.mat_view_name"
+        ignore_mat_view = "snapshot table for snapshot "
+        return (
+            (
+                (schema, self.normalize_name(table)),
+                (
+                    {"text": comment}
+                    if comment is not None
+                    and not comment.startswith(ignore_mat_view)
+                    else default()
+                ),
+            )
+            for table, comment in result
+        )
+
+    @reflection.cache
+    def get_indexes(self, connection, table_name, schema=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_indexes(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @lru_cache()
+    def _index_query(self, owner):
+        return (
+            select(
+                dictionary.all_ind_columns.c.table_name,
+                dictionary.all_ind_columns.c.index_name,
+                dictionary.all_ind_columns.c.column_name,
+                dictionary.all_indexes.c.index_type,
+                dictionary.all_indexes.c.uniqueness,
+                dictionary.all_indexes.c.compression,
+                dictionary.all_indexes.c.prefix_length,
+                dictionary.all_ind_columns.c.descend,
+                dictionary.all_ind_expressions.c.column_expression,
+            )
+            .select_from(dictionary.all_ind_columns)
+            .join(
+                dictionary.all_indexes,
+                sql.and_(
+                    dictionary.all_ind_columns.c.index_name
+                    == dictionary.all_indexes.c.index_name,
+                    dictionary.all_ind_columns.c.index_owner
+                    == dictionary.all_indexes.c.owner,
+                ),
+            )
+            .outerjoin(
+                # NOTE: this adds about 20% to the query time. Using a
+                # case expression with a scalar subquery only when needed
+                # with the assumption that most indexes are not expression
+                # would be faster but oracle does not like that with
+                # LONG datatype. It errors with:
+                # ORA-00997: illegal use of LONG datatype
+                dictionary.all_ind_expressions,
+                sql.and_(
+                    dictionary.all_ind_expressions.c.index_name
+                    == dictionary.all_ind_columns.c.index_name,
+                    dictionary.all_ind_expressions.c.index_owner
+                    == dictionary.all_ind_columns.c.index_owner,
+                    dictionary.all_ind_expressions.c.column_position
+                    == dictionary.all_ind_columns.c.column_position,
+                ),
+            )
+            .where(
+                dictionary.all_indexes.c.table_owner == owner,
+                dictionary.all_indexes.c.table_name.in_(
+                    bindparam("all_objects")
+                ),
+            )
+            .order_by(
+                dictionary.all_ind_columns.c.index_name,
+                dictionary.all_ind_columns.c.column_position,
+            )
+        )
+
+    @reflection.flexi_cache(
+        ("schema", InternalTraversal.dp_string),
+        ("dblink", InternalTraversal.dp_string),
+        ("all_objects", InternalTraversal.dp_string_list),
+    )
+    def _get_indexes_rows(self, connection, schema, dblink, all_objects, **kw):
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+
+        query = self._index_query(owner)
+
+        pks = {
+            row_dict["constraint_name"]
+            for row_dict in self._get_all_constraint_rows(
+                connection, schema, dblink, all_objects, **kw
+            )
+            if row_dict["constraint_type"] == "P"
+        }
+
+        # all_ind_expressions.column_expression is LONG
+        result = self._run_batches(
+            connection,
+            query,
+            dblink,
+            returns_long=True,
+            mappings=True,
+            all_objects=all_objects,
+        )
+
+        return [
+            row_dict
+            for row_dict in result
+            if row_dict["index_name"] not in pks
+        ]
+
+    @_handle_synonyms_decorator
+    def get_multi_indexes(
+        self,
+        connection,
+        *,
+        schema,
+        filter_names,
+        scope,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        all_objects = self._get_all_objects(
+            connection, schema, scope, kind, filter_names, dblink, **kw
+        )
+
+        uniqueness = {"NONUNIQUE": False, "UNIQUE": True}
+        enabled = {"DISABLED": False, "ENABLED": True}
+        is_bitmap = {"BITMAP", "FUNCTION-BASED BITMAP"}
+
+        indexes = defaultdict(dict)
+
+        for row_dict in self._get_indexes_rows(
+            connection, schema, dblink, all_objects, **kw
+        ):
+            index_name = self.normalize_name(row_dict["index_name"])
+            table_name = self.normalize_name(row_dict["table_name"])
+            table_indexes = indexes[(schema, table_name)]
+
+            if index_name not in table_indexes:
+                table_indexes[index_name] = index_dict = {
+                    "name": index_name,
+                    "column_names": [],
+                    "dialect_options": {},
+                    "unique": uniqueness.get(row_dict["uniqueness"], False),
+                }
+                do = index_dict["dialect_options"]
+                if row_dict["index_type"] in is_bitmap:
+                    do["oracle_bitmap"] = True
+                if enabled.get(row_dict["compression"], False):
+                    do["oracle_compress"] = row_dict["prefix_length"]
+
+            else:
+                index_dict = table_indexes[index_name]
+
+            expr = row_dict["column_expression"]
+            if expr is not None:
+                index_dict["column_names"].append(None)
+                if "expressions" in index_dict:
+                    index_dict["expressions"].append(expr)
+                else:
+                    index_dict["expressions"] = index_dict["column_names"][:-1]
+                    index_dict["expressions"].append(expr)
+
+                if row_dict["descend"].lower() != "asc":
+                    assert row_dict["descend"].lower() == "desc"
+                    cs = index_dict.setdefault("column_sorting", {})
+                    cs[expr] = ("desc",)
+            else:
+                assert row_dict["descend"].lower() == "asc"
+                cn = self.normalize_name(row_dict["column_name"])
+                index_dict["column_names"].append(cn)
+                if "expressions" in index_dict:
+                    index_dict["expressions"].append(cn)
+
+        default = ReflectionDefaults.indexes
+
+        return (
+            (key, list(indexes[key].values()) if key in indexes else default())
+            for key in (
+                (schema, self.normalize_name(obj_name))
+                for obj_name in all_objects
+            )
+        )
+
+    @reflection.cache
+    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_pk_constraint(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @lru_cache()
+    def _constraint_query(self, owner):
+        local = dictionary.all_cons_columns.alias("local")
+        remote = dictionary.all_cons_columns.alias("remote")
+        return (
+            select(
+                dictionary.all_constraints.c.table_name,
+                dictionary.all_constraints.c.constraint_type,
+                dictionary.all_constraints.c.constraint_name,
+                local.c.column_name.label("local_column"),
+                remote.c.table_name.label("remote_table"),
+                remote.c.column_name.label("remote_column"),
+                remote.c.owner.label("remote_owner"),
+                dictionary.all_constraints.c.search_condition,
+                dictionary.all_constraints.c.delete_rule,
+            )
+            .select_from(dictionary.all_constraints)
+            .join(
+                local,
+                and_(
+                    local.c.owner == dictionary.all_constraints.c.owner,
+                    dictionary.all_constraints.c.constraint_name
+                    == local.c.constraint_name,
+                ),
+            )
+            .outerjoin(
+                remote,
+                and_(
+                    dictionary.all_constraints.c.r_owner == remote.c.owner,
+                    dictionary.all_constraints.c.r_constraint_name
+                    == remote.c.constraint_name,
+                    or_(
+                        remote.c.position.is_(sql.null()),
+                        local.c.position == remote.c.position,
+                    ),
+                ),
+            )
+            .where(
+                dictionary.all_constraints.c.owner == owner,
+                dictionary.all_constraints.c.table_name.in_(
+                    bindparam("all_objects")
+                ),
+                dictionary.all_constraints.c.constraint_type.in_(
+                    ("R", "P", "U", "C")
+                ),
+            )
+            .order_by(
+                dictionary.all_constraints.c.constraint_name, local.c.position
+            )
+        )
+
+    @reflection.flexi_cache(
+        ("schema", InternalTraversal.dp_string),
+        ("dblink", InternalTraversal.dp_string),
+        ("all_objects", InternalTraversal.dp_string_list),
+    )
+    def _get_all_constraint_rows(
+        self, connection, schema, dblink, all_objects, **kw
+    ):
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+        query = self._constraint_query(owner)
+
+        # since the result is cached a list must be created
+        values = list(
+            self._run_batches(
+                connection,
+                query,
+                dblink,
+                returns_long=False,
+                mappings=True,
+                all_objects=all_objects,
+            )
+        )
+        return values
+
+    @_handle_synonyms_decorator
+    def get_multi_pk_constraint(
+        self,
+        connection,
+        *,
+        scope,
+        schema,
+        filter_names,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        all_objects = self._get_all_objects(
+            connection, schema, scope, kind, filter_names, dblink, **kw
+        )
+
+        primary_keys = defaultdict(dict)
+        default = ReflectionDefaults.pk_constraint
+
+        for row_dict in self._get_all_constraint_rows(
+            connection, schema, dblink, all_objects, **kw
+        ):
+            if row_dict["constraint_type"] != "P":
+                continue
+            table_name = self.normalize_name(row_dict["table_name"])
+            constraint_name = self.normalize_name(row_dict["constraint_name"])
+            column_name = self.normalize_name(row_dict["local_column"])
+
+            table_pk = primary_keys[(schema, table_name)]
+            if not table_pk:
+                table_pk["name"] = constraint_name
+                table_pk["constrained_columns"] = [column_name]
+            else:
+                table_pk["constrained_columns"].append(column_name)
+
+        return (
+            (key, primary_keys[key] if key in primary_keys else default())
+            for key in (
+                (schema, self.normalize_name(obj_name))
+                for obj_name in all_objects
+            )
+        )
+
+    @reflection.cache
+    def get_foreign_keys(
+        self,
+        connection,
+        table_name,
+        schema=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_foreign_keys(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @_handle_synonyms_decorator
+    def get_multi_foreign_keys(
+        self,
+        connection,
+        *,
+        scope,
+        schema,
+        filter_names,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        all_objects = self._get_all_objects(
+            connection, schema, scope, kind, filter_names, dblink, **kw
+        )
+
+        resolve_synonyms = kw.get("oracle_resolve_synonyms", False)
+
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+
+        all_remote_owners = set()
+        fkeys = defaultdict(dict)
+
+        for row_dict in self._get_all_constraint_rows(
+            connection, schema, dblink, all_objects, **kw
+        ):
+            if row_dict["constraint_type"] != "R":
+                continue
+
+            table_name = self.normalize_name(row_dict["table_name"])
+            constraint_name = self.normalize_name(row_dict["constraint_name"])
+            table_fkey = fkeys[(schema, table_name)]
+
+            assert constraint_name is not None
+
+            local_column = self.normalize_name(row_dict["local_column"])
+            remote_table = self.normalize_name(row_dict["remote_table"])
+            remote_column = self.normalize_name(row_dict["remote_column"])
+            remote_owner_orig = row_dict["remote_owner"]
+            remote_owner = self.normalize_name(remote_owner_orig)
+            if remote_owner_orig is not None:
+                all_remote_owners.add(remote_owner_orig)
+
+            if remote_table is None:
+                # ticket 363
+                if dblink and not dblink.startswith("@"):
+                    dblink = f"@{dblink}"
+                util.warn(
+                    "Got 'None' querying 'table_name' from "
+                    f"all_cons_columns{dblink or ''} - does the user have "
+                    "proper rights to the table?"
+                )
+                continue
+
+            if constraint_name not in table_fkey:
+                table_fkey[constraint_name] = fkey = {
+                    "name": constraint_name,
+                    "constrained_columns": [],
+                    "referred_schema": None,
+                    "referred_table": remote_table,
+                    "referred_columns": [],
+                    "options": {},
+                }
+
+                if resolve_synonyms:
+                    # will be removed below
+                    fkey["_ref_schema"] = remote_owner
+
+                if schema is not None or remote_owner_orig != owner:
+                    fkey["referred_schema"] = remote_owner
+
+                delete_rule = row_dict["delete_rule"]
+                if delete_rule != "NO ACTION":
+                    fkey["options"]["ondelete"] = delete_rule
+
+            else:
+                fkey = table_fkey[constraint_name]
+
+            fkey["constrained_columns"].append(local_column)
+            fkey["referred_columns"].append(remote_column)
+
+        if resolve_synonyms and all_remote_owners:
+            query = select(
+                dictionary.all_synonyms.c.owner,
+                dictionary.all_synonyms.c.table_name,
+                dictionary.all_synonyms.c.table_owner,
+                dictionary.all_synonyms.c.synonym_name,
+            ).where(dictionary.all_synonyms.c.owner.in_(all_remote_owners))
+
+            result = self._execute_reflection(
+                connection, query, dblink, returns_long=False
+            ).mappings()
+
+            remote_owners_lut = {}
+            for row in result:
+                synonym_owner = self.normalize_name(row["owner"])
+                table_name = self.normalize_name(row["table_name"])
+
+                remote_owners_lut[(synonym_owner, table_name)] = (
+                    row["table_owner"],
+                    row["synonym_name"],
+                )
+
+            empty = (None, None)
+            for table_fkeys in fkeys.values():
+                for table_fkey in table_fkeys.values():
+                    key = (
+                        table_fkey.pop("_ref_schema"),
+                        table_fkey["referred_table"],
+                    )
+                    remote_owner, syn_name = remote_owners_lut.get(key, empty)
+                    if syn_name:
+                        sn = self.normalize_name(syn_name)
+                        table_fkey["referred_table"] = sn
+                        if schema is not None or remote_owner != owner:
+                            ro = self.normalize_name(remote_owner)
+                            table_fkey["referred_schema"] = ro
+                        else:
+                            table_fkey["referred_schema"] = None
+        default = ReflectionDefaults.foreign_keys
+
+        return (
+            (key, list(fkeys[key].values()) if key in fkeys else default())
+            for key in (
+                (schema, self.normalize_name(obj_name))
+                for obj_name in all_objects
+            )
+        )
+
+    @reflection.cache
+    def get_unique_constraints(
+        self, connection, table_name, schema=None, **kw
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_unique_constraints(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @_handle_synonyms_decorator
+    def get_multi_unique_constraints(
+        self,
+        connection,
+        *,
+        scope,
+        schema,
+        filter_names,
+        kind,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        all_objects = self._get_all_objects(
+            connection, schema, scope, kind, filter_names, dblink, **kw
+        )
+
+        unique_cons = defaultdict(dict)
+
+        index_names = {
+            row_dict["index_name"]
+            for row_dict in self._get_indexes_rows(
+                connection, schema, dblink, all_objects, **kw
+            )
+        }
+
+        for row_dict in self._get_all_constraint_rows(
+            connection, schema, dblink, all_objects, **kw
+        ):
+            if row_dict["constraint_type"] != "U":
+                continue
+            table_name = self.normalize_name(row_dict["table_name"])
+            constraint_name_orig = row_dict["constraint_name"]
+            constraint_name = self.normalize_name(constraint_name_orig)
+            column_name = self.normalize_name(row_dict["local_column"])
+            table_uc = unique_cons[(schema, table_name)]
+
+            assert constraint_name is not None
+
+            if constraint_name not in table_uc:
+                table_uc[constraint_name] = uc = {
+                    "name": constraint_name,
+                    "column_names": [],
+                    "duplicates_index": (
+                        constraint_name
+                        if constraint_name_orig in index_names
+                        else None
+                    ),
+                }
+            else:
+                uc = table_uc[constraint_name]
+
+            uc["column_names"].append(column_name)
+
+        default = ReflectionDefaults.unique_constraints
+
+        return (
+            (
+                key,
+                (
+                    list(unique_cons[key].values())
+                    if key in unique_cons
+                    else default()
+                ),
+            )
+            for key in (
+                (schema, self.normalize_name(obj_name))
+                for obj_name in all_objects
+            )
+        )
+
+    @reflection.cache
+    def get_view_definition(
+        self,
+        connection,
+        view_name,
+        schema=None,
+        dblink=None,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        if kw.get("oracle_resolve_synonyms", False):
+            synonyms = self._get_synonyms(
+                connection, schema, filter_names=[view_name], dblink=dblink
+            )
+            if synonyms:
+                assert len(synonyms) == 1
+                row_dict = synonyms[0]
+                dblink = self.normalize_name(row_dict["db_link"])
+                schema = row_dict["table_owner"]
+                view_name = row_dict["table_name"]
+
+        name = self.denormalize_name(view_name)
+        owner = self.denormalize_schema_name(
+            schema or self.default_schema_name
+        )
+        query = (
+            select(dictionary.all_views.c.text)
+            .where(
+                dictionary.all_views.c.view_name == name,
+                dictionary.all_views.c.owner == owner,
+            )
+            .union_all(
+                select(dictionary.all_mviews.c.query).where(
+                    dictionary.all_mviews.c.mview_name == name,
+                    dictionary.all_mviews.c.owner == owner,
+                )
+            )
+        )
+
+        rp = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalar()
+        if rp is None:
+            raise exc.NoSuchTableError(
+                f"{schema}.{view_name}" if schema else view_name
+            )
+        else:
+            return rp
+
+    @reflection.cache
+    def get_check_constraints(
+        self, connection, table_name, schema=None, include_all=False, **kw
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        data = self.get_multi_check_constraints(
+            connection,
+            schema=schema,
+            filter_names=[table_name],
+            scope=ObjectScope.ANY,
+            include_all=include_all,
+            kind=ObjectKind.ANY,
+            **kw,
+        )
+        return self._value_or_raise(data, table_name, schema)
+
+    @_handle_synonyms_decorator
+    def get_multi_check_constraints(
+        self,
+        connection,
+        *,
+        schema,
+        filter_names,
+        dblink=None,
+        scope,
+        kind,
+        include_all=False,
+        **kw,
+    ):
+        """Supported kw arguments are: ``dblink`` to reflect via a db link;
+        ``oracle_resolve_synonyms`` to resolve names to synonyms
+        """
+        all_objects = self._get_all_objects(
+            connection, schema, scope, kind, filter_names, dblink, **kw
+        )
+
+        not_null = re.compile(r"..+?. IS NOT NULL$")
+
+        check_constraints = defaultdict(list)
+
+        for row_dict in self._get_all_constraint_rows(
+            connection, schema, dblink, all_objects, **kw
+        ):
+            if row_dict["constraint_type"] != "C":
+                continue
+            table_name = self.normalize_name(row_dict["table_name"])
+            constraint_name = self.normalize_name(row_dict["constraint_name"])
+            search_condition = row_dict["search_condition"]
+
+            table_checks = check_constraints[(schema, table_name)]
+            if constraint_name is not None and (
+                include_all or not not_null.match(search_condition)
+            ):
+                table_checks.append(
+                    {"name": constraint_name, "sqltext": search_condition}
+                )
+
+        default = ReflectionDefaults.check_constraints
+
+        return (
+            (
+                key,
+                (
+                    check_constraints[key]
+                    if key in check_constraints
+                    else default()
+                ),
+            )
+            for key in (
+                (schema, self.normalize_name(obj_name))
+                for obj_name in all_objects
+            )
+        )
+
+    def _list_dblinks(self, connection, dblink=None):
+        query = select(dictionary.all_db_links.c.db_link)
+        links = self._execute_reflection(
+            connection, query, dblink, returns_long=False
+        ).scalars()
+        return [self.normalize_name(link) for link in links]
+
+
+class _OuterJoinColumn(sql.ClauseElement):
+    __visit_name__ = "outer_join_column"
+
+    def __init__(self, column):
+        self.column = column