about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py515
1 files changed, 515 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py b/.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py
new file mode 100644
index 00000000..2ce7ad56
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/ext/mypy/decl_class.py
@@ -0,0 +1,515 @@
+# ext/mypy/decl_class.py
+# Copyright (C) 2021-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+
+from __future__ import annotations
+
+from typing import List
+from typing import Optional
+from typing import Union
+
+from mypy.nodes import AssignmentStmt
+from mypy.nodes import CallExpr
+from mypy.nodes import ClassDef
+from mypy.nodes import Decorator
+from mypy.nodes import LambdaExpr
+from mypy.nodes import ListExpr
+from mypy.nodes import MemberExpr
+from mypy.nodes import NameExpr
+from mypy.nodes import PlaceholderNode
+from mypy.nodes import RefExpr
+from mypy.nodes import StrExpr
+from mypy.nodes import SymbolNode
+from mypy.nodes import SymbolTableNode
+from mypy.nodes import TempNode
+from mypy.nodes import TypeInfo
+from mypy.nodes import Var
+from mypy.plugin import SemanticAnalyzerPluginInterface
+from mypy.types import AnyType
+from mypy.types import CallableType
+from mypy.types import get_proper_type
+from mypy.types import Instance
+from mypy.types import NoneType
+from mypy.types import ProperType
+from mypy.types import Type
+from mypy.types import TypeOfAny
+from mypy.types import UnboundType
+from mypy.types import UnionType
+
+from . import apply
+from . import infer
+from . import names
+from . import util
+
+
+def scan_declarative_assignments_and_apply_types(
+    cls: ClassDef,
+    api: SemanticAnalyzerPluginInterface,
+    is_mixin_scan: bool = False,
+) -> Optional[List[util.SQLAlchemyAttribute]]:
+    info = util.info_for_cls(cls, api)
+
+    if info is None:
+        # this can occur during cached passes
+        return None
+    elif cls.fullname.startswith("builtins"):
+        return None
+
+    mapped_attributes: Optional[List[util.SQLAlchemyAttribute]] = (
+        util.get_mapped_attributes(info, api)
+    )
+
+    # used by assign.add_additional_orm_attributes among others
+    util.establish_as_sqlalchemy(info)
+
+    if mapped_attributes is not None:
+        # ensure that a class that's mapped is always picked up by
+        # its mapped() decorator or declarative metaclass before
+        # it would be detected as an unmapped mixin class
+
+        if not is_mixin_scan:
+            # mypy can call us more than once.  it then *may* have reset the
+            # left hand side of everything, but not the right that we removed,
+            # removing our ability to re-scan.   but we have the types
+            # here, so lets re-apply them, or if we have an UnboundType,
+            # we can re-scan
+
+            apply.re_apply_declarative_assignments(cls, api, mapped_attributes)
+
+        return mapped_attributes
+
+    mapped_attributes = []
+
+    if not cls.defs.body:
+        # when we get a mixin class from another file, the body is
+        # empty (!) but the names are in the symbol table.  so use that.
+
+        for sym_name, sym in info.names.items():
+            _scan_symbol_table_entry(
+                cls, api, sym_name, sym, mapped_attributes
+            )
+    else:
+        for stmt in util.flatten_typechecking(cls.defs.body):
+            if isinstance(stmt, AssignmentStmt):
+                _scan_declarative_assignment_stmt(
+                    cls, api, stmt, mapped_attributes
+                )
+            elif isinstance(stmt, Decorator):
+                _scan_declarative_decorator_stmt(
+                    cls, api, stmt, mapped_attributes
+                )
+    _scan_for_mapped_bases(cls, api)
+
+    if not is_mixin_scan:
+        apply.add_additional_orm_attributes(cls, api, mapped_attributes)
+
+    util.set_mapped_attributes(info, mapped_attributes)
+
+    return mapped_attributes
+
+
+def _scan_symbol_table_entry(
+    cls: ClassDef,
+    api: SemanticAnalyzerPluginInterface,
+    name: str,
+    value: SymbolTableNode,
+    attributes: List[util.SQLAlchemyAttribute],
+) -> None:
+    """Extract mapping information from a SymbolTableNode that's in the
+    type.names dictionary.
+
+    """
+    value_type = get_proper_type(value.type)
+    if not isinstance(value_type, Instance):
+        return
+
+    left_hand_explicit_type = None
+    type_id = names.type_id_for_named_node(value_type.type)
+    # type_id = names._type_id_for_unbound_type(value.type.type, cls, api)
+
+    err = False
+
+    # TODO: this is nearly the same logic as that of
+    # _scan_declarative_decorator_stmt, likely can be merged
+    if type_id in {
+        names.MAPPED,
+        names.RELATIONSHIP,
+        names.COMPOSITE_PROPERTY,
+        names.MAPPER_PROPERTY,
+        names.SYNONYM_PROPERTY,
+        names.COLUMN_PROPERTY,
+    }:
+        if value_type.args:
+            left_hand_explicit_type = get_proper_type(value_type.args[0])
+        else:
+            err = True
+    elif type_id is names.COLUMN:
+        if not value_type.args:
+            err = True
+        else:
+            typeengine_arg: Union[ProperType, TypeInfo] = get_proper_type(
+                value_type.args[0]
+            )
+            if isinstance(typeengine_arg, Instance):
+                typeengine_arg = typeengine_arg.type
+
+            if isinstance(typeengine_arg, (UnboundType, TypeInfo)):
+                sym = api.lookup_qualified(typeengine_arg.name, typeengine_arg)
+                if sym is not None and isinstance(sym.node, TypeInfo):
+                    if names.has_base_type_id(sym.node, names.TYPEENGINE):
+                        left_hand_explicit_type = UnionType(
+                            [
+                                infer.extract_python_type_from_typeengine(
+                                    api, sym.node, []
+                                ),
+                                NoneType(),
+                            ]
+                        )
+                    else:
+                        util.fail(
+                            api,
+                            "Column type should be a TypeEngine "
+                            "subclass not '{}'".format(sym.node.fullname),
+                            value_type,
+                        )
+
+    if err:
+        msg = (
+            "Can't infer type from attribute {} on class {}. "
+            "please specify a return type from this function that is "
+            "one of: Mapped[<python type>], relationship[<target class>], "
+            "Column[<TypeEngine>], MapperProperty[<python type>]"
+        )
+        util.fail(api, msg.format(name, cls.name), cls)
+
+        left_hand_explicit_type = AnyType(TypeOfAny.special_form)
+
+    if left_hand_explicit_type is not None:
+        assert value.node is not None
+        attributes.append(
+            util.SQLAlchemyAttribute(
+                name=name,
+                line=value.node.line,
+                column=value.node.column,
+                typ=left_hand_explicit_type,
+                info=cls.info,
+            )
+        )
+
+
+def _scan_declarative_decorator_stmt(
+    cls: ClassDef,
+    api: SemanticAnalyzerPluginInterface,
+    stmt: Decorator,
+    attributes: List[util.SQLAlchemyAttribute],
+) -> None:
+    """Extract mapping information from a @declared_attr in a declarative
+    class.
+
+    E.g.::
+
+        @reg.mapped
+        class MyClass:
+            # ...
+
+            @declared_attr
+            def updated_at(cls) -> Column[DateTime]:
+                return Column(DateTime)
+
+    Will resolve in mypy as::
+
+        @reg.mapped
+        class MyClass:
+            # ...
+
+            updated_at: Mapped[Optional[datetime.datetime]]
+
+    """
+    for dec in stmt.decorators:
+        if (
+            isinstance(dec, (NameExpr, MemberExpr, SymbolNode))
+            and names.type_id_for_named_node(dec) is names.DECLARED_ATTR
+        ):
+            break
+    else:
+        return
+
+    dec_index = cls.defs.body.index(stmt)
+
+    left_hand_explicit_type: Optional[ProperType] = None
+
+    if util.name_is_dunder(stmt.name):
+        # for dunder names like __table_args__, __tablename__,
+        # __mapper_args__ etc., rewrite these as simple assignment
+        # statements; otherwise mypy doesn't like if the decorated
+        # function has an annotation like ``cls: Type[Foo]`` because
+        # it isn't @classmethod
+        any_ = AnyType(TypeOfAny.special_form)
+        left_node = NameExpr(stmt.var.name)
+        left_node.node = stmt.var
+        new_stmt = AssignmentStmt([left_node], TempNode(any_))
+        new_stmt.type = left_node.node.type
+        cls.defs.body[dec_index] = new_stmt
+        return
+    elif isinstance(stmt.func.type, CallableType):
+        func_type = stmt.func.type.ret_type
+        if isinstance(func_type, UnboundType):
+            type_id = names.type_id_for_unbound_type(func_type, cls, api)
+        else:
+            # this does not seem to occur unless the type argument is
+            # incorrect
+            return
+
+        if (
+            type_id
+            in {
+                names.MAPPED,
+                names.RELATIONSHIP,
+                names.COMPOSITE_PROPERTY,
+                names.MAPPER_PROPERTY,
+                names.SYNONYM_PROPERTY,
+                names.COLUMN_PROPERTY,
+            }
+            and func_type.args
+        ):
+            left_hand_explicit_type = get_proper_type(func_type.args[0])
+        elif type_id is names.COLUMN and func_type.args:
+            typeengine_arg = func_type.args[0]
+            if isinstance(typeengine_arg, UnboundType):
+                sym = api.lookup_qualified(typeengine_arg.name, typeengine_arg)
+                if sym is not None and isinstance(sym.node, TypeInfo):
+                    if names.has_base_type_id(sym.node, names.TYPEENGINE):
+                        left_hand_explicit_type = UnionType(
+                            [
+                                infer.extract_python_type_from_typeengine(
+                                    api, sym.node, []
+                                ),
+                                NoneType(),
+                            ]
+                        )
+                    else:
+                        util.fail(
+                            api,
+                            "Column type should be a TypeEngine "
+                            "subclass not '{}'".format(sym.node.fullname),
+                            func_type,
+                        )
+
+    if left_hand_explicit_type is None:
+        # no type on the decorated function.  our option here is to
+        # dig into the function body and get the return type, but they
+        # should just have an annotation.
+        msg = (
+            "Can't infer type from @declared_attr on function '{}';  "
+            "please specify a return type from this function that is "
+            "one of: Mapped[<python type>], relationship[<target class>], "
+            "Column[<TypeEngine>], MapperProperty[<python type>]"
+        )
+        util.fail(api, msg.format(stmt.var.name), stmt)
+
+        left_hand_explicit_type = AnyType(TypeOfAny.special_form)
+
+    left_node = NameExpr(stmt.var.name)
+    left_node.node = stmt.var
+
+    # totally feeling around in the dark here as I don't totally understand
+    # the significance of UnboundType.  It seems to be something that is
+    # not going to do what's expected when it is applied as the type of
+    # an AssignmentStatement.  So do a feeling-around-in-the-dark version
+    # of converting it to the regular Instance/TypeInfo/UnionType structures
+    # we see everywhere else.
+    if isinstance(left_hand_explicit_type, UnboundType):
+        left_hand_explicit_type = get_proper_type(
+            util.unbound_to_instance(api, left_hand_explicit_type)
+        )
+
+    left_node.node.type = api.named_type(
+        names.NAMED_TYPE_SQLA_MAPPED, [left_hand_explicit_type]
+    )
+
+    # this will ignore the rvalue entirely
+    # rvalue = TempNode(AnyType(TypeOfAny.special_form))
+
+    # rewrite the node as:
+    # <attr> : Mapped[<typ>] =
+    # _sa_Mapped._empty_constructor(lambda: <function body>)
+    # the function body is maintained so it gets type checked internally
+    rvalue = names.expr_to_mapped_constructor(
+        LambdaExpr(stmt.func.arguments, stmt.func.body)
+    )
+
+    new_stmt = AssignmentStmt([left_node], rvalue)
+    new_stmt.type = left_node.node.type
+
+    attributes.append(
+        util.SQLAlchemyAttribute(
+            name=left_node.name,
+            line=stmt.line,
+            column=stmt.column,
+            typ=left_hand_explicit_type,
+            info=cls.info,
+        )
+    )
+    cls.defs.body[dec_index] = new_stmt
+
+
+def _scan_declarative_assignment_stmt(
+    cls: ClassDef,
+    api: SemanticAnalyzerPluginInterface,
+    stmt: AssignmentStmt,
+    attributes: List[util.SQLAlchemyAttribute],
+) -> None:
+    """Extract mapping information from an assignment statement in a
+    declarative class.
+
+    """
+    lvalue = stmt.lvalues[0]
+    if not isinstance(lvalue, NameExpr):
+        return
+
+    sym = cls.info.names.get(lvalue.name)
+
+    # this establishes that semantic analysis has taken place, which
+    # means the nodes are populated and we are called from an appropriate
+    # hook.
+    assert sym is not None
+    node = sym.node
+
+    if isinstance(node, PlaceholderNode):
+        return
+
+    assert node is lvalue.node
+    assert isinstance(node, Var)
+
+    if node.name == "__abstract__":
+        if api.parse_bool(stmt.rvalue) is True:
+            util.set_is_base(cls.info)
+        return
+    elif node.name == "__tablename__":
+        util.set_has_table(cls.info)
+    elif node.name.startswith("__"):
+        return
+    elif node.name == "_mypy_mapped_attrs":
+        if not isinstance(stmt.rvalue, ListExpr):
+            util.fail(api, "_mypy_mapped_attrs is expected to be a list", stmt)
+        else:
+            for item in stmt.rvalue.items:
+                if isinstance(item, (NameExpr, StrExpr)):
+                    apply.apply_mypy_mapped_attr(cls, api, item, attributes)
+
+    left_hand_mapped_type: Optional[Type] = None
+    left_hand_explicit_type: Optional[ProperType] = None
+
+    if node.is_inferred or node.type is None:
+        if isinstance(stmt.type, UnboundType):
+            # look for an explicit Mapped[] type annotation on the left
+            # side with nothing on the right
+
+            # print(stmt.type)
+            # Mapped?[Optional?[A?]]
+
+            left_hand_explicit_type = stmt.type
+
+            if stmt.type.name == "Mapped":
+                mapped_sym = api.lookup_qualified("Mapped", cls)
+                if (
+                    mapped_sym is not None
+                    and mapped_sym.node is not None
+                    and names.type_id_for_named_node(mapped_sym.node)
+                    is names.MAPPED
+                ):
+                    left_hand_explicit_type = get_proper_type(
+                        stmt.type.args[0]
+                    )
+                    left_hand_mapped_type = stmt.type
+
+            # TODO: do we need to convert from unbound for this case?
+            # left_hand_explicit_type = util._unbound_to_instance(
+            #     api, left_hand_explicit_type
+            # )
+    else:
+        node_type = get_proper_type(node.type)
+        if (
+            isinstance(node_type, Instance)
+            and names.type_id_for_named_node(node_type.type) is names.MAPPED
+        ):
+            # print(node.type)
+            # sqlalchemy.orm.attributes.Mapped[<python type>]
+            left_hand_explicit_type = get_proper_type(node_type.args[0])
+            left_hand_mapped_type = node_type
+        else:
+            # print(node.type)
+            # <python type>
+            left_hand_explicit_type = node_type
+            left_hand_mapped_type = None
+
+    if isinstance(stmt.rvalue, TempNode) and left_hand_mapped_type is not None:
+        # annotation without assignment and Mapped is present
+        # as type annotation
+        # equivalent to using _infer_type_from_left_hand_type_only.
+
+        python_type_for_type = left_hand_explicit_type
+    elif isinstance(stmt.rvalue, CallExpr) and isinstance(
+        stmt.rvalue.callee, RefExpr
+    ):
+        python_type_for_type = infer.infer_type_from_right_hand_nameexpr(
+            api, stmt, node, left_hand_explicit_type, stmt.rvalue.callee
+        )
+
+        if python_type_for_type is None:
+            return
+
+    else:
+        return
+
+    assert python_type_for_type is not None
+
+    attributes.append(
+        util.SQLAlchemyAttribute(
+            name=node.name,
+            line=stmt.line,
+            column=stmt.column,
+            typ=python_type_for_type,
+            info=cls.info,
+        )
+    )
+
+    apply.apply_type_to_mapped_statement(
+        api,
+        stmt,
+        lvalue,
+        left_hand_explicit_type,
+        python_type_for_type,
+    )
+
+
+def _scan_for_mapped_bases(
+    cls: ClassDef,
+    api: SemanticAnalyzerPluginInterface,
+) -> None:
+    """Given a class, iterate through its superclass hierarchy to find
+    all other classes that are considered as ORM-significant.
+
+    Locates non-mapped mixins and scans them for mapped attributes to be
+    applied to subclasses.
+
+    """
+
+    info = util.info_for_cls(cls, api)
+
+    if info is None:
+        return
+
+    for base_info in info.mro[1:-1]:
+        if base_info.fullname.startswith("builtins"):
+            continue
+
+        # scan each base for mapped attributes.  if they are not already
+        # scanned (but have all their type info), that means they are unmapped
+        # mixins
+        scan_declarative_assignments_and_apply_types(
+            base_info.defn, api, is_mixin_scan=True
+        )