aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py332
1 files changed, 332 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py b/.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py
new file mode 100644
index 00000000..80d88cbc
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/alembic/util/langhelpers.py
@@ -0,0 +1,332 @@
+from __future__ import annotations
+
+import collections
+from collections.abc import Iterable
+import textwrap
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import MutableMapping
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+import uuid
+import warnings
+
+from sqlalchemy.util import asbool as asbool # noqa: F401
+from sqlalchemy.util import immutabledict as immutabledict # noqa: F401
+from sqlalchemy.util import to_list as to_list # noqa: F401
+from sqlalchemy.util import unique_list as unique_list
+
+from .compat import inspect_getfullargspec
+
+if True:
+ # zimports workaround :(
+ from sqlalchemy.util import ( # noqa: F401
+ memoized_property as memoized_property,
+ )
+
+
+EMPTY_DICT: Mapping[Any, Any] = immutabledict()
+_T = TypeVar("_T", bound=Any)
+
+_C = TypeVar("_C", bound=Callable[..., Any])
+
+
+class _ModuleClsMeta(type):
+ def __setattr__(cls, key: str, value: Callable[..., Any]) -> None:
+ super().__setattr__(key, value)
+ cls._update_module_proxies(key) # type: ignore
+
+
+class ModuleClsProxy(metaclass=_ModuleClsMeta):
+ """Create module level proxy functions for the
+ methods on a given class.
+
+ The functions will have a compatible signature
+ as the methods.
+
+ """
+
+ _setups: Dict[
+ Type[Any],
+ Tuple[
+ Set[str],
+ List[Tuple[MutableMapping[str, Any], MutableMapping[str, Any]]],
+ ],
+ ] = collections.defaultdict(lambda: (set(), []))
+
+ @classmethod
+ def _update_module_proxies(cls, name: str) -> None:
+ attr_names, modules = cls._setups[cls]
+ for globals_, locals_ in modules:
+ cls._add_proxied_attribute(name, globals_, locals_, attr_names)
+
+ def _install_proxy(self) -> None:
+ attr_names, modules = self._setups[self.__class__]
+ for globals_, locals_ in modules:
+ globals_["_proxy"] = self
+ for attr_name in attr_names:
+ globals_[attr_name] = getattr(self, attr_name)
+
+ def _remove_proxy(self) -> None:
+ attr_names, modules = self._setups[self.__class__]
+ for globals_, locals_ in modules:
+ globals_["_proxy"] = None
+ for attr_name in attr_names:
+ del globals_[attr_name]
+
+ @classmethod
+ def create_module_class_proxy(
+ cls,
+ globals_: MutableMapping[str, Any],
+ locals_: MutableMapping[str, Any],
+ ) -> None:
+ attr_names, modules = cls._setups[cls]
+ modules.append((globals_, locals_))
+ cls._setup_proxy(globals_, locals_, attr_names)
+
+ @classmethod
+ def _setup_proxy(
+ cls,
+ globals_: MutableMapping[str, Any],
+ locals_: MutableMapping[str, Any],
+ attr_names: Set[str],
+ ) -> None:
+ for methname in dir(cls):
+ cls._add_proxied_attribute(methname, globals_, locals_, attr_names)
+
+ @classmethod
+ def _add_proxied_attribute(
+ cls,
+ methname: str,
+ globals_: MutableMapping[str, Any],
+ locals_: MutableMapping[str, Any],
+ attr_names: Set[str],
+ ) -> None:
+ if not methname.startswith("_"):
+ meth = getattr(cls, methname)
+ if callable(meth):
+ locals_[methname] = cls._create_method_proxy(
+ methname, globals_, locals_
+ )
+ else:
+ attr_names.add(methname)
+
+ @classmethod
+ def _create_method_proxy(
+ cls,
+ name: str,
+ globals_: MutableMapping[str, Any],
+ locals_: MutableMapping[str, Any],
+ ) -> Callable[..., Any]:
+ fn = getattr(cls, name)
+
+ def _name_error(name: str, from_: Exception) -> NoReturn:
+ raise NameError(
+ "Can't invoke function '%s', as the proxy object has "
+ "not yet been "
+ "established for the Alembic '%s' class. "
+ "Try placing this code inside a callable."
+ % (name, cls.__name__)
+ ) from from_
+
+ globals_["_name_error"] = _name_error
+
+ translations = getattr(fn, "_legacy_translations", [])
+ if translations:
+ spec = inspect_getfullargspec(fn)
+ if spec[0] and spec[0][0] == "self":
+ spec[0].pop(0)
+
+ outer_args = inner_args = "*args, **kw"
+ translate_str = "args, kw = _translate(%r, %r, %r, args, kw)" % (
+ fn.__name__,
+ tuple(spec),
+ translations,
+ )
+
+ def translate(
+ fn_name: str, spec: Any, translations: Any, args: Any, kw: Any
+ ) -> Any:
+ return_kw = {}
+ return_args = []
+
+ for oldname, newname in translations:
+ if oldname in kw:
+ warnings.warn(
+ "Argument %r is now named %r "
+ "for method %s()." % (oldname, newname, fn_name)
+ )
+ return_kw[newname] = kw.pop(oldname)
+ return_kw.update(kw)
+
+ args = list(args)
+ if spec[3]:
+ pos_only = spec[0][: -len(spec[3])]
+ else:
+ pos_only = spec[0]
+ for arg in pos_only:
+ if arg not in return_kw:
+ try:
+ return_args.append(args.pop(0))
+ except IndexError:
+ raise TypeError(
+ "missing required positional argument: %s"
+ % arg
+ )
+ return_args.extend(args)
+
+ return return_args, return_kw
+
+ globals_["_translate"] = translate
+ else:
+ outer_args = "*args, **kw"
+ inner_args = "*args, **kw"
+ translate_str = ""
+
+ func_text = textwrap.dedent(
+ """\
+ def %(name)s(%(args)s):
+ %(doc)r
+ %(translate)s
+ try:
+ p = _proxy
+ except NameError as ne:
+ _name_error('%(name)s', ne)
+ return _proxy.%(name)s(%(apply_kw)s)
+ e
+ """
+ % {
+ "name": name,
+ "translate": translate_str,
+ "args": outer_args,
+ "apply_kw": inner_args,
+ "doc": fn.__doc__,
+ }
+ )
+ lcl: MutableMapping[str, Any] = {}
+
+ exec(func_text, cast("Dict[str, Any]", globals_), lcl)
+ return cast("Callable[..., Any]", lcl[name])
+
+
+def _with_legacy_names(translations: Any) -> Any:
+ def decorate(fn: _C) -> _C:
+ fn._legacy_translations = translations # type: ignore[attr-defined]
+ return fn
+
+ return decorate
+
+
+def rev_id() -> str:
+ return uuid.uuid4().hex[-12:]
+
+
+@overload
+def to_tuple(x: Any, default: Tuple[Any, ...]) -> Tuple[Any, ...]: ...
+
+
+@overload
+def to_tuple(x: None, default: Optional[_T] = ...) -> _T: ...
+
+
+@overload
+def to_tuple(
+ x: Any, default: Optional[Tuple[Any, ...]] = None
+) -> Tuple[Any, ...]: ...
+
+
+def to_tuple(
+ x: Any, default: Optional[Tuple[Any, ...]] = None
+) -> Optional[Tuple[Any, ...]]:
+ if x is None:
+ return default
+ elif isinstance(x, str):
+ return (x,)
+ elif isinstance(x, Iterable):
+ return tuple(x)
+ else:
+ return (x,)
+
+
+def dedupe_tuple(tup: Tuple[str, ...]) -> Tuple[str, ...]:
+ return tuple(unique_list(tup))
+
+
+class Dispatcher:
+ def __init__(self, uselist: bool = False) -> None:
+ self._registry: Dict[Tuple[Any, ...], Any] = {}
+ self.uselist = uselist
+
+ def dispatch_for(
+ self, target: Any, qualifier: str = "default"
+ ) -> Callable[[_C], _C]:
+ def decorate(fn: _C) -> _C:
+ if self.uselist:
+ self._registry.setdefault((target, qualifier), []).append(fn)
+ else:
+ assert (target, qualifier) not in self._registry
+ self._registry[(target, qualifier)] = fn
+ return fn
+
+ return decorate
+
+ def dispatch(self, obj: Any, qualifier: str = "default") -> Any:
+ if isinstance(obj, str):
+ targets: Sequence[Any] = [obj]
+ elif isinstance(obj, type):
+ targets = obj.__mro__
+ else:
+ targets = type(obj).__mro__
+
+ for spcls in targets:
+ if qualifier != "default" and (spcls, qualifier) in self._registry:
+ return self._fn_or_list(self._registry[(spcls, qualifier)])
+ elif (spcls, "default") in self._registry:
+ return self._fn_or_list(self._registry[(spcls, "default")])
+ else:
+ raise ValueError("no dispatch function for object: %s" % obj)
+
+ def _fn_or_list(
+ self, fn_or_list: Union[List[Callable[..., Any]], Callable[..., Any]]
+ ) -> Callable[..., Any]:
+ if self.uselist:
+
+ def go(*arg: Any, **kw: Any) -> None:
+ if TYPE_CHECKING:
+ assert isinstance(fn_or_list, Sequence)
+ for fn in fn_or_list:
+ fn(*arg, **kw)
+
+ return go
+ else:
+ return fn_or_list # type: ignore
+
+ def branch(self) -> Dispatcher:
+ """Return a copy of this dispatcher that is independently
+ writable."""
+
+ d = Dispatcher()
+ if self.uselist:
+ d._registry.update(
+ (k, [fn for fn in self._registry[k]]) for k in self._registry
+ )
+ else:
+ d._registry.update(self._registry)
+ return d
+
+
+def not_none(value: Optional[_T]) -> _T:
+ assert value is not None
+ return value