about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py850
1 files changed, 850 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py b/.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py
new file mode 100644
index 00000000..7cd8d35b
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/ddl/postgresql.py
@@ -0,0 +1,850 @@
+# mypy: allow-untyped-defs, allow-incomplete-defs, allow-untyped-calls
+# mypy: no-warn-return-any, allow-any-generics
+
+from __future__ import annotations
+
+import logging
+import re
+from typing import Any
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import Union
+
+from sqlalchemy import Column
+from sqlalchemy import Float
+from sqlalchemy import Identity
+from sqlalchemy import literal_column
+from sqlalchemy import Numeric
+from sqlalchemy import select
+from sqlalchemy import text
+from sqlalchemy import types as sqltypes
+from sqlalchemy.dialects.postgresql import BIGINT
+from sqlalchemy.dialects.postgresql import ExcludeConstraint
+from sqlalchemy.dialects.postgresql import INTEGER
+from sqlalchemy.schema import CreateIndex
+from sqlalchemy.sql.elements import ColumnClause
+from sqlalchemy.sql.elements import TextClause
+from sqlalchemy.sql.functions import FunctionElement
+from sqlalchemy.types import NULLTYPE
+
+from .base import alter_column
+from .base import alter_table
+from .base import AlterColumn
+from .base import ColumnComment
+from .base import format_column_name
+from .base import format_table_name
+from .base import format_type
+from .base import IdentityColumnDefault
+from .base import RenameTable
+from .impl import ComparisonResult
+from .impl import DefaultImpl
+from .. import util
+from ..autogenerate import render
+from ..operations import ops
+from ..operations import schemaobj
+from ..operations.base import BatchOperations
+from ..operations.base import Operations
+from ..util import sqla_compat
+from ..util.sqla_compat import compiles
+
+if TYPE_CHECKING:
+    from typing import Literal
+
+    from sqlalchemy import Index
+    from sqlalchemy import UniqueConstraint
+    from sqlalchemy.dialects.postgresql.array import ARRAY
+    from sqlalchemy.dialects.postgresql.base import PGDDLCompiler
+    from sqlalchemy.dialects.postgresql.hstore import HSTORE
+    from sqlalchemy.dialects.postgresql.json import JSON
+    from sqlalchemy.dialects.postgresql.json import JSONB
+    from sqlalchemy.sql.elements import ClauseElement
+    from sqlalchemy.sql.elements import ColumnElement
+    from sqlalchemy.sql.elements import quoted_name
+    from sqlalchemy.sql.schema import MetaData
+    from sqlalchemy.sql.schema import Table
+    from sqlalchemy.sql.type_api import TypeEngine
+
+    from .base import _ServerDefault
+    from ..autogenerate.api import AutogenContext
+    from ..autogenerate.render import _f_name
+    from ..runtime.migration import MigrationContext
+
+
+log = logging.getLogger(__name__)
+
+
+class PostgresqlImpl(DefaultImpl):
+    __dialect__ = "postgresql"
+    transactional_ddl = True
+    type_synonyms = DefaultImpl.type_synonyms + (
+        {"FLOAT", "DOUBLE PRECISION"},
+    )
+
+    def create_index(self, index: Index, **kw: Any) -> None:
+        # this likely defaults to None if not present, so get()
+        # should normally not return the default value.  being
+        # defensive in any case
+        postgresql_include = index.kwargs.get("postgresql_include", None) or ()
+        for col in postgresql_include:
+            if col not in index.table.c:  # type: ignore[union-attr]
+                index.table.append_column(  # type: ignore[union-attr]
+                    Column(col, sqltypes.NullType)
+                )
+        self._exec(CreateIndex(index, **kw))
+
+    def prep_table_for_batch(self, batch_impl, table):
+        for constraint in table.constraints:
+            if (
+                constraint.name is not None
+                and constraint.name in batch_impl.named_constraints
+            ):
+                self.drop_constraint(constraint)
+
+    def compare_server_default(
+        self,
+        inspector_column,
+        metadata_column,
+        rendered_metadata_default,
+        rendered_inspector_default,
+    ):
+        # don't do defaults for SERIAL columns
+        if (
+            metadata_column.primary_key
+            and metadata_column is metadata_column.table._autoincrement_column
+        ):
+            return False
+
+        conn_col_default = rendered_inspector_default
+
+        defaults_equal = conn_col_default == rendered_metadata_default
+        if defaults_equal:
+            return False
+
+        if None in (
+            conn_col_default,
+            rendered_metadata_default,
+            metadata_column.server_default,
+        ):
+            return not defaults_equal
+
+        metadata_default = metadata_column.server_default.arg
+
+        if isinstance(metadata_default, str):
+            if not isinstance(inspector_column.type, (Numeric, Float)):
+                metadata_default = re.sub(r"^'|'$", "", metadata_default)
+                metadata_default = f"'{metadata_default}'"
+
+            metadata_default = literal_column(metadata_default)
+
+        # run a real compare against the server
+        conn = self.connection
+        assert conn is not None
+        return not conn.scalar(
+            select(literal_column(conn_col_default) == metadata_default)
+        )
+
+    def alter_column(  # type:ignore[override]
+        self,
+        table_name: str,
+        column_name: str,
+        nullable: Optional[bool] = None,
+        server_default: Union[_ServerDefault, Literal[False]] = False,
+        name: Optional[str] = None,
+        type_: Optional[TypeEngine] = None,
+        schema: Optional[str] = None,
+        autoincrement: Optional[bool] = None,
+        existing_type: Optional[TypeEngine] = None,
+        existing_server_default: Optional[_ServerDefault] = None,
+        existing_nullable: Optional[bool] = None,
+        existing_autoincrement: Optional[bool] = None,
+        **kw: Any,
+    ) -> None:
+        using = kw.pop("postgresql_using", None)
+
+        if using is not None and type_ is None:
+            raise util.CommandError(
+                "postgresql_using must be used with the type_ parameter"
+            )
+
+        if type_ is not None:
+            self._exec(
+                PostgresqlColumnType(
+                    table_name,
+                    column_name,
+                    type_,
+                    schema=schema,
+                    using=using,
+                    existing_type=existing_type,
+                    existing_server_default=existing_server_default,
+                    existing_nullable=existing_nullable,
+                )
+            )
+
+        super().alter_column(
+            table_name,
+            column_name,
+            nullable=nullable,
+            server_default=server_default,
+            name=name,
+            schema=schema,
+            autoincrement=autoincrement,
+            existing_type=existing_type,
+            existing_server_default=existing_server_default,
+            existing_nullable=existing_nullable,
+            existing_autoincrement=existing_autoincrement,
+            **kw,
+        )
+
+    def autogen_column_reflect(self, inspector, table, column_info):
+        if column_info.get("default") and isinstance(
+            column_info["type"], (INTEGER, BIGINT)
+        ):
+            seq_match = re.match(
+                r"nextval\('(.+?)'::regclass\)", column_info["default"]
+            )
+            if seq_match:
+                info = sqla_compat._exec_on_inspector(
+                    inspector,
+                    text(
+                        "select c.relname, a.attname "
+                        "from pg_class as c join "
+                        "pg_depend d on d.objid=c.oid and "
+                        "d.classid='pg_class'::regclass and "
+                        "d.refclassid='pg_class'::regclass "
+                        "join pg_class t on t.oid=d.refobjid "
+                        "join pg_attribute a on a.attrelid=t.oid and "
+                        "a.attnum=d.refobjsubid "
+                        "where c.relkind='S' and "
+                        "c.oid=cast(:seqname as regclass)"
+                    ),
+                    seqname=seq_match.group(1),
+                ).first()
+                if info:
+                    seqname, colname = info
+                    if colname == column_info["name"]:
+                        log.info(
+                            "Detected sequence named '%s' as "
+                            "owned by integer column '%s(%s)', "
+                            "assuming SERIAL and omitting",
+                            seqname,
+                            table.name,
+                            colname,
+                        )
+                        # sequence, and the owner is this column,
+                        # its a SERIAL - whack it!
+                        del column_info["default"]
+
+    def correct_for_autogen_constraints(
+        self,
+        conn_unique_constraints,
+        conn_indexes,
+        metadata_unique_constraints,
+        metadata_indexes,
+    ):
+        doubled_constraints = {
+            index
+            for index in conn_indexes
+            if index.info.get("duplicates_constraint")
+        }
+
+        for ix in doubled_constraints:
+            conn_indexes.remove(ix)
+
+        if not sqla_compat.sqla_2:
+            self._skip_functional_indexes(metadata_indexes, conn_indexes)
+
+    # pg behavior regarding modifiers
+    # | # | compiled sql     | returned sql     | regexp. group is removed |
+    # | - | ---------------- | -----------------| ------------------------ |
+    # | 1 | nulls first      | nulls first      | -                        |
+    # | 2 | nulls last       |                  | (?<! desc)( nulls last)$ |
+    # | 3 | asc              |                  | ( asc)$                  |
+    # | 4 | asc nulls first  | nulls first      | ( asc) nulls first$      |
+    # | 5 | asc nulls last   |                  | ( asc nulls last)$       |
+    # | 6 | desc             | desc             | -                        |
+    # | 7 | desc nulls first | desc             | desc( nulls first)$      |
+    # | 8 | desc nulls last  | desc nulls last  | -                        |
+    _default_modifiers_re = (  # order of case 2 and 5 matters
+        re.compile("( asc nulls last)$"),  # case 5
+        re.compile("(?<! desc)( nulls last)$"),  # case 2
+        re.compile("( asc)$"),  # case 3
+        re.compile("( asc) nulls first$"),  # case 4
+        re.compile(" desc( nulls first)$"),  # case 7
+    )
+
+    def _cleanup_index_expr(self, index: Index, expr: str) -> str:
+        expr = expr.lower().replace('"', "").replace("'", "")
+        if index.table is not None:
+            # should not be needed, since include_table=False is in compile
+            expr = expr.replace(f"{index.table.name.lower()}.", "")
+
+        if "::" in expr:
+            # strip :: cast. types can have spaces in them
+            expr = re.sub(r"(::[\w ]+\w)", "", expr)
+
+        while expr and expr[0] == "(" and expr[-1] == ")":
+            expr = expr[1:-1]
+
+        # NOTE: when parsing the connection expression this cleanup could
+        # be skipped
+        for rs in self._default_modifiers_re:
+            if match := rs.search(expr):
+                start, end = match.span(1)
+                expr = expr[:start] + expr[end:]
+                break
+
+        while expr and expr[0] == "(" and expr[-1] == ")":
+            expr = expr[1:-1]
+
+        # strip casts
+        cast_re = re.compile(r"cast\s*\(")
+        if cast_re.match(expr):
+            expr = cast_re.sub("", expr)
+            # remove the as type
+            expr = re.sub(r"as\s+[^)]+\)", "", expr)
+        # remove spaces
+        expr = expr.replace(" ", "")
+        return expr
+
+    def _dialect_options(
+        self, item: Union[Index, UniqueConstraint]
+    ) -> Tuple[Any, ...]:
+        # only the positive case is returned by sqlalchemy reflection so
+        # None and False are threated the same
+        if item.dialect_kwargs.get("postgresql_nulls_not_distinct"):
+            return ("nulls_not_distinct",)
+        return ()
+
+    def compare_indexes(
+        self,
+        metadata_index: Index,
+        reflected_index: Index,
+    ) -> ComparisonResult:
+        msg = []
+        unique_msg = self._compare_index_unique(
+            metadata_index, reflected_index
+        )
+        if unique_msg:
+            msg.append(unique_msg)
+        m_exprs = metadata_index.expressions
+        r_exprs = reflected_index.expressions
+        if len(m_exprs) != len(r_exprs):
+            msg.append(f"expression number {len(r_exprs)} to {len(m_exprs)}")
+        if msg:
+            # no point going further, return early
+            return ComparisonResult.Different(msg)
+        skip = []
+        for pos, (m_e, r_e) in enumerate(zip(m_exprs, r_exprs), 1):
+            m_compile = self._compile_element(m_e)
+            m_text = self._cleanup_index_expr(metadata_index, m_compile)
+            # print(f"META ORIG: {m_compile!r} CLEANUP: {m_text!r}")
+            r_compile = self._compile_element(r_e)
+            r_text = self._cleanup_index_expr(metadata_index, r_compile)
+            # print(f"CONN ORIG: {r_compile!r} CLEANUP: {r_text!r}")
+            if m_text == r_text:
+                continue  # expressions these are equal
+            elif m_compile.strip().endswith("_ops") and (
+                " " in m_compile or ")" in m_compile  # is an expression
+            ):
+                skip.append(
+                    f"expression #{pos} {m_compile!r} detected "
+                    "as including operator clause."
+                )
+                util.warn(
+                    f"Expression #{pos} {m_compile!r} in index "
+                    f"{reflected_index.name!r} detected to include "
+                    "an operator clause. Expression compare cannot proceed. "
+                    "Please move the operator clause to the "
+                    "``postgresql_ops`` dict to enable proper compare "
+                    "of the index expressions: "
+                    "https://docs.sqlalchemy.org/en/latest/dialects/postgresql.html#operator-classes",  # noqa: E501
+                )
+            else:
+                msg.append(f"expression #{pos} {r_compile!r} to {m_compile!r}")
+
+        m_options = self._dialect_options(metadata_index)
+        r_options = self._dialect_options(reflected_index)
+        if m_options != r_options:
+            msg.extend(f"options {r_options} to {m_options}")
+
+        if msg:
+            return ComparisonResult.Different(msg)
+        elif skip:
+            # if there are other changes detected don't skip the index
+            return ComparisonResult.Skip(skip)
+        else:
+            return ComparisonResult.Equal()
+
+    def compare_unique_constraint(
+        self,
+        metadata_constraint: UniqueConstraint,
+        reflected_constraint: UniqueConstraint,
+    ) -> ComparisonResult:
+        metadata_tup = self._create_metadata_constraint_sig(
+            metadata_constraint
+        )
+        reflected_tup = self._create_reflected_constraint_sig(
+            reflected_constraint
+        )
+
+        meta_sig = metadata_tup.unnamed
+        conn_sig = reflected_tup.unnamed
+        if conn_sig != meta_sig:
+            return ComparisonResult.Different(
+                f"expression {conn_sig} to {meta_sig}"
+            )
+
+        metadata_do = self._dialect_options(metadata_tup.const)
+        conn_do = self._dialect_options(reflected_tup.const)
+        if metadata_do != conn_do:
+            return ComparisonResult.Different(
+                f"expression {conn_do} to {metadata_do}"
+            )
+
+        return ComparisonResult.Equal()
+
+    def adjust_reflected_dialect_options(
+        self, reflected_options: Dict[str, Any], kind: str
+    ) -> Dict[str, Any]:
+        options: Dict[str, Any]
+        options = reflected_options.get("dialect_options", {}).copy()
+        if not options.get("postgresql_include"):
+            options.pop("postgresql_include", None)
+        return options
+
+    def _compile_element(self, element: Union[ClauseElement, str]) -> str:
+        if isinstance(element, str):
+            return element
+        return element.compile(
+            dialect=self.dialect,
+            compile_kwargs={"literal_binds": True, "include_table": False},
+        ).string
+
+    def render_ddl_sql_expr(
+        self,
+        expr: ClauseElement,
+        is_server_default: bool = False,
+        is_index: bool = False,
+        **kw: Any,
+    ) -> str:
+        """Render a SQL expression that is typically a server default,
+        index expression, etc.
+
+        """
+
+        # apply self_group to index expressions;
+        # see https://github.com/sqlalchemy/sqlalchemy/blob/
+        # 82fa95cfce070fab401d020c6e6e4a6a96cc2578/
+        # lib/sqlalchemy/dialects/postgresql/base.py#L2261
+        if is_index and not isinstance(expr, ColumnClause):
+            expr = expr.self_group()
+
+        return super().render_ddl_sql_expr(
+            expr, is_server_default=is_server_default, is_index=is_index, **kw
+        )
+
+    def render_type(
+        self, type_: TypeEngine, autogen_context: AutogenContext
+    ) -> Union[str, Literal[False]]:
+        mod = type(type_).__module__
+        if not mod.startswith("sqlalchemy.dialects.postgresql"):
+            return False
+
+        if hasattr(self, "_render_%s_type" % type_.__visit_name__):
+            meth = getattr(self, "_render_%s_type" % type_.__visit_name__)
+            return meth(type_, autogen_context)
+
+        return False
+
+    def _render_HSTORE_type(
+        self, type_: HSTORE, autogen_context: AutogenContext
+    ) -> str:
+        return cast(
+            str,
+            render._render_type_w_subtype(
+                type_, autogen_context, "text_type", r"(.+?\(.*text_type=)"
+            ),
+        )
+
+    def _render_ARRAY_type(
+        self, type_: ARRAY, autogen_context: AutogenContext
+    ) -> str:
+        return cast(
+            str,
+            render._render_type_w_subtype(
+                type_, autogen_context, "item_type", r"(.+?\()"
+            ),
+        )
+
+    def _render_JSON_type(
+        self, type_: JSON, autogen_context: AutogenContext
+    ) -> str:
+        return cast(
+            str,
+            render._render_type_w_subtype(
+                type_, autogen_context, "astext_type", r"(.+?\(.*astext_type=)"
+            ),
+        )
+
+    def _render_JSONB_type(
+        self, type_: JSONB, autogen_context: AutogenContext
+    ) -> str:
+        return cast(
+            str,
+            render._render_type_w_subtype(
+                type_, autogen_context, "astext_type", r"(.+?\(.*astext_type=)"
+            ),
+        )
+
+
+class PostgresqlColumnType(AlterColumn):
+    def __init__(
+        self, name: str, column_name: str, type_: TypeEngine, **kw
+    ) -> None:
+        using = kw.pop("using", None)
+        super().__init__(name, column_name, **kw)
+        self.type_ = sqltypes.to_instance(type_)
+        self.using = using
+
+
+@compiles(RenameTable, "postgresql")
+def visit_rename_table(
+    element: RenameTable, compiler: PGDDLCompiler, **kw
+) -> str:
+    return "%s RENAME TO %s" % (
+        alter_table(compiler, element.table_name, element.schema),
+        format_table_name(compiler, element.new_table_name, None),
+    )
+
+
+@compiles(PostgresqlColumnType, "postgresql")
+def visit_column_type(
+    element: PostgresqlColumnType, compiler: PGDDLCompiler, **kw
+) -> str:
+    return "%s %s %s %s" % (
+        alter_table(compiler, element.table_name, element.schema),
+        alter_column(compiler, element.column_name),
+        "TYPE %s" % format_type(compiler, element.type_),
+        "USING %s" % element.using if element.using else "",
+    )
+
+
+@compiles(ColumnComment, "postgresql")
+def visit_column_comment(
+    element: ColumnComment, compiler: PGDDLCompiler, **kw
+) -> str:
+    ddl = "COMMENT ON COLUMN {table_name}.{column_name} IS {comment}"
+    comment = (
+        compiler.sql_compiler.render_literal_value(
+            element.comment, sqltypes.String()
+        )
+        if element.comment is not None
+        else "NULL"
+    )
+
+    return ddl.format(
+        table_name=format_table_name(
+            compiler, element.table_name, element.schema
+        ),
+        column_name=format_column_name(compiler, element.column_name),
+        comment=comment,
+    )
+
+
+@compiles(IdentityColumnDefault, "postgresql")
+def visit_identity_column(
+    element: IdentityColumnDefault, compiler: PGDDLCompiler, **kw
+):
+    text = "%s %s " % (
+        alter_table(compiler, element.table_name, element.schema),
+        alter_column(compiler, element.column_name),
+    )
+    if element.default is None:
+        # drop identity
+        text += "DROP IDENTITY"
+        return text
+    elif element.existing_server_default is None:
+        # add identity options
+        text += "ADD "
+        text += compiler.visit_identity_column(element.default)
+        return text
+    else:
+        # alter identity
+        diff, _, _ = element.impl._compare_identity_default(
+            element.default, element.existing_server_default
+        )
+        identity = element.default
+        for attr in sorted(diff):
+            if attr == "always":
+                text += "SET GENERATED %s " % (
+                    "ALWAYS" if identity.always else "BY DEFAULT"
+                )
+            else:
+                text += "SET %s " % compiler.get_identity_options(
+                    Identity(**{attr: getattr(identity, attr)})
+                )
+        return text
+
+
+@Operations.register_operation("create_exclude_constraint")
+@BatchOperations.register_operation(
+    "create_exclude_constraint", "batch_create_exclude_constraint"
+)
+@ops.AddConstraintOp.register_add_constraint("exclude_constraint")
+class CreateExcludeConstraintOp(ops.AddConstraintOp):
+    """Represent a create exclude constraint operation."""
+
+    constraint_type = "exclude"
+
+    def __init__(
+        self,
+        constraint_name: sqla_compat._ConstraintName,
+        table_name: Union[str, quoted_name],
+        elements: Union[
+            Sequence[Tuple[str, str]],
+            Sequence[Tuple[ColumnClause[Any], str]],
+        ],
+        where: Optional[Union[ColumnElement[bool], str]] = None,
+        schema: Optional[str] = None,
+        _orig_constraint: Optional[ExcludeConstraint] = None,
+        **kw,
+    ) -> None:
+        self.constraint_name = constraint_name
+        self.table_name = table_name
+        self.elements = elements
+        self.where = where
+        self.schema = schema
+        self._orig_constraint = _orig_constraint
+        self.kw = kw
+
+    @classmethod
+    def from_constraint(  # type:ignore[override]
+        cls, constraint: ExcludeConstraint
+    ) -> CreateExcludeConstraintOp:
+        constraint_table = sqla_compat._table_for_constraint(constraint)
+        return cls(
+            constraint.name,
+            constraint_table.name,
+            [  # type: ignore
+                (expr, op) for expr, name, op in constraint._render_exprs
+            ],
+            where=cast("ColumnElement[bool] | None", constraint.where),
+            schema=constraint_table.schema,
+            _orig_constraint=constraint,
+            deferrable=constraint.deferrable,
+            initially=constraint.initially,
+            using=constraint.using,
+        )
+
+    def to_constraint(
+        self, migration_context: Optional[MigrationContext] = None
+    ) -> ExcludeConstraint:
+        if self._orig_constraint is not None:
+            return self._orig_constraint
+        schema_obj = schemaobj.SchemaObjects(migration_context)
+        t = schema_obj.table(self.table_name, schema=self.schema)
+        excl = ExcludeConstraint(
+            *self.elements,
+            name=self.constraint_name,
+            where=self.where,
+            **self.kw,
+        )
+        for (
+            expr,
+            name,
+            oper,
+        ) in excl._render_exprs:
+            t.append_column(Column(name, NULLTYPE))
+        t.append_constraint(excl)
+        return excl
+
+    @classmethod
+    def create_exclude_constraint(
+        cls,
+        operations: Operations,
+        constraint_name: str,
+        table_name: str,
+        *elements: Any,
+        **kw: Any,
+    ) -> Optional[Table]:
+        """Issue an alter to create an EXCLUDE constraint using the
+        current migration context.
+
+        .. note::  This method is Postgresql specific, and additionally
+           requires at least SQLAlchemy 1.0.
+
+        e.g.::
+
+            from alembic import op
+
+            op.create_exclude_constraint(
+                "user_excl",
+                "user",
+                ("period", "&&"),
+                ("group", "="),
+                where=("group != 'some group'"),
+            )
+
+        Note that the expressions work the same way as that of
+        the ``ExcludeConstraint`` object itself; if plain strings are
+        passed, quoting rules must be applied manually.
+
+        :param name: Name of the constraint.
+        :param table_name: String name of the source table.
+        :param elements: exclude conditions.
+        :param where: SQL expression or SQL string with optional WHERE
+         clause.
+        :param deferrable: optional bool. If set, emit DEFERRABLE or
+         NOT DEFERRABLE when issuing DDL for this constraint.
+        :param initially: optional string. If set, emit INITIALLY <value>
+         when issuing DDL for this constraint.
+        :param schema: Optional schema name to operate within.
+
+        """
+        op = cls(constraint_name, table_name, elements, **kw)
+        return operations.invoke(op)
+
+    @classmethod
+    def batch_create_exclude_constraint(
+        cls,
+        operations: BatchOperations,
+        constraint_name: str,
+        *elements: Any,
+        **kw: Any,
+    ) -> Optional[Table]:
+        """Issue a "create exclude constraint" instruction using the
+        current batch migration context.
+
+        .. note::  This method is Postgresql specific, and additionally
+           requires at least SQLAlchemy 1.0.
+
+        .. seealso::
+
+            :meth:`.Operations.create_exclude_constraint`
+
+        """
+        kw["schema"] = operations.impl.schema
+        op = cls(constraint_name, operations.impl.table_name, elements, **kw)
+        return operations.invoke(op)
+
+
+@render.renderers.dispatch_for(CreateExcludeConstraintOp)
+def _add_exclude_constraint(
+    autogen_context: AutogenContext, op: CreateExcludeConstraintOp
+) -> str:
+    return _exclude_constraint(op.to_constraint(), autogen_context, alter=True)
+
+
+@render._constraint_renderers.dispatch_for(ExcludeConstraint)
+def _render_inline_exclude_constraint(
+    constraint: ExcludeConstraint,
+    autogen_context: AutogenContext,
+    namespace_metadata: MetaData,
+) -> str:
+    rendered = render._user_defined_render(
+        "exclude", constraint, autogen_context
+    )
+    if rendered is not False:
+        return rendered
+
+    return _exclude_constraint(constraint, autogen_context, False)
+
+
+def _postgresql_autogenerate_prefix(autogen_context: AutogenContext) -> str:
+    imports = autogen_context.imports
+    if imports is not None:
+        imports.add("from sqlalchemy.dialects import postgresql")
+    return "postgresql."
+
+
+def _exclude_constraint(
+    constraint: ExcludeConstraint,
+    autogen_context: AutogenContext,
+    alter: bool,
+) -> str:
+    opts: List[Tuple[str, Union[quoted_name, str, _f_name, None]]] = []
+
+    has_batch = autogen_context._has_batch
+
+    if constraint.deferrable:
+        opts.append(("deferrable", str(constraint.deferrable)))
+    if constraint.initially:
+        opts.append(("initially", str(constraint.initially)))
+    if constraint.using:
+        opts.append(("using", str(constraint.using)))
+    if not has_batch and alter and constraint.table.schema:
+        opts.append(("schema", render._ident(constraint.table.schema)))
+    if not alter and constraint.name:
+        opts.append(
+            ("name", render._render_gen_name(autogen_context, constraint.name))
+        )
+
+    def do_expr_where_opts():
+        args = [
+            "(%s, %r)"
+            % (
+                _render_potential_column(
+                    sqltext,  # type:ignore[arg-type]
+                    autogen_context,
+                ),
+                opstring,
+            )
+            for sqltext, name, opstring in constraint._render_exprs
+        ]
+        if constraint.where is not None:
+            args.append(
+                "where=%s"
+                % render._render_potential_expr(
+                    constraint.where, autogen_context
+                )
+            )
+        args.extend(["%s=%r" % (k, v) for k, v in opts])
+        return args
+
+    if alter:
+        args = [
+            repr(render._render_gen_name(autogen_context, constraint.name))
+        ]
+        if not has_batch:
+            args += [repr(render._ident(constraint.table.name))]
+        args.extend(do_expr_where_opts())
+        return "%(prefix)screate_exclude_constraint(%(args)s)" % {
+            "prefix": render._alembic_autogenerate_prefix(autogen_context),
+            "args": ", ".join(args),
+        }
+    else:
+        args = do_expr_where_opts()
+        return "%(prefix)sExcludeConstraint(%(args)s)" % {
+            "prefix": _postgresql_autogenerate_prefix(autogen_context),
+            "args": ", ".join(args),
+        }
+
+
+def _render_potential_column(
+    value: Union[
+        ColumnClause[Any], Column[Any], TextClause, FunctionElement[Any]
+    ],
+    autogen_context: AutogenContext,
+) -> str:
+    if isinstance(value, ColumnClause):
+        if value.is_literal:
+            # like literal_column("int8range(from, to)") in ExcludeConstraint
+            template = "%(prefix)sliteral_column(%(name)r)"
+        else:
+            template = "%(prefix)scolumn(%(name)r)"
+
+        return template % {
+            "prefix": render._sqlalchemy_autogenerate_prefix(autogen_context),
+            "name": value.name,
+        }
+    else:
+        return render._render_potential_expr(
+            value,
+            autogen_context,
+            wrap_in_element=isinstance(value, (TextClause, FunctionElement)),
+        )