diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/alembic/util/messaging.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/alembic/util/messaging.py | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/alembic/util/messaging.py b/.venv/lib/python3.12/site-packages/alembic/util/messaging.py new file mode 100644 index 00000000..a2dbefa6 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/alembic/util/messaging.py @@ -0,0 +1,118 @@ +from __future__ import annotations + +from collections.abc import Iterable +from contextlib import contextmanager +import logging +import sys +import textwrap +from typing import Iterator +from typing import Optional +from typing import TextIO +from typing import Union +import warnings + +from sqlalchemy.engine import url + +log = logging.getLogger(__name__) + +# disable "no handler found" errors +logging.getLogger("alembic").addHandler(logging.NullHandler()) + + +try: + import fcntl + import termios + import struct + + ioctl = fcntl.ioctl(0, termios.TIOCGWINSZ, struct.pack("HHHH", 0, 0, 0, 0)) + _h, TERMWIDTH, _hp, _wp = struct.unpack("HHHH", ioctl) + if TERMWIDTH <= 0: # can occur if running in emacs pseudo-tty + TERMWIDTH = None +except (ImportError, OSError): + TERMWIDTH = None + + +def write_outstream( + stream: TextIO, *text: Union[str, bytes], quiet: bool = False +) -> None: + if quiet: + return + encoding = getattr(stream, "encoding", "ascii") or "ascii" + for t in text: + if not isinstance(t, bytes): + t = t.encode(encoding, "replace") + t = t.decode(encoding) + try: + stream.write(t) + except OSError: + # suppress "broken pipe" errors. + # no known way to handle this on Python 3 however + # as the exception is "ignored" (noisily) in TextIOWrapper. + break + + +@contextmanager +def status( + status_msg: str, newline: bool = False, quiet: bool = False +) -> Iterator[None]: + msg(status_msg + " ...", newline, flush=True, quiet=quiet) + try: + yield + except: + if not quiet: + write_outstream(sys.stdout, " FAILED\n") + raise + else: + if not quiet: + write_outstream(sys.stdout, " done\n") + + +def err(message: str, quiet: bool = False) -> None: + log.error(message) + msg(f"FAILED: {message}", quiet=quiet) + sys.exit(-1) + + +def obfuscate_url_pw(input_url: str) -> str: + return url.make_url(input_url).render_as_string(hide_password=True) + + +def warn(msg: str, stacklevel: int = 2) -> None: + warnings.warn(msg, UserWarning, stacklevel=stacklevel) + + +def msg( + msg: str, newline: bool = True, flush: bool = False, quiet: bool = False +) -> None: + if quiet: + return + if TERMWIDTH is None: + write_outstream(sys.stdout, msg) + if newline: + write_outstream(sys.stdout, "\n") + else: + # left indent output lines + indent = " " + lines = textwrap.wrap( + msg, + TERMWIDTH, + initial_indent=indent, + subsequent_indent=indent, + ) + if len(lines) > 1: + for line in lines[0:-1]: + write_outstream(sys.stdout, line, "\n") + write_outstream(sys.stdout, lines[-1], ("\n" if newline else "")) + if flush: + sys.stdout.flush() + + +def format_as_comma(value: Optional[Union[str, Iterable[str]]]) -> str: + if value is None: + return "" + elif isinstance(value, str): + return value + elif isinstance(value, Iterable): + return ", ".join(value) + else: + raise ValueError("Don't know how to comma-format %r" % value) |