about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py541
1 files changed, 541 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py b/.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py
new file mode 100644
index 00000000..f6aefcf6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/util/_py_collections.py
@@ -0,0 +1,541 @@
+# util/_py_collections.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+from __future__ import annotations
+
+from itertools import filterfalse
+from typing import AbstractSet
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Collection
+from typing import Dict
+from typing import Iterable
+from typing import Iterator
+from typing import List
+from typing import Mapping
+from typing import NoReturn
+from typing import Optional
+from typing import Set
+from typing import Tuple
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+
+from ..util.typing import Self
+
+_T = TypeVar("_T", bound=Any)
+_S = TypeVar("_S", bound=Any)
+_KT = TypeVar("_KT", bound=Any)
+_VT = TypeVar("_VT", bound=Any)
+
+
+class ReadOnlyContainer:
+    __slots__ = ()
+
+    def _readonly(self, *arg: Any, **kw: Any) -> NoReturn:
+        raise TypeError(
+            "%s object is immutable and/or readonly" % self.__class__.__name__
+        )
+
+    def _immutable(self, *arg: Any, **kw: Any) -> NoReturn:
+        raise TypeError("%s object is immutable" % self.__class__.__name__)
+
+    def __delitem__(self, key: Any) -> NoReturn:
+        self._readonly()
+
+    def __setitem__(self, key: Any, value: Any) -> NoReturn:
+        self._readonly()
+
+    def __setattr__(self, key: str, value: Any) -> NoReturn:
+        self._readonly()
+
+
+class ImmutableDictBase(ReadOnlyContainer, Dict[_KT, _VT]):
+    if TYPE_CHECKING:
+
+        def __new__(cls, *args: Any) -> Self: ...
+
+        def __init__(cls, *args: Any): ...
+
+    def _readonly(self, *arg: Any, **kw: Any) -> NoReturn:
+        self._immutable()
+
+    def clear(self) -> NoReturn:
+        self._readonly()
+
+    def pop(self, key: Any, default: Optional[Any] = None) -> NoReturn:
+        self._readonly()
+
+    def popitem(self) -> NoReturn:
+        self._readonly()
+
+    def setdefault(self, key: Any, default: Optional[Any] = None) -> NoReturn:
+        self._readonly()
+
+    def update(self, *arg: Any, **kw: Any) -> NoReturn:
+        self._readonly()
+
+
+class immutabledict(ImmutableDictBase[_KT, _VT]):
+    def __new__(cls, *args):
+        new = ImmutableDictBase.__new__(cls)
+        dict.__init__(new, *args)
+        return new
+
+    def __init__(
+        self, *args: Union[Mapping[_KT, _VT], Iterable[Tuple[_KT, _VT]]]
+    ):
+        pass
+
+    def __reduce__(self):
+        return immutabledict, (dict(self),)
+
+    def union(
+        self, __d: Optional[Mapping[_KT, _VT]] = None
+    ) -> immutabledict[_KT, _VT]:
+        if not __d:
+            return self
+
+        new = ImmutableDictBase.__new__(self.__class__)
+        dict.__init__(new, self)
+        dict.update(new, __d)  # type: ignore
+        return new
+
+    def _union_w_kw(
+        self, __d: Optional[Mapping[_KT, _VT]] = None, **kw: _VT
+    ) -> immutabledict[_KT, _VT]:
+        # not sure if C version works correctly w/ this yet
+        if not __d and not kw:
+            return self
+
+        new = ImmutableDictBase.__new__(self.__class__)
+        dict.__init__(new, self)
+        if __d:
+            dict.update(new, __d)  # type: ignore
+        dict.update(new, kw)  # type: ignore
+        return new
+
+    def merge_with(
+        self, *dicts: Optional[Mapping[_KT, _VT]]
+    ) -> immutabledict[_KT, _VT]:
+        new = None
+        for d in dicts:
+            if d:
+                if new is None:
+                    new = ImmutableDictBase.__new__(self.__class__)
+                    dict.__init__(new, self)
+                dict.update(new, d)  # type: ignore
+        if new is None:
+            return self
+
+        return new
+
+    def __repr__(self) -> str:
+        return "immutabledict(%s)" % dict.__repr__(self)
+
+    # PEP 584
+    def __ior__(self, __value: Any) -> NoReturn:  # type: ignore
+        self._readonly()
+
+    def __or__(  # type: ignore[override]
+        self, __value: Mapping[_KT, _VT]
+    ) -> immutabledict[_KT, _VT]:
+        return immutabledict(
+            super().__or__(__value),  # type: ignore[call-overload]
+        )
+
+    def __ror__(  # type: ignore[override]
+        self, __value: Mapping[_KT, _VT]
+    ) -> immutabledict[_KT, _VT]:
+        return immutabledict(
+            super().__ror__(__value),  # type: ignore[call-overload]
+        )
+
+
+class OrderedSet(Set[_T]):
+    __slots__ = ("_list",)
+
+    _list: List[_T]
+
+    def __init__(self, d: Optional[Iterable[_T]] = None) -> None:
+        if d is not None:
+            self._list = unique_list(d)
+            super().update(self._list)
+        else:
+            self._list = []
+
+    def copy(self) -> OrderedSet[_T]:
+        cp = self.__class__()
+        cp._list = self._list.copy()
+        set.update(cp, cp._list)
+        return cp
+
+    def add(self, element: _T) -> None:
+        if element not in self:
+            self._list.append(element)
+        super().add(element)
+
+    def remove(self, element: _T) -> None:
+        super().remove(element)
+        self._list.remove(element)
+
+    def pop(self) -> _T:
+        try:
+            value = self._list.pop()
+        except IndexError:
+            raise KeyError("pop from an empty set") from None
+        super().remove(value)
+        return value
+
+    def insert(self, pos: int, element: _T) -> None:
+        if element not in self:
+            self._list.insert(pos, element)
+        super().add(element)
+
+    def discard(self, element: _T) -> None:
+        if element in self:
+            self._list.remove(element)
+            super().remove(element)
+
+    def clear(self) -> None:
+        super().clear()
+        self._list = []
+
+    def __getitem__(self, key: int) -> _T:
+        return self._list[key]
+
+    def __iter__(self) -> Iterator[_T]:
+        return iter(self._list)
+
+    def __add__(self, other: Iterator[_T]) -> OrderedSet[_T]:
+        return self.union(other)
+
+    def __repr__(self) -> str:
+        return "%s(%r)" % (self.__class__.__name__, self._list)
+
+    __str__ = __repr__
+
+    def update(self, *iterables: Iterable[_T]) -> None:
+        for iterable in iterables:
+            for e in iterable:
+                if e not in self:
+                    self._list.append(e)
+                    super().add(e)
+
+    def __ior__(self, other: AbstractSet[_S]) -> OrderedSet[Union[_T, _S]]:
+        self.update(other)
+        return self
+
+    def union(self, *other: Iterable[_S]) -> OrderedSet[Union[_T, _S]]:
+        result: OrderedSet[Union[_T, _S]] = self.copy()
+        result.update(*other)
+        return result
+
+    def __or__(self, other: AbstractSet[_S]) -> OrderedSet[Union[_T, _S]]:
+        return self.union(other)
+
+    def intersection(self, *other: Iterable[Any]) -> OrderedSet[_T]:
+        other_set: Set[Any] = set()
+        other_set.update(*other)
+        return self.__class__(a for a in self if a in other_set)
+
+    def __and__(self, other: AbstractSet[object]) -> OrderedSet[_T]:
+        return self.intersection(other)
+
+    def symmetric_difference(self, other: Iterable[_T]) -> OrderedSet[_T]:
+        collection: Collection[_T]
+        if isinstance(other, set):
+            collection = other_set = other
+        elif isinstance(other, Collection):
+            collection = other
+            other_set = set(other)
+        else:
+            collection = list(other)
+            other_set = set(collection)
+        result = self.__class__(a for a in self if a not in other_set)
+        result.update(a for a in collection if a not in self)
+        return result
+
+    def __xor__(self, other: AbstractSet[_S]) -> OrderedSet[Union[_T, _S]]:
+        return cast(OrderedSet[Union[_T, _S]], self).symmetric_difference(
+            other
+        )
+
+    def difference(self, *other: Iterable[Any]) -> OrderedSet[_T]:
+        other_set = super().difference(*other)
+        return self.__class__(a for a in self._list if a in other_set)
+
+    def __sub__(self, other: AbstractSet[Optional[_T]]) -> OrderedSet[_T]:
+        return self.difference(other)
+
+    def intersection_update(self, *other: Iterable[Any]) -> None:
+        super().intersection_update(*other)
+        self._list = [a for a in self._list if a in self]
+
+    def __iand__(self, other: AbstractSet[object]) -> OrderedSet[_T]:
+        self.intersection_update(other)
+        return self
+
+    def symmetric_difference_update(self, other: Iterable[Any]) -> None:
+        collection = other if isinstance(other, Collection) else list(other)
+        super().symmetric_difference_update(collection)
+        self._list = [a for a in self._list if a in self]
+        self._list += [a for a in collection if a in self]
+
+    def __ixor__(self, other: AbstractSet[_S]) -> OrderedSet[Union[_T, _S]]:
+        self.symmetric_difference_update(other)
+        return cast(OrderedSet[Union[_T, _S]], self)
+
+    def difference_update(self, *other: Iterable[Any]) -> None:
+        super().difference_update(*other)
+        self._list = [a for a in self._list if a in self]
+
+    def __isub__(self, other: AbstractSet[Optional[_T]]) -> OrderedSet[_T]:  # type: ignore  # noqa: E501
+        self.difference_update(other)
+        return self
+
+
+class IdentitySet:
+    """A set that considers only object id() for uniqueness.
+
+    This strategy has edge cases for builtin types- it's possible to have
+    two 'foo' strings in one of these sets, for example.  Use sparingly.
+
+    """
+
+    _members: Dict[int, Any]
+
+    def __init__(self, iterable: Optional[Iterable[Any]] = None):
+        self._members = dict()
+        if iterable:
+            self.update(iterable)
+
+    def add(self, value: Any) -> None:
+        self._members[id(value)] = value
+
+    def __contains__(self, value: Any) -> bool:
+        return id(value) in self._members
+
+    def remove(self, value: Any) -> None:
+        del self._members[id(value)]
+
+    def discard(self, value: Any) -> None:
+        try:
+            self.remove(value)
+        except KeyError:
+            pass
+
+    def pop(self) -> Any:
+        try:
+            pair = self._members.popitem()
+            return pair[1]
+        except KeyError:
+            raise KeyError("pop from an empty set")
+
+    def clear(self) -> None:
+        self._members.clear()
+
+    def __eq__(self, other: Any) -> bool:
+        if isinstance(other, IdentitySet):
+            return self._members == other._members
+        else:
+            return False
+
+    def __ne__(self, other: Any) -> bool:
+        if isinstance(other, IdentitySet):
+            return self._members != other._members
+        else:
+            return True
+
+    def issubset(self, iterable: Iterable[Any]) -> bool:
+        if isinstance(iterable, self.__class__):
+            other = iterable
+        else:
+            other = self.__class__(iterable)
+
+        if len(self) > len(other):
+            return False
+        for m in filterfalse(
+            other._members.__contains__, iter(self._members.keys())
+        ):
+            return False
+        return True
+
+    def __le__(self, other: Any) -> bool:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.issubset(other)
+
+    def __lt__(self, other: Any) -> bool:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return len(self) < len(other) and self.issubset(other)
+
+    def issuperset(self, iterable: Iterable[Any]) -> bool:
+        if isinstance(iterable, self.__class__):
+            other = iterable
+        else:
+            other = self.__class__(iterable)
+
+        if len(self) < len(other):
+            return False
+
+        for m in filterfalse(
+            self._members.__contains__, iter(other._members.keys())
+        ):
+            return False
+        return True
+
+    def __ge__(self, other: Any) -> bool:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.issuperset(other)
+
+    def __gt__(self, other: Any) -> bool:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return len(self) > len(other) and self.issuperset(other)
+
+    def union(self, iterable: Iterable[Any]) -> IdentitySet:
+        result = self.__class__()
+        members = self._members
+        result._members.update(members)
+        result._members.update((id(obj), obj) for obj in iterable)
+        return result
+
+    def __or__(self, other: Any) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.union(other)
+
+    def update(self, iterable: Iterable[Any]) -> None:
+        self._members.update((id(obj), obj) for obj in iterable)
+
+    def __ior__(self, other: Any) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        self.update(other)
+        return self
+
+    def difference(self, iterable: Iterable[Any]) -> IdentitySet:
+        result = self.__new__(self.__class__)
+        other: Collection[Any]
+
+        if isinstance(iterable, self.__class__):
+            other = iterable._members
+        else:
+            other = {id(obj) for obj in iterable}
+        result._members = {
+            k: v for k, v in self._members.items() if k not in other
+        }
+        return result
+
+    def __sub__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.difference(other)
+
+    def difference_update(self, iterable: Iterable[Any]) -> None:
+        self._members = self.difference(iterable)._members
+
+    def __isub__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        self.difference_update(other)
+        return self
+
+    def intersection(self, iterable: Iterable[Any]) -> IdentitySet:
+        result = self.__new__(self.__class__)
+
+        other: Collection[Any]
+
+        if isinstance(iterable, self.__class__):
+            other = iterable._members
+        else:
+            other = {id(obj) for obj in iterable}
+        result._members = {
+            k: v for k, v in self._members.items() if k in other
+        }
+        return result
+
+    def __and__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.intersection(other)
+
+    def intersection_update(self, iterable: Iterable[Any]) -> None:
+        self._members = self.intersection(iterable)._members
+
+    def __iand__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        self.intersection_update(other)
+        return self
+
+    def symmetric_difference(self, iterable: Iterable[Any]) -> IdentitySet:
+        result = self.__new__(self.__class__)
+        if isinstance(iterable, self.__class__):
+            other = iterable._members
+        else:
+            other = {id(obj): obj for obj in iterable}
+        result._members = {
+            k: v for k, v in self._members.items() if k not in other
+        }
+        result._members.update(
+            (k, v) for k, v in other.items() if k not in self._members
+        )
+        return result
+
+    def __xor__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        return self.symmetric_difference(other)
+
+    def symmetric_difference_update(self, iterable: Iterable[Any]) -> None:
+        self._members = self.symmetric_difference(iterable)._members
+
+    def __ixor__(self, other: IdentitySet) -> IdentitySet:
+        if not isinstance(other, IdentitySet):
+            return NotImplemented
+        self.symmetric_difference(other)
+        return self
+
+    def copy(self) -> IdentitySet:
+        result = self.__new__(self.__class__)
+        result._members = self._members.copy()
+        return result
+
+    __copy__ = copy
+
+    def __len__(self) -> int:
+        return len(self._members)
+
+    def __iter__(self) -> Iterator[Any]:
+        return iter(self._members.values())
+
+    def __hash__(self) -> NoReturn:
+        raise TypeError("set objects are unhashable")
+
+    def __repr__(self) -> str:
+        return "%s(%r)" % (type(self).__name__, list(self._members.values()))
+
+
+def unique_list(
+    seq: Iterable[_T], hashfunc: Optional[Callable[[_T], int]] = None
+) -> List[_T]:
+    seen: Set[Any] = set()
+    seen_add = seen.add
+    if not hashfunc:
+        return [x for x in seq if x not in seen and not seen_add(x)]
+    else:
+        return [
+            x
+            for x in seq
+            if hashfunc(x) not in seen and not seen_add(hashfunc(x))
+        ]