about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-master.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py')
-rw-r--r--.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py2222
1 files changed, 2222 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py b/.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py
new file mode 100644
index 00000000..b4086f1d
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py
@@ -0,0 +1,2222 @@
+# util/langhelpers.py
+# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: https://www.opensource.org/licenses/mit-license.php
+# mypy: allow-untyped-defs, allow-untyped-calls
+
+"""Routines to help with the creation, loading and introspection of
+modules, classes, hierarchies, attributes, functions, and methods.
+
+"""
+from __future__ import annotations
+
+import collections
+import enum
+from functools import update_wrapper
+import inspect
+import itertools
+import operator
+import re
+import sys
+import textwrap
+import threading
+import types
+from types import CodeType
+from typing import Any
+from typing import Callable
+from typing import cast
+from typing import Dict
+from typing import FrozenSet
+from typing import Generic
+from typing import Iterator
+from typing import List
+from typing import Mapping
+from typing import NoReturn
+from typing import Optional
+from typing import overload
+from typing import Sequence
+from typing import Set
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import TypeVar
+from typing import Union
+import warnings
+
+from . import _collections
+from . import compat
+from ._has_cy import HAS_CYEXTENSION
+from .typing import Literal
+from .. import exc
+
+_T = TypeVar("_T")
+_T_co = TypeVar("_T_co", covariant=True)
+_F = TypeVar("_F", bound=Callable[..., Any])
+_MP = TypeVar("_MP", bound="memoized_property[Any]")
+_MA = TypeVar("_MA", bound="HasMemoized.memoized_attribute[Any]")
+_HP = TypeVar("_HP", bound="hybridproperty[Any]")
+_HM = TypeVar("_HM", bound="hybridmethod[Any]")
+
+
+if compat.py310:
+
+    def get_annotations(obj: Any) -> Mapping[str, Any]:
+        return inspect.get_annotations(obj)
+
+else:
+
+    def get_annotations(obj: Any) -> Mapping[str, Any]:
+        # it's been observed that cls.__annotations__ can be non present.
+        # it's not clear what causes this, running under tox py37/38 it
+        # happens, running straight pytest it doesnt
+
+        # https://docs.python.org/3/howto/annotations.html#annotations-howto
+        if isinstance(obj, type):
+            ann = obj.__dict__.get("__annotations__", None)
+        else:
+            ann = getattr(obj, "__annotations__", None)
+
+        if ann is None:
+            return _collections.EMPTY_DICT
+        else:
+            return cast("Mapping[str, Any]", ann)
+
+
+def md5_hex(x: Any) -> str:
+    x = x.encode("utf-8")
+    m = compat.md5_not_for_security()
+    m.update(x)
+    return cast(str, m.hexdigest())
+
+
+class safe_reraise:
+    """Reraise an exception after invoking some
+    handler code.
+
+    Stores the existing exception info before
+    invoking so that it is maintained across a potential
+    coroutine context switch.
+
+    e.g.::
+
+        try:
+            sess.commit()
+        except:
+            with safe_reraise():
+                sess.rollback()
+
+    TODO: we should at some point evaluate current behaviors in this regard
+    based on current greenlet, gevent/eventlet implementations in Python 3, and
+    also see the degree to which our own asyncio (based on greenlet also) is
+    impacted by this. .rollback() will cause IO / context switch to occur in
+    all these scenarios; what happens to the exception context from an
+    "except:" block if we don't explicitly store it? Original issue was #2703.
+
+    """
+
+    __slots__ = ("_exc_info",)
+
+    _exc_info: Union[
+        None,
+        Tuple[
+            Type[BaseException],
+            BaseException,
+            types.TracebackType,
+        ],
+        Tuple[None, None, None],
+    ]
+
+    def __enter__(self) -> None:
+        self._exc_info = sys.exc_info()
+
+    def __exit__(
+        self,
+        type_: Optional[Type[BaseException]],
+        value: Optional[BaseException],
+        traceback: Optional[types.TracebackType],
+    ) -> NoReturn:
+        assert self._exc_info is not None
+        # see #2703 for notes
+        if type_ is None:
+            exc_type, exc_value, exc_tb = self._exc_info
+            assert exc_value is not None
+            self._exc_info = None  # remove potential circular references
+            raise exc_value.with_traceback(exc_tb)
+        else:
+            self._exc_info = None  # remove potential circular references
+            assert value is not None
+            raise value.with_traceback(traceback)
+
+
+def walk_subclasses(cls: Type[_T]) -> Iterator[Type[_T]]:
+    seen: Set[Any] = set()
+
+    stack = [cls]
+    while stack:
+        cls = stack.pop()
+        if cls in seen:
+            continue
+        else:
+            seen.add(cls)
+        stack.extend(cls.__subclasses__())
+        yield cls
+
+
+def string_or_unprintable(element: Any) -> str:
+    if isinstance(element, str):
+        return element
+    else:
+        try:
+            return str(element)
+        except Exception:
+            return "unprintable element %r" % element
+
+
+def clsname_as_plain_name(
+    cls: Type[Any], use_name: Optional[str] = None
+) -> str:
+    name = use_name or cls.__name__
+    return " ".join(n.lower() for n in re.findall(r"([A-Z][a-z]+|SQL)", name))
+
+
+def method_is_overridden(
+    instance_or_cls: Union[Type[Any], object],
+    against_method: Callable[..., Any],
+) -> bool:
+    """Return True if the two class methods don't match."""
+
+    if not isinstance(instance_or_cls, type):
+        current_cls = instance_or_cls.__class__
+    else:
+        current_cls = instance_or_cls
+
+    method_name = against_method.__name__
+
+    current_method: types.MethodType = getattr(current_cls, method_name)
+
+    return current_method != against_method
+
+
+def decode_slice(slc: slice) -> Tuple[Any, ...]:
+    """decode a slice object as sent to __getitem__.
+
+    takes into account the 2.5 __index__() method, basically.
+
+    """
+    ret: List[Any] = []
+    for x in slc.start, slc.stop, slc.step:
+        if hasattr(x, "__index__"):
+            x = x.__index__()
+        ret.append(x)
+    return tuple(ret)
+
+
+def _unique_symbols(used: Sequence[str], *bases: str) -> Iterator[str]:
+    used_set = set(used)
+    for base in bases:
+        pool = itertools.chain(
+            (base,),
+            map(lambda i: base + str(i), range(1000)),
+        )
+        for sym in pool:
+            if sym not in used_set:
+                used_set.add(sym)
+                yield sym
+                break
+        else:
+            raise NameError("exhausted namespace for symbol base %s" % base)
+
+
+def map_bits(fn: Callable[[int], Any], n: int) -> Iterator[Any]:
+    """Call the given function given each nonzero bit from n."""
+
+    while n:
+        b = n & (~n + 1)
+        yield fn(b)
+        n ^= b
+
+
+_Fn = TypeVar("_Fn", bound="Callable[..., Any]")
+
+# this seems to be in flux in recent mypy versions
+
+
+def decorator(target: Callable[..., Any]) -> Callable[[_Fn], _Fn]:
+    """A signature-matching decorator factory."""
+
+    def decorate(fn: _Fn) -> _Fn:
+        if not inspect.isfunction(fn) and not inspect.ismethod(fn):
+            raise Exception("not a decoratable function")
+
+        spec = compat.inspect_getfullargspec(fn)
+        env: Dict[str, Any] = {}
+
+        spec = _update_argspec_defaults_into_env(spec, env)
+
+        names = (
+            tuple(cast("Tuple[str, ...]", spec[0]))
+            + cast("Tuple[str, ...]", spec[1:3])
+            + (fn.__name__,)
+        )
+        targ_name, fn_name = _unique_symbols(names, "target", "fn")
+
+        metadata: Dict[str, Optional[str]] = dict(target=targ_name, fn=fn_name)
+        metadata.update(format_argspec_plus(spec, grouped=False))
+        metadata["name"] = fn.__name__
+
+        if inspect.iscoroutinefunction(fn):
+            metadata["prefix"] = "async "
+            metadata["target_prefix"] = "await "
+        else:
+            metadata["prefix"] = ""
+            metadata["target_prefix"] = ""
+
+        # look for __ positional arguments.  This is a convention in
+        # SQLAlchemy that arguments should be passed positionally
+        # rather than as keyword
+        # arguments.  note that apply_pos doesn't currently work in all cases
+        # such as when a kw-only indicator "*" is present, which is why
+        # we limit the use of this to just that case we can detect.  As we add
+        # more kinds of methods that use @decorator, things may have to
+        # be further improved in this area
+        if "__" in repr(spec[0]):
+            code = (
+                """\
+%(prefix)sdef %(name)s%(grouped_args)s:
+    return %(target_prefix)s%(target)s(%(fn)s, %(apply_pos)s)
+"""
+                % metadata
+            )
+        else:
+            code = (
+                """\
+%(prefix)sdef %(name)s%(grouped_args)s:
+    return %(target_prefix)s%(target)s(%(fn)s, %(apply_kw)s)
+"""
+                % metadata
+            )
+
+        mod = sys.modules[fn.__module__]
+        env.update(vars(mod))
+        env.update({targ_name: target, fn_name: fn, "__name__": fn.__module__})
+
+        decorated = cast(
+            types.FunctionType,
+            _exec_code_in_env(code, env, fn.__name__),
+        )
+        decorated.__defaults__ = getattr(fn, "__func__", fn).__defaults__
+
+        decorated.__wrapped__ = fn  # type: ignore[attr-defined]
+        return update_wrapper(decorated, fn)  # type: ignore[return-value]
+
+    return update_wrapper(decorate, target)  # type: ignore[return-value]
+
+
+def _update_argspec_defaults_into_env(spec, env):
+    """given a FullArgSpec, convert defaults to be symbol names in an env."""
+
+    if spec.defaults:
+        new_defaults = []
+        i = 0
+        for arg in spec.defaults:
+            if type(arg).__module__ not in ("builtins", "__builtin__"):
+                name = "x%d" % i
+                env[name] = arg
+                new_defaults.append(name)
+                i += 1
+            else:
+                new_defaults.append(arg)
+        elem = list(spec)
+        elem[3] = tuple(new_defaults)
+        return compat.FullArgSpec(*elem)
+    else:
+        return spec
+
+
+def _exec_code_in_env(
+    code: Union[str, types.CodeType], env: Dict[str, Any], fn_name: str
+) -> Callable[..., Any]:
+    exec(code, env)
+    return env[fn_name]  # type: ignore[no-any-return]
+
+
+_PF = TypeVar("_PF")
+_TE = TypeVar("_TE")
+
+
+class PluginLoader:
+    def __init__(
+        self, group: str, auto_fn: Optional[Callable[..., Any]] = None
+    ):
+        self.group = group
+        self.impls: Dict[str, Any] = {}
+        self.auto_fn = auto_fn
+
+    def clear(self):
+        self.impls.clear()
+
+    def load(self, name: str) -> Any:
+        if name in self.impls:
+            return self.impls[name]()
+
+        if self.auto_fn:
+            loader = self.auto_fn(name)
+            if loader:
+                self.impls[name] = loader
+                return loader()
+
+        for impl in compat.importlib_metadata_get(self.group):
+            if impl.name == name:
+                self.impls[name] = impl.load
+                return impl.load()
+
+        raise exc.NoSuchModuleError(
+            "Can't load plugin: %s:%s" % (self.group, name)
+        )
+
+    def register(self, name: str, modulepath: str, objname: str) -> None:
+        def load():
+            mod = __import__(modulepath)
+            for token in modulepath.split(".")[1:]:
+                mod = getattr(mod, token)
+            return getattr(mod, objname)
+
+        self.impls[name] = load
+
+
+def _inspect_func_args(fn):
+    try:
+        co_varkeywords = inspect.CO_VARKEYWORDS
+    except AttributeError:
+        # https://docs.python.org/3/library/inspect.html
+        # The flags are specific to CPython, and may not be defined in other
+        # Python implementations. Furthermore, the flags are an implementation
+        # detail, and can be removed or deprecated in future Python releases.
+        spec = compat.inspect_getfullargspec(fn)
+        return spec[0], bool(spec[2])
+    else:
+        # use fn.__code__ plus flags to reduce method call overhead
+        co = fn.__code__
+        nargs = co.co_argcount
+        return (
+            list(co.co_varnames[:nargs]),
+            bool(co.co_flags & co_varkeywords),
+        )
+
+
+@overload
+def get_cls_kwargs(
+    cls: type,
+    *,
+    _set: Optional[Set[str]] = None,
+    raiseerr: Literal[True] = ...,
+) -> Set[str]: ...
+
+
+@overload
+def get_cls_kwargs(
+    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
+) -> Optional[Set[str]]: ...
+
+
+def get_cls_kwargs(
+    cls: type, *, _set: Optional[Set[str]] = None, raiseerr: bool = False
+) -> Optional[Set[str]]:
+    r"""Return the full set of inherited kwargs for the given `cls`.
+
+    Probes a class's __init__ method, collecting all named arguments.  If the
+    __init__ defines a \**kwargs catch-all, then the constructor is presumed
+    to pass along unrecognized keywords to its base classes, and the
+    collection process is repeated recursively on each of the bases.
+
+    Uses a subset of inspect.getfullargspec() to cut down on method overhead,
+    as this is used within the Core typing system to create copies of type
+    objects which is a performance-sensitive operation.
+
+    No anonymous tuple arguments please !
+
+    """
+    toplevel = _set is None
+    if toplevel:
+        _set = set()
+    assert _set is not None
+
+    ctr = cls.__dict__.get("__init__", False)
+
+    has_init = (
+        ctr
+        and isinstance(ctr, types.FunctionType)
+        and isinstance(ctr.__code__, types.CodeType)
+    )
+
+    if has_init:
+        names, has_kw = _inspect_func_args(ctr)
+        _set.update(names)
+
+        if not has_kw and not toplevel:
+            if raiseerr:
+                raise TypeError(
+                    f"given cls {cls} doesn't have an __init__ method"
+                )
+            else:
+                return None
+    else:
+        has_kw = False
+
+    if not has_init or has_kw:
+        for c in cls.__bases__:
+            if get_cls_kwargs(c, _set=_set) is None:
+                break
+
+    _set.discard("self")
+    return _set
+
+
+def get_func_kwargs(func: Callable[..., Any]) -> List[str]:
+    """Return the set of legal kwargs for the given `func`.
+
+    Uses getargspec so is safe to call for methods, functions,
+    etc.
+
+    """
+
+    return compat.inspect_getfullargspec(func)[0]
+
+
+def get_callable_argspec(
+    fn: Callable[..., Any], no_self: bool = False, _is_init: bool = False
+) -> compat.FullArgSpec:
+    """Return the argument signature for any callable.
+
+    All pure-Python callables are accepted, including
+    functions, methods, classes, objects with __call__;
+    builtins and other edge cases like functools.partial() objects
+    raise a TypeError.
+
+    """
+    if inspect.isbuiltin(fn):
+        raise TypeError("Can't inspect builtin: %s" % fn)
+    elif inspect.isfunction(fn):
+        if _is_init and no_self:
+            spec = compat.inspect_getfullargspec(fn)
+            return compat.FullArgSpec(
+                spec.args[1:],
+                spec.varargs,
+                spec.varkw,
+                spec.defaults,
+                spec.kwonlyargs,
+                spec.kwonlydefaults,
+                spec.annotations,
+            )
+        else:
+            return compat.inspect_getfullargspec(fn)
+    elif inspect.ismethod(fn):
+        if no_self and (_is_init or fn.__self__):
+            spec = compat.inspect_getfullargspec(fn.__func__)
+            return compat.FullArgSpec(
+                spec.args[1:],
+                spec.varargs,
+                spec.varkw,
+                spec.defaults,
+                spec.kwonlyargs,
+                spec.kwonlydefaults,
+                spec.annotations,
+            )
+        else:
+            return compat.inspect_getfullargspec(fn.__func__)
+    elif inspect.isclass(fn):
+        return get_callable_argspec(
+            fn.__init__, no_self=no_self, _is_init=True
+        )
+    elif hasattr(fn, "__func__"):
+        return compat.inspect_getfullargspec(fn.__func__)
+    elif hasattr(fn, "__call__"):
+        if inspect.ismethod(fn.__call__):
+            return get_callable_argspec(fn.__call__, no_self=no_self)
+        else:
+            raise TypeError("Can't inspect callable: %s" % fn)
+    else:
+        raise TypeError("Can't inspect callable: %s" % fn)
+
+
+def format_argspec_plus(
+    fn: Union[Callable[..., Any], compat.FullArgSpec], grouped: bool = True
+) -> Dict[str, Optional[str]]:
+    """Returns a dictionary of formatted, introspected function arguments.
+
+    A enhanced variant of inspect.formatargspec to support code generation.
+
+    fn
+       An inspectable callable or tuple of inspect getargspec() results.
+    grouped
+      Defaults to True; include (parens, around, argument) lists
+
+    Returns:
+
+    args
+      Full inspect.formatargspec for fn
+    self_arg
+      The name of the first positional argument, varargs[0], or None
+      if the function defines no positional arguments.
+    apply_pos
+      args, re-written in calling rather than receiving syntax.  Arguments are
+      passed positionally.
+    apply_kw
+      Like apply_pos, except keyword-ish args are passed as keywords.
+    apply_pos_proxied
+      Like apply_pos but omits the self/cls argument
+
+    Example::
+
+      >>> format_argspec_plus(lambda self, a, b, c=3, **d: 123)
+      {'grouped_args': '(self, a, b, c=3, **d)',
+       'self_arg': 'self',
+       'apply_kw': '(self, a, b, c=c, **d)',
+       'apply_pos': '(self, a, b, c, **d)'}
+
+    """
+    if callable(fn):
+        spec = compat.inspect_getfullargspec(fn)
+    else:
+        spec = fn
+
+    args = compat.inspect_formatargspec(*spec)
+
+    apply_pos = compat.inspect_formatargspec(
+        spec[0], spec[1], spec[2], None, spec[4]
+    )
+
+    if spec[0]:
+        self_arg = spec[0][0]
+
+        apply_pos_proxied = compat.inspect_formatargspec(
+            spec[0][1:], spec[1], spec[2], None, spec[4]
+        )
+
+    elif spec[1]:
+        # I'm not sure what this is
+        self_arg = "%s[0]" % spec[1]
+
+        apply_pos_proxied = apply_pos
+    else:
+        self_arg = None
+        apply_pos_proxied = apply_pos
+
+    num_defaults = 0
+    if spec[3]:
+        num_defaults += len(cast(Tuple[Any], spec[3]))
+    if spec[4]:
+        num_defaults += len(spec[4])
+
+    name_args = spec[0] + spec[4]
+
+    defaulted_vals: Union[List[str], Tuple[()]]
+
+    if num_defaults:
+        defaulted_vals = name_args[0 - num_defaults :]
+    else:
+        defaulted_vals = ()
+
+    apply_kw = compat.inspect_formatargspec(
+        name_args,
+        spec[1],
+        spec[2],
+        defaulted_vals,
+        formatvalue=lambda x: "=" + str(x),
+    )
+
+    if spec[0]:
+        apply_kw_proxied = compat.inspect_formatargspec(
+            name_args[1:],
+            spec[1],
+            spec[2],
+            defaulted_vals,
+            formatvalue=lambda x: "=" + str(x),
+        )
+    else:
+        apply_kw_proxied = apply_kw
+
+    if grouped:
+        return dict(
+            grouped_args=args,
+            self_arg=self_arg,
+            apply_pos=apply_pos,
+            apply_kw=apply_kw,
+            apply_pos_proxied=apply_pos_proxied,
+            apply_kw_proxied=apply_kw_proxied,
+        )
+    else:
+        return dict(
+            grouped_args=args,
+            self_arg=self_arg,
+            apply_pos=apply_pos[1:-1],
+            apply_kw=apply_kw[1:-1],
+            apply_pos_proxied=apply_pos_proxied[1:-1],
+            apply_kw_proxied=apply_kw_proxied[1:-1],
+        )
+
+
+def format_argspec_init(method, grouped=True):
+    """format_argspec_plus with considerations for typical __init__ methods
+
+    Wraps format_argspec_plus with error handling strategies for typical
+    __init__ cases:
+
+    .. sourcecode:: text
+
+      object.__init__ -> (self)
+      other unreflectable (usually C) -> (self, *args, **kwargs)
+
+    """
+    if method is object.__init__:
+        grouped_args = "(self)"
+        args = "(self)" if grouped else "self"
+        proxied = "()" if grouped else ""
+    else:
+        try:
+            return format_argspec_plus(method, grouped=grouped)
+        except TypeError:
+            grouped_args = "(self, *args, **kwargs)"
+            args = grouped_args if grouped else "self, *args, **kwargs"
+            proxied = "(*args, **kwargs)" if grouped else "*args, **kwargs"
+    return dict(
+        self_arg="self",
+        grouped_args=grouped_args,
+        apply_pos=args,
+        apply_kw=args,
+        apply_pos_proxied=proxied,
+        apply_kw_proxied=proxied,
+    )
+
+
+def create_proxy_methods(
+    target_cls: Type[Any],
+    target_cls_sphinx_name: str,
+    proxy_cls_sphinx_name: str,
+    classmethods: Sequence[str] = (),
+    methods: Sequence[str] = (),
+    attributes: Sequence[str] = (),
+    use_intermediate_variable: Sequence[str] = (),
+) -> Callable[[_T], _T]:
+    """A class decorator indicating attributes should refer to a proxy
+    class.
+
+    This decorator is now a "marker" that does nothing at runtime.  Instead,
+    it is consumed by the tools/generate_proxy_methods.py script to
+    statically generate proxy methods and attributes that are fully
+    recognized by typing tools such as mypy.
+
+    """
+
+    def decorate(cls):
+        return cls
+
+    return decorate
+
+
+def getargspec_init(method):
+    """inspect.getargspec with considerations for typical __init__ methods
+
+    Wraps inspect.getargspec with error handling for typical __init__ cases:
+
+    .. sourcecode:: text
+
+      object.__init__ -> (self)
+      other unreflectable (usually C) -> (self, *args, **kwargs)
+
+    """
+    try:
+        return compat.inspect_getfullargspec(method)
+    except TypeError:
+        if method is object.__init__:
+            return (["self"], None, None, None)
+        else:
+            return (["self"], "args", "kwargs", None)
+
+
+def unbound_method_to_callable(func_or_cls):
+    """Adjust the incoming callable such that a 'self' argument is not
+    required.
+
+    """
+
+    if isinstance(func_or_cls, types.MethodType) and not func_or_cls.__self__:
+        return func_or_cls.__func__
+    else:
+        return func_or_cls
+
+
+def generic_repr(
+    obj: Any,
+    additional_kw: Sequence[Tuple[str, Any]] = (),
+    to_inspect: Optional[Union[object, List[object]]] = None,
+    omit_kwarg: Sequence[str] = (),
+) -> str:
+    """Produce a __repr__() based on direct association of the __init__()
+    specification vs. same-named attributes present.
+
+    """
+    if to_inspect is None:
+        to_inspect = [obj]
+    else:
+        to_inspect = _collections.to_list(to_inspect)
+
+    missing = object()
+
+    pos_args = []
+    kw_args: _collections.OrderedDict[str, Any] = _collections.OrderedDict()
+    vargs = None
+    for i, insp in enumerate(to_inspect):
+        try:
+            spec = compat.inspect_getfullargspec(insp.__init__)
+        except TypeError:
+            continue
+        else:
+            default_len = len(spec.defaults) if spec.defaults else 0
+            if i == 0:
+                if spec.varargs:
+                    vargs = spec.varargs
+                if default_len:
+                    pos_args.extend(spec.args[1:-default_len])
+                else:
+                    pos_args.extend(spec.args[1:])
+            else:
+                kw_args.update(
+                    [(arg, missing) for arg in spec.args[1:-default_len]]
+                )
+
+            if default_len:
+                assert spec.defaults
+                kw_args.update(
+                    [
+                        (arg, default)
+                        for arg, default in zip(
+                            spec.args[-default_len:], spec.defaults
+                        )
+                    ]
+                )
+    output: List[str] = []
+
+    output.extend(repr(getattr(obj, arg, None)) for arg in pos_args)
+
+    if vargs is not None and hasattr(obj, vargs):
+        output.extend([repr(val) for val in getattr(obj, vargs)])
+
+    for arg, defval in kw_args.items():
+        if arg in omit_kwarg:
+            continue
+        try:
+            val = getattr(obj, arg, missing)
+            if val is not missing and val != defval:
+                output.append("%s=%r" % (arg, val))
+        except Exception:
+            pass
+
+    if additional_kw:
+        for arg, defval in additional_kw:
+            try:
+                val = getattr(obj, arg, missing)
+                if val is not missing and val != defval:
+                    output.append("%s=%r" % (arg, val))
+            except Exception:
+                pass
+
+    return "%s(%s)" % (obj.__class__.__name__, ", ".join(output))
+
+
+class portable_instancemethod:
+    """Turn an instancemethod into a (parent, name) pair
+    to produce a serializable callable.
+
+    """
+
+    __slots__ = "target", "name", "kwargs", "__weakref__"
+
+    def __getstate__(self):
+        return {
+            "target": self.target,
+            "name": self.name,
+            "kwargs": self.kwargs,
+        }
+
+    def __setstate__(self, state):
+        self.target = state["target"]
+        self.name = state["name"]
+        self.kwargs = state.get("kwargs", ())
+
+    def __init__(self, meth, kwargs=()):
+        self.target = meth.__self__
+        self.name = meth.__name__
+        self.kwargs = kwargs
+
+    def __call__(self, *arg, **kw):
+        kw.update(self.kwargs)
+        return getattr(self.target, self.name)(*arg, **kw)
+
+
+def class_hierarchy(cls):
+    """Return an unordered sequence of all classes related to cls.
+
+    Traverses diamond hierarchies.
+
+    Fibs slightly: subclasses of builtin types are not returned.  Thus
+    class_hierarchy(class A(object)) returns (A, object), not A plus every
+    class systemwide that derives from object.
+
+    """
+
+    hier = {cls}
+    process = list(cls.__mro__)
+    while process:
+        c = process.pop()
+        bases = (_ for _ in c.__bases__ if _ not in hier)
+
+        for b in bases:
+            process.append(b)
+            hier.add(b)
+
+        if c.__module__ == "builtins" or not hasattr(c, "__subclasses__"):
+            continue
+
+        for s in [
+            _
+            for _ in (
+                c.__subclasses__()
+                if not issubclass(c, type)
+                else c.__subclasses__(c)
+            )
+            if _ not in hier
+        ]:
+            process.append(s)
+            hier.add(s)
+    return list(hier)
+
+
+def iterate_attributes(cls):
+    """iterate all the keys and attributes associated
+    with a class, without using getattr().
+
+    Does not use getattr() so that class-sensitive
+    descriptors (i.e. property.__get__()) are not called.
+
+    """
+    keys = dir(cls)
+    for key in keys:
+        for c in cls.__mro__:
+            if key in c.__dict__:
+                yield (key, c.__dict__[key])
+                break
+
+
+def monkeypatch_proxied_specials(
+    into_cls,
+    from_cls,
+    skip=None,
+    only=None,
+    name="self.proxy",
+    from_instance=None,
+):
+    """Automates delegation of __specials__ for a proxying type."""
+
+    if only:
+        dunders = only
+    else:
+        if skip is None:
+            skip = (
+                "__slots__",
+                "__del__",
+                "__getattribute__",
+                "__metaclass__",
+                "__getstate__",
+                "__setstate__",
+            )
+        dunders = [
+            m
+            for m in dir(from_cls)
+            if (
+                m.startswith("__")
+                and m.endswith("__")
+                and not hasattr(into_cls, m)
+                and m not in skip
+            )
+        ]
+
+    for method in dunders:
+        try:
+            maybe_fn = getattr(from_cls, method)
+            if not hasattr(maybe_fn, "__call__"):
+                continue
+            maybe_fn = getattr(maybe_fn, "__func__", maybe_fn)
+            fn = cast(types.FunctionType, maybe_fn)
+
+        except AttributeError:
+            continue
+        try:
+            spec = compat.inspect_getfullargspec(fn)
+            fn_args = compat.inspect_formatargspec(spec[0])
+            d_args = compat.inspect_formatargspec(spec[0][1:])
+        except TypeError:
+            fn_args = "(self, *args, **kw)"
+            d_args = "(*args, **kw)"
+
+        py = (
+            "def %(method)s%(fn_args)s: "
+            "return %(name)s.%(method)s%(d_args)s" % locals()
+        )
+
+        env: Dict[str, types.FunctionType] = (
+            from_instance is not None and {name: from_instance} or {}
+        )
+        exec(py, env)
+        try:
+            env[method].__defaults__ = fn.__defaults__
+        except AttributeError:
+            pass
+        setattr(into_cls, method, env[method])
+
+
+def methods_equivalent(meth1, meth2):
+    """Return True if the two methods are the same implementation."""
+
+    return getattr(meth1, "__func__", meth1) is getattr(
+        meth2, "__func__", meth2
+    )
+
+
+def as_interface(obj, cls=None, methods=None, required=None):
+    """Ensure basic interface compliance for an instance or dict of callables.
+
+    Checks that ``obj`` implements public methods of ``cls`` or has members
+    listed in ``methods``. If ``required`` is not supplied, implementing at
+    least one interface method is sufficient. Methods present on ``obj`` that
+    are not in the interface are ignored.
+
+    If ``obj`` is a dict and ``dict`` does not meet the interface
+    requirements, the keys of the dictionary are inspected. Keys present in
+    ``obj`` that are not in the interface will raise TypeErrors.
+
+    Raises TypeError if ``obj`` does not meet the interface criteria.
+
+    In all passing cases, an object with callable members is returned.  In the
+    simple case, ``obj`` is returned as-is; if dict processing kicks in then
+    an anonymous class is returned.
+
+    obj
+      A type, instance, or dictionary of callables.
+    cls
+      Optional, a type.  All public methods of cls are considered the
+      interface.  An ``obj`` instance of cls will always pass, ignoring
+      ``required``..
+    methods
+      Optional, a sequence of method names to consider as the interface.
+    required
+      Optional, a sequence of mandatory implementations. If omitted, an
+      ``obj`` that provides at least one interface method is considered
+      sufficient.  As a convenience, required may be a type, in which case
+      all public methods of the type are required.
+
+    """
+    if not cls and not methods:
+        raise TypeError("a class or collection of method names are required")
+
+    if isinstance(cls, type) and isinstance(obj, cls):
+        return obj
+
+    interface = set(methods or [m for m in dir(cls) if not m.startswith("_")])
+    implemented = set(dir(obj))
+
+    complies = operator.ge
+    if isinstance(required, type):
+        required = interface
+    elif not required:
+        required = set()
+        complies = operator.gt
+    else:
+        required = set(required)
+
+    if complies(implemented.intersection(interface), required):
+        return obj
+
+    # No dict duck typing here.
+    if not isinstance(obj, dict):
+        qualifier = complies is operator.gt and "any of" or "all of"
+        raise TypeError(
+            "%r does not implement %s: %s"
+            % (obj, qualifier, ", ".join(interface))
+        )
+
+    class AnonymousInterface:
+        """A callable-holding shell."""
+
+    if cls:
+        AnonymousInterface.__name__ = "Anonymous" + cls.__name__
+    found = set()
+
+    for method, impl in dictlike_iteritems(obj):
+        if method not in interface:
+            raise TypeError("%r: unknown in this interface" % method)
+        if not callable(impl):
+            raise TypeError("%r=%r is not callable" % (method, impl))
+        setattr(AnonymousInterface, method, staticmethod(impl))
+        found.add(method)
+
+    if complies(found, required):
+        return AnonymousInterface
+
+    raise TypeError(
+        "dictionary does not contain required keys %s"
+        % ", ".join(required - found)
+    )
+
+
+_GFD = TypeVar("_GFD", bound="generic_fn_descriptor[Any]")
+
+
+class generic_fn_descriptor(Generic[_T_co]):
+    """Descriptor which proxies a function when the attribute is not
+    present in dict
+
+    This superclass is organized in a particular way with "memoized" and
+    "non-memoized" implementation classes that are hidden from type checkers,
+    as Mypy seems to not be able to handle seeing multiple kinds of descriptor
+    classes used for the same attribute.
+
+    """
+
+    fget: Callable[..., _T_co]
+    __doc__: Optional[str]
+    __name__: str
+
+    def __init__(self, fget: Callable[..., _T_co], doc: Optional[str] = None):
+        self.fget = fget
+        self.__doc__ = doc or fget.__doc__
+        self.__name__ = fget.__name__
+
+    @overload
+    def __get__(self: _GFD, obj: None, cls: Any) -> _GFD: ...
+
+    @overload
+    def __get__(self, obj: object, cls: Any) -> _T_co: ...
+
+    def __get__(self: _GFD, obj: Any, cls: Any) -> Union[_GFD, _T_co]:
+        raise NotImplementedError()
+
+    if TYPE_CHECKING:
+
+        def __set__(self, instance: Any, value: Any) -> None: ...
+
+        def __delete__(self, instance: Any) -> None: ...
+
+    def _reset(self, obj: Any) -> None:
+        raise NotImplementedError()
+
+    @classmethod
+    def reset(cls, obj: Any, name: str) -> None:
+        raise NotImplementedError()
+
+
+class _non_memoized_property(generic_fn_descriptor[_T_co]):
+    """a plain descriptor that proxies a function.
+
+    primary rationale is to provide a plain attribute that's
+    compatible with memoized_property which is also recognized as equivalent
+    by mypy.
+
+    """
+
+    if not TYPE_CHECKING:
+
+        def __get__(self, obj, cls):
+            if obj is None:
+                return self
+            return self.fget(obj)
+
+
+class _memoized_property(generic_fn_descriptor[_T_co]):
+    """A read-only @property that is only evaluated once."""
+
+    if not TYPE_CHECKING:
+
+        def __get__(self, obj, cls):
+            if obj is None:
+                return self
+            obj.__dict__[self.__name__] = result = self.fget(obj)
+            return result
+
+    def _reset(self, obj):
+        _memoized_property.reset(obj, self.__name__)
+
+    @classmethod
+    def reset(cls, obj, name):
+        obj.__dict__.pop(name, None)
+
+
+# despite many attempts to get Mypy to recognize an overridden descriptor
+# where one is memoized and the other isn't, there seems to be no reliable
+# way other than completely deceiving the type checker into thinking there
+# is just one single descriptor type everywhere.  Otherwise, if a superclass
+# has non-memoized and subclass has memoized, that requires
+# "class memoized(non_memoized)".  but then if a superclass has memoized and
+# superclass has non-memoized, the class hierarchy of the descriptors
+# would need to be reversed; "class non_memoized(memoized)".  so there's no
+# way to achieve this.
+# additional issues, RO properties:
+# https://github.com/python/mypy/issues/12440
+if TYPE_CHECKING:
+    # allow memoized and non-memoized to be freely mixed by having them
+    # be the same class
+    memoized_property = generic_fn_descriptor
+    non_memoized_property = generic_fn_descriptor
+
+    # for read only situations, mypy only sees @property as read only.
+    # read only is needed when a subtype specializes the return type
+    # of a property, meaning assignment needs to be disallowed
+    ro_memoized_property = property
+    ro_non_memoized_property = property
+
+else:
+    memoized_property = ro_memoized_property = _memoized_property
+    non_memoized_property = ro_non_memoized_property = _non_memoized_property
+
+
+def memoized_instancemethod(fn: _F) -> _F:
+    """Decorate a method memoize its return value.
+
+    Best applied to no-arg methods: memoization is not sensitive to
+    argument values, and will always return the same value even when
+    called with different arguments.
+
+    """
+
+    def oneshot(self, *args, **kw):
+        result = fn(self, *args, **kw)
+
+        def memo(*a, **kw):
+            return result
+
+        memo.__name__ = fn.__name__
+        memo.__doc__ = fn.__doc__
+        self.__dict__[fn.__name__] = memo
+        return result
+
+    return update_wrapper(oneshot, fn)  # type: ignore
+
+
+class HasMemoized:
+    """A mixin class that maintains the names of memoized elements in a
+    collection for easy cache clearing, generative, etc.
+
+    """
+
+    if not TYPE_CHECKING:
+        # support classes that want to have __slots__ with an explicit
+        # slot for __dict__.  not sure if that requires base __slots__ here.
+        __slots__ = ()
+
+    _memoized_keys: FrozenSet[str] = frozenset()
+
+    def _reset_memoizations(self) -> None:
+        for elem in self._memoized_keys:
+            self.__dict__.pop(elem, None)
+
+    def _assert_no_memoizations(self) -> None:
+        for elem in self._memoized_keys:
+            assert elem not in self.__dict__
+
+    def _set_memoized_attribute(self, key: str, value: Any) -> None:
+        self.__dict__[key] = value
+        self._memoized_keys |= {key}
+
+    class memoized_attribute(memoized_property[_T]):
+        """A read-only @property that is only evaluated once.
+
+        :meta private:
+
+        """
+
+        fget: Callable[..., _T]
+        __doc__: Optional[str]
+        __name__: str
+
+        def __init__(self, fget: Callable[..., _T], doc: Optional[str] = None):
+            self.fget = fget
+            self.__doc__ = doc or fget.__doc__
+            self.__name__ = fget.__name__
+
+        @overload
+        def __get__(self: _MA, obj: None, cls: Any) -> _MA: ...
+
+        @overload
+        def __get__(self, obj: Any, cls: Any) -> _T: ...
+
+        def __get__(self, obj, cls):
+            if obj is None:
+                return self
+            obj.__dict__[self.__name__] = result = self.fget(obj)
+            obj._memoized_keys |= {self.__name__}
+            return result
+
+    @classmethod
+    def memoized_instancemethod(cls, fn: _F) -> _F:
+        """Decorate a method memoize its return value.
+
+        :meta private:
+
+        """
+
+        def oneshot(self: Any, *args: Any, **kw: Any) -> Any:
+            result = fn(self, *args, **kw)
+
+            def memo(*a, **kw):
+                return result
+
+            memo.__name__ = fn.__name__
+            memo.__doc__ = fn.__doc__
+            self.__dict__[fn.__name__] = memo
+            self._memoized_keys |= {fn.__name__}
+            return result
+
+        return update_wrapper(oneshot, fn)  # type: ignore
+
+
+if TYPE_CHECKING:
+    HasMemoized_ro_memoized_attribute = property
+else:
+    HasMemoized_ro_memoized_attribute = HasMemoized.memoized_attribute
+
+
+class MemoizedSlots:
+    """Apply memoized items to an object using a __getattr__ scheme.
+
+    This allows the functionality of memoized_property and
+    memoized_instancemethod to be available to a class using __slots__.
+
+    """
+
+    __slots__ = ()
+
+    def _fallback_getattr(self, key):
+        raise AttributeError(key)
+
+    def __getattr__(self, key: str) -> Any:
+        if key.startswith("_memoized_attr_") or key.startswith(
+            "_memoized_method_"
+        ):
+            raise AttributeError(key)
+        # to avoid recursion errors when interacting with other __getattr__
+        # schemes that refer to this one, when testing for memoized method
+        # look at __class__ only rather than going into __getattr__ again.
+        elif hasattr(self.__class__, f"_memoized_attr_{key}"):
+            value = getattr(self, f"_memoized_attr_{key}")()
+            setattr(self, key, value)
+            return value
+        elif hasattr(self.__class__, f"_memoized_method_{key}"):
+            fn = getattr(self, f"_memoized_method_{key}")
+
+            def oneshot(*args, **kw):
+                result = fn(*args, **kw)
+
+                def memo(*a, **kw):
+                    return result
+
+                memo.__name__ = fn.__name__
+                memo.__doc__ = fn.__doc__
+                setattr(self, key, memo)
+                return result
+
+            oneshot.__doc__ = fn.__doc__
+            return oneshot
+        else:
+            return self._fallback_getattr(key)
+
+
+# from paste.deploy.converters
+def asbool(obj: Any) -> bool:
+    if isinstance(obj, str):
+        obj = obj.strip().lower()
+        if obj in ["true", "yes", "on", "y", "t", "1"]:
+            return True
+        elif obj in ["false", "no", "off", "n", "f", "0"]:
+            return False
+        else:
+            raise ValueError("String is not true/false: %r" % obj)
+    return bool(obj)
+
+
+def bool_or_str(*text: str) -> Callable[[str], Union[str, bool]]:
+    """Return a callable that will evaluate a string as
+    boolean, or one of a set of "alternate" string values.
+
+    """
+
+    def bool_or_value(obj: str) -> Union[str, bool]:
+        if obj in text:
+            return obj
+        else:
+            return asbool(obj)
+
+    return bool_or_value
+
+
+def asint(value: Any) -> Optional[int]:
+    """Coerce to integer."""
+
+    if value is None:
+        return value
+    return int(value)
+
+
+def coerce_kw_type(
+    kw: Dict[str, Any],
+    key: str,
+    type_: Type[Any],
+    flexi_bool: bool = True,
+    dest: Optional[Dict[str, Any]] = None,
+) -> None:
+    r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
+    necessary.  If 'flexi_bool' is True, the string '0' is considered false
+    when coercing to boolean.
+    """
+
+    if dest is None:
+        dest = kw
+
+    if (
+        key in kw
+        and (not isinstance(type_, type) or not isinstance(kw[key], type_))
+        and kw[key] is not None
+    ):
+        if type_ is bool and flexi_bool:
+            dest[key] = asbool(kw[key])
+        else:
+            dest[key] = type_(kw[key])
+
+
+def constructor_key(obj: Any, cls: Type[Any]) -> Tuple[Any, ...]:
+    """Produce a tuple structure that is cacheable using the __dict__ of
+    obj to retrieve values
+
+    """
+    names = get_cls_kwargs(cls)
+    return (cls,) + tuple(
+        (k, obj.__dict__[k]) for k in names if k in obj.__dict__
+    )
+
+
+def constructor_copy(obj: _T, cls: Type[_T], *args: Any, **kw: Any) -> _T:
+    """Instantiate cls using the __dict__ of obj as constructor arguments.
+
+    Uses inspect to match the named arguments of ``cls``.
+
+    """
+
+    names = get_cls_kwargs(cls)
+    kw.update(
+        (k, obj.__dict__[k]) for k in names.difference(kw) if k in obj.__dict__
+    )
+    return cls(*args, **kw)
+
+
+def counter() -> Callable[[], int]:
+    """Return a threadsafe counter function."""
+
+    lock = threading.Lock()
+    counter = itertools.count(1)
+
+    # avoid the 2to3 "next" transformation...
+    def _next():
+        with lock:
+            return next(counter)
+
+    return _next
+
+
+def duck_type_collection(
+    specimen: Any, default: Optional[Type[Any]] = None
+) -> Optional[Type[Any]]:
+    """Given an instance or class, guess if it is or is acting as one of
+    the basic collection types: list, set and dict.  If the __emulates__
+    property is present, return that preferentially.
+    """
+
+    if hasattr(specimen, "__emulates__"):
+        # canonicalize set vs sets.Set to a standard: the builtin set
+        if specimen.__emulates__ is not None and issubclass(
+            specimen.__emulates__, set
+        ):
+            return set
+        else:
+            return specimen.__emulates__  # type: ignore
+
+    isa = issubclass if isinstance(specimen, type) else isinstance
+    if isa(specimen, list):
+        return list
+    elif isa(specimen, set):
+        return set
+    elif isa(specimen, dict):
+        return dict
+
+    if hasattr(specimen, "append"):
+        return list
+    elif hasattr(specimen, "add"):
+        return set
+    elif hasattr(specimen, "set"):
+        return dict
+    else:
+        return default
+
+
+def assert_arg_type(
+    arg: Any, argtype: Union[Tuple[Type[Any], ...], Type[Any]], name: str
+) -> Any:
+    if isinstance(arg, argtype):
+        return arg
+    else:
+        if isinstance(argtype, tuple):
+            raise exc.ArgumentError(
+                "Argument '%s' is expected to be one of type %s, got '%s'"
+                % (name, " or ".join("'%s'" % a for a in argtype), type(arg))
+            )
+        else:
+            raise exc.ArgumentError(
+                "Argument '%s' is expected to be of type '%s', got '%s'"
+                % (name, argtype, type(arg))
+            )
+
+
+def dictlike_iteritems(dictlike):
+    """Return a (key, value) iterator for almost any dict-like object."""
+
+    if hasattr(dictlike, "items"):
+        return list(dictlike.items())
+
+    getter = getattr(dictlike, "__getitem__", getattr(dictlike, "get", None))
+    if getter is None:
+        raise TypeError("Object '%r' is not dict-like" % dictlike)
+
+    if hasattr(dictlike, "iterkeys"):
+
+        def iterator():
+            for key in dictlike.iterkeys():
+                assert getter is not None
+                yield key, getter(key)
+
+        return iterator()
+    elif hasattr(dictlike, "keys"):
+        return iter((key, getter(key)) for key in dictlike.keys())
+    else:
+        raise TypeError("Object '%r' is not dict-like" % dictlike)
+
+
+class classproperty(property):
+    """A decorator that behaves like @property except that operates
+    on classes rather than instances.
+
+    The decorator is currently special when using the declarative
+    module, but note that the
+    :class:`~.sqlalchemy.ext.declarative.declared_attr`
+    decorator should be used for this purpose with declarative.
+
+    """
+
+    fget: Callable[[Any], Any]
+
+    def __init__(self, fget: Callable[[Any], Any], *arg: Any, **kw: Any):
+        super().__init__(fget, *arg, **kw)
+        self.__doc__ = fget.__doc__
+
+    def __get__(self, obj: Any, cls: Optional[type] = None) -> Any:
+        return self.fget(cls)
+
+
+class hybridproperty(Generic[_T]):
+    def __init__(self, func: Callable[..., _T]):
+        self.func = func
+        self.clslevel = func
+
+    def __get__(self, instance: Any, owner: Any) -> _T:
+        if instance is None:
+            clsval = self.clslevel(owner)
+            return clsval
+        else:
+            return self.func(instance)
+
+    def classlevel(self, func: Callable[..., Any]) -> hybridproperty[_T]:
+        self.clslevel = func
+        return self
+
+
+class rw_hybridproperty(Generic[_T]):
+    def __init__(self, func: Callable[..., _T]):
+        self.func = func
+        self.clslevel = func
+        self.setfn: Optional[Callable[..., Any]] = None
+
+    def __get__(self, instance: Any, owner: Any) -> _T:
+        if instance is None:
+            clsval = self.clslevel(owner)
+            return clsval
+        else:
+            return self.func(instance)
+
+    def __set__(self, instance: Any, value: Any) -> None:
+        assert self.setfn is not None
+        self.setfn(instance, value)
+
+    def setter(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
+        self.setfn = func
+        return self
+
+    def classlevel(self, func: Callable[..., Any]) -> rw_hybridproperty[_T]:
+        self.clslevel = func
+        return self
+
+
+class hybridmethod(Generic[_T]):
+    """Decorate a function as cls- or instance- level."""
+
+    def __init__(self, func: Callable[..., _T]):
+        self.func = self.__func__ = func
+        self.clslevel = func
+
+    def __get__(self, instance: Any, owner: Any) -> Callable[..., _T]:
+        if instance is None:
+            return self.clslevel.__get__(owner, owner.__class__)  # type:ignore
+        else:
+            return self.func.__get__(instance, owner)  # type:ignore
+
+    def classlevel(self, func: Callable[..., Any]) -> hybridmethod[_T]:
+        self.clslevel = func
+        return self
+
+
+class symbol(int):
+    """A constant symbol.
+
+    >>> symbol("foo") is symbol("foo")
+    True
+    >>> symbol("foo")
+    <symbol 'foo>
+
+    A slight refinement of the MAGICCOOKIE=object() pattern.  The primary
+    advantage of symbol() is its repr().  They are also singletons.
+
+    Repeated calls of symbol('name') will all return the same instance.
+
+    """
+
+    name: str
+
+    symbols: Dict[str, symbol] = {}
+    _lock = threading.Lock()
+
+    def __new__(
+        cls,
+        name: str,
+        doc: Optional[str] = None,
+        canonical: Optional[int] = None,
+    ) -> symbol:
+        with cls._lock:
+            sym = cls.symbols.get(name)
+            if sym is None:
+                assert isinstance(name, str)
+                if canonical is None:
+                    canonical = hash(name)
+                sym = int.__new__(symbol, canonical)
+                sym.name = name
+                if doc:
+                    sym.__doc__ = doc
+
+                # NOTE: we should ultimately get rid of this global thing,
+                # however, currently it is to support pickling.  The best
+                # change would be when we are on py3.11 at a minimum, we
+                # switch to stdlib enum.IntFlag.
+                cls.symbols[name] = sym
+            else:
+                if canonical and canonical != sym:
+                    raise TypeError(
+                        f"Can't replace canonical symbol for {name!r} "
+                        f"with new int value {canonical}"
+                    )
+            return sym
+
+    def __reduce__(self):
+        return symbol, (self.name, "x", int(self))
+
+    def __str__(self):
+        return repr(self)
+
+    def __repr__(self):
+        return f"symbol({self.name!r})"
+
+
+class _IntFlagMeta(type):
+    def __init__(
+        cls,
+        classname: str,
+        bases: Tuple[Type[Any], ...],
+        dict_: Dict[str, Any],
+        **kw: Any,
+    ) -> None:
+        items: List[symbol]
+        cls._items = items = []
+        for k, v in dict_.items():
+            if re.match(r"^__.*__$", k):
+                continue
+            if isinstance(v, int):
+                sym = symbol(k, canonical=v)
+            elif not k.startswith("_"):
+                raise TypeError("Expected integer values for IntFlag")
+            else:
+                continue
+            setattr(cls, k, sym)
+            items.append(sym)
+
+        cls.__members__ = _collections.immutabledict(
+            {sym.name: sym for sym in items}
+        )
+
+    def __iter__(self) -> Iterator[symbol]:
+        raise NotImplementedError(
+            "iter not implemented to ensure compatibility with "
+            "Python 3.11 IntFlag.  Please use __members__.  See "
+            "https://github.com/python/cpython/issues/99304"
+        )
+
+
+class _FastIntFlag(metaclass=_IntFlagMeta):
+    """An 'IntFlag' copycat that isn't slow when performing bitwise
+    operations.
+
+    the ``FastIntFlag`` class will return ``enum.IntFlag`` under TYPE_CHECKING
+    and ``_FastIntFlag`` otherwise.
+
+    """
+
+
+if TYPE_CHECKING:
+    from enum import IntFlag
+
+    FastIntFlag = IntFlag
+else:
+    FastIntFlag = _FastIntFlag
+
+
+_E = TypeVar("_E", bound=enum.Enum)
+
+
+def parse_user_argument_for_enum(
+    arg: Any,
+    choices: Dict[_E, List[Any]],
+    name: str,
+    resolve_symbol_names: bool = False,
+) -> Optional[_E]:
+    """Given a user parameter, parse the parameter into a chosen value
+    from a list of choice objects, typically Enum values.
+
+    The user argument can be a string name that matches the name of a
+    symbol, or the symbol object itself, or any number of alternate choices
+    such as True/False/ None etc.
+
+    :param arg: the user argument.
+    :param choices: dictionary of enum values to lists of possible
+        entries for each.
+    :param name: name of the argument.   Used in an :class:`.ArgumentError`
+        that is raised if the parameter doesn't match any available argument.
+
+    """
+    for enum_value, choice in choices.items():
+        if arg is enum_value:
+            return enum_value
+        elif resolve_symbol_names and arg == enum_value.name:
+            return enum_value
+        elif arg in choice:
+            return enum_value
+
+    if arg is None:
+        return None
+
+    raise exc.ArgumentError(f"Invalid value for '{name}': {arg!r}")
+
+
+_creation_order = 1
+
+
+def set_creation_order(instance: Any) -> None:
+    """Assign a '_creation_order' sequence to the given instance.
+
+    This allows multiple instances to be sorted in order of creation
+    (typically within a single thread; the counter is not particularly
+    threadsafe).
+
+    """
+    global _creation_order
+    instance._creation_order = _creation_order
+    _creation_order += 1
+
+
+def warn_exception(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
+    """executes the given function, catches all exceptions and converts to
+    a warning.
+
+    """
+    try:
+        return func(*args, **kwargs)
+    except Exception:
+        warn("%s('%s') ignored" % sys.exc_info()[0:2])
+
+
+def ellipses_string(value, len_=25):
+    try:
+        if len(value) > len_:
+            return "%s..." % value[0:len_]
+        else:
+            return value
+    except TypeError:
+        return value
+
+
+class _hash_limit_string(str):
+    """A string subclass that can only be hashed on a maximum amount
+    of unique values.
+
+    This is used for warnings so that we can send out parameterized warnings
+    without the __warningregistry__ of the module,  or the non-overridable
+    "once" registry within warnings.py, overloading memory,
+
+
+    """
+
+    _hash: int
+
+    def __new__(
+        cls, value: str, num: int, args: Sequence[Any]
+    ) -> _hash_limit_string:
+        interpolated = (value % args) + (
+            " (this warning may be suppressed after %d occurrences)" % num
+        )
+        self = super().__new__(cls, interpolated)
+        self._hash = hash("%s_%d" % (value, hash(interpolated) % num))
+        return self
+
+    def __hash__(self) -> int:
+        return self._hash
+
+    def __eq__(self, other: Any) -> bool:
+        return hash(self) == hash(other)
+
+
+def warn(msg: str, code: Optional[str] = None) -> None:
+    """Issue a warning.
+
+    If msg is a string, :class:`.exc.SAWarning` is used as
+    the category.
+
+    """
+    if code:
+        _warnings_warn(exc.SAWarning(msg, code=code))
+    else:
+        _warnings_warn(msg, exc.SAWarning)
+
+
+def warn_limited(msg: str, args: Sequence[Any]) -> None:
+    """Issue a warning with a parameterized string, limiting the number
+    of registrations.
+
+    """
+    if args:
+        msg = _hash_limit_string(msg, 10, args)
+    _warnings_warn(msg, exc.SAWarning)
+
+
+_warning_tags: Dict[CodeType, Tuple[str, Type[Warning]]] = {}
+
+
+def tag_method_for_warnings(
+    message: str, category: Type[Warning]
+) -> Callable[[_F], _F]:
+    def go(fn):
+        _warning_tags[fn.__code__] = (message, category)
+        return fn
+
+    return go
+
+
+_not_sa_pattern = re.compile(r"^(?:sqlalchemy\.(?!testing)|alembic\.)")
+
+
+def _warnings_warn(
+    message: Union[str, Warning],
+    category: Optional[Type[Warning]] = None,
+    stacklevel: int = 2,
+) -> None:
+    # adjust the given stacklevel to be outside of SQLAlchemy
+    try:
+        frame = sys._getframe(stacklevel)
+    except ValueError:
+        # being called from less than 3 (or given) stacklevels, weird,
+        # but don't crash
+        stacklevel = 0
+    except:
+        # _getframe() doesn't work, weird interpreter issue, weird,
+        # ok, but don't crash
+        stacklevel = 0
+    else:
+        stacklevel_found = warning_tag_found = False
+        while frame is not None:
+            # using __name__ here requires that we have __name__ in the
+            # __globals__ of the decorated string functions we make also.
+            # we generate this using {"__name__": fn.__module__}
+            if not stacklevel_found and not re.match(
+                _not_sa_pattern, frame.f_globals.get("__name__", "")
+            ):
+                # stop incrementing stack level if an out-of-SQLA line
+                # were found.
+                stacklevel_found = True
+
+                # however, for the warning tag thing, we have to keep
+                # scanning up the whole traceback
+
+            if frame.f_code in _warning_tags:
+                warning_tag_found = True
+                (_suffix, _category) = _warning_tags[frame.f_code]
+                category = category or _category
+                message = f"{message} ({_suffix})"
+
+            frame = frame.f_back  # type: ignore[assignment]
+
+            if not stacklevel_found:
+                stacklevel += 1
+            elif stacklevel_found and warning_tag_found:
+                break
+
+    if category is not None:
+        warnings.warn(message, category, stacklevel=stacklevel + 1)
+    else:
+        warnings.warn(message, stacklevel=stacklevel + 1)
+
+
+def only_once(
+    fn: Callable[..., _T], retry_on_exception: bool
+) -> Callable[..., Optional[_T]]:
+    """Decorate the given function to be a no-op after it is called exactly
+    once."""
+
+    once = [fn]
+
+    def go(*arg: Any, **kw: Any) -> Optional[_T]:
+        # strong reference fn so that it isn't garbage collected,
+        # which interferes with the event system's expectations
+        strong_fn = fn  # noqa
+        if once:
+            once_fn = once.pop()
+            try:
+                return once_fn(*arg, **kw)
+            except:
+                if retry_on_exception:
+                    once.insert(0, once_fn)
+                raise
+
+        return None
+
+    return go
+
+
+_SQLA_RE = re.compile(r"sqlalchemy/([a-z_]+/){0,2}[a-z_]+\.py")
+_UNITTEST_RE = re.compile(r"unit(?:2|test2?/)")
+
+
+def chop_traceback(
+    tb: List[str],
+    exclude_prefix: re.Pattern[str] = _UNITTEST_RE,
+    exclude_suffix: re.Pattern[str] = _SQLA_RE,
+) -> List[str]:
+    """Chop extraneous lines off beginning and end of a traceback.
+
+    :param tb:
+      a list of traceback lines as returned by ``traceback.format_stack()``
+
+    :param exclude_prefix:
+      a regular expression object matching lines to skip at beginning of
+      ``tb``
+
+    :param exclude_suffix:
+      a regular expression object matching lines to skip at end of ``tb``
+    """
+    start = 0
+    end = len(tb) - 1
+    while start <= end and exclude_prefix.search(tb[start]):
+        start += 1
+    while start <= end and exclude_suffix.search(tb[end]):
+        end -= 1
+    return tb[start : end + 1]
+
+
+NoneType = type(None)
+
+
+def attrsetter(attrname):
+    code = "def set(obj, value):    obj.%s = value" % attrname
+    env = locals().copy()
+    exec(code, env)
+    return env["set"]
+
+
+_dunders = re.compile("^__.+__$")
+
+
+class TypingOnly:
+    """A mixin class that marks a class as 'typing only', meaning it has
+    absolutely no methods, attributes, or runtime functionality whatsoever.
+
+    """
+
+    __slots__ = ()
+
+    def __init_subclass__(cls) -> None:
+        if TypingOnly in cls.__bases__:
+            remaining = {
+                name for name in cls.__dict__ if not _dunders.match(name)
+            }
+            if remaining:
+                raise AssertionError(
+                    f"Class {cls} directly inherits TypingOnly but has "
+                    f"additional attributes {remaining}."
+                )
+        super().__init_subclass__()
+
+
+class EnsureKWArg:
+    r"""Apply translation of functions to accept \**kw arguments if they
+    don't already.
+
+    Used to ensure cross-compatibility with third party legacy code, for things
+    like compiler visit methods that need to accept ``**kw`` arguments,
+    but may have been copied from old code that didn't accept them.
+
+    """
+
+    ensure_kwarg: str
+    """a regular expression that indicates method names for which the method
+    should accept ``**kw`` arguments.
+
+    The class will scan for methods matching the name template and decorate
+    them if necessary to ensure ``**kw`` parameters are accepted.
+
+    """
+
+    def __init_subclass__(cls) -> None:
+        fn_reg = cls.ensure_kwarg
+        clsdict = cls.__dict__
+        if fn_reg:
+            for key in clsdict:
+                m = re.match(fn_reg, key)
+                if m:
+                    fn = clsdict[key]
+                    spec = compat.inspect_getfullargspec(fn)
+                    if not spec.varkw:
+                        wrapped = cls._wrap_w_kw(fn)
+                        setattr(cls, key, wrapped)
+        super().__init_subclass__()
+
+    @classmethod
+    def _wrap_w_kw(cls, fn: Callable[..., Any]) -> Callable[..., Any]:
+        def wrap(*arg: Any, **kw: Any) -> Any:
+            return fn(*arg)
+
+        return update_wrapper(wrap, fn)
+
+
+def wrap_callable(wrapper, fn):
+    """Augment functools.update_wrapper() to work with objects with
+    a ``__call__()`` method.
+
+    :param fn:
+      object with __call__ method
+
+    """
+    if hasattr(fn, "__name__"):
+        return update_wrapper(wrapper, fn)
+    else:
+        _f = wrapper
+        _f.__name__ = fn.__class__.__name__
+        if hasattr(fn, "__module__"):
+            _f.__module__ = fn.__module__
+
+        if hasattr(fn.__call__, "__doc__") and fn.__call__.__doc__:
+            _f.__doc__ = fn.__call__.__doc__
+        elif fn.__doc__:
+            _f.__doc__ = fn.__doc__
+
+        return _f
+
+
+def quoted_token_parser(value):
+    """Parse a dotted identifier with accommodation for quoted names.
+
+    Includes support for SQL-style double quotes as a literal character.
+
+    E.g.::
+
+        >>> quoted_token_parser("name")
+        ["name"]
+        >>> quoted_token_parser("schema.name")
+        ["schema", "name"]
+        >>> quoted_token_parser('"Schema"."Name"')
+        ['Schema', 'Name']
+        >>> quoted_token_parser('"Schema"."Name""Foo"')
+        ['Schema', 'Name""Foo']
+
+    """
+
+    if '"' not in value:
+        return value.split(".")
+
+    # 0 = outside of quotes
+    # 1 = inside of quotes
+    state = 0
+    result: List[List[str]] = [[]]
+    idx = 0
+    lv = len(value)
+    while idx < lv:
+        char = value[idx]
+        if char == '"':
+            if state == 1 and idx < lv - 1 and value[idx + 1] == '"':
+                result[-1].append('"')
+                idx += 1
+            else:
+                state ^= 1
+        elif char == "." and state == 0:
+            result.append([])
+        else:
+            result[-1].append(char)
+        idx += 1
+
+    return ["".join(token) for token in result]
+
+
+def add_parameter_text(params: Any, text: str) -> Callable[[_F], _F]:
+    params = _collections.to_list(params)
+
+    def decorate(fn):
+        doc = fn.__doc__ is not None and fn.__doc__ or ""
+        if doc:
+            doc = inject_param_text(doc, {param: text for param in params})
+        fn.__doc__ = doc
+        return fn
+
+    return decorate
+
+
+def _dedent_docstring(text: str) -> str:
+    split_text = text.split("\n", 1)
+    if len(split_text) == 1:
+        return text
+    else:
+        firstline, remaining = split_text
+    if not firstline.startswith(" "):
+        return firstline + "\n" + textwrap.dedent(remaining)
+    else:
+        return textwrap.dedent(text)
+
+
+def inject_docstring_text(
+    given_doctext: Optional[str], injecttext: str, pos: int
+) -> str:
+    doctext: str = _dedent_docstring(given_doctext or "")
+    lines = doctext.split("\n")
+    if len(lines) == 1:
+        lines.append("")
+    injectlines = textwrap.dedent(injecttext).split("\n")
+    if injectlines[0]:
+        injectlines.insert(0, "")
+
+    blanks = [num for num, line in enumerate(lines) if not line.strip()]
+    blanks.insert(0, 0)
+
+    inject_pos = blanks[min(pos, len(blanks) - 1)]
+
+    lines = lines[0:inject_pos] + injectlines + lines[inject_pos:]
+    return "\n".join(lines)
+
+
+_param_reg = re.compile(r"(\s+):param (.+?):")
+
+
+def inject_param_text(doctext: str, inject_params: Dict[str, str]) -> str:
+    doclines = collections.deque(doctext.splitlines())
+    lines = []
+
+    # TODO: this is not working for params like ":param case_sensitive=True:"
+
+    to_inject = None
+    while doclines:
+        line = doclines.popleft()
+
+        m = _param_reg.match(line)
+
+        if to_inject is None:
+            if m:
+                param = m.group(2).lstrip("*")
+                if param in inject_params:
+                    # default indent to that of :param: plus one
+                    indent = " " * len(m.group(1)) + " "
+
+                    # but if the next line has text, use that line's
+                    # indentation
+                    if doclines:
+                        m2 = re.match(r"(\s+)\S", doclines[0])
+                        if m2:
+                            indent = " " * len(m2.group(1))
+
+                    to_inject = indent + inject_params[param]
+        elif m:
+            lines.extend(["\n", to_inject, "\n"])
+            to_inject = None
+        elif not line.rstrip():
+            lines.extend([line, to_inject, "\n"])
+            to_inject = None
+        elif line.endswith("::"):
+            # TODO: this still won't cover if the code example itself has
+            # blank lines in it, need to detect those via indentation.
+            lines.extend([line, doclines.popleft()])
+            continue
+        lines.append(line)
+
+    return "\n".join(lines)
+
+
+def repr_tuple_names(names: List[str]) -> Optional[str]:
+    """Trims a list of strings from the middle and return a string of up to
+    four elements. Strings greater than 11 characters will be truncated"""
+    if len(names) == 0:
+        return None
+    flag = len(names) <= 4
+    names = names[0:4] if flag else names[0:3] + names[-1:]
+    res = ["%s.." % name[:11] if len(name) > 11 else name for name in names]
+    if flag:
+        return ", ".join(res)
+    else:
+        return "%s, ..., %s" % (", ".join(res[0:3]), res[-1])
+
+
+def has_compiled_ext(raise_=False):
+    if HAS_CYEXTENSION:
+        return True
+    elif raise_:
+        raise ImportError(
+            "cython extensions were expected to be installed, "
+            "but are not present"
+        )
+    else:
+        return False
+
+
+class _Missing(enum.Enum):
+    Missing = enum.auto()
+
+
+Missing = _Missing.Missing
+MissingOr = Union[_T, Literal[_Missing.Missing]]