about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/click/_compat.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/click/_compat.py')
-rw-r--r--.venv/lib/python3.12/site-packages/click/_compat.py623
1 files changed, 623 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/click/_compat.py b/.venv/lib/python3.12/site-packages/click/_compat.py
new file mode 100644
index 00000000..9153d150
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/click/_compat.py
@@ -0,0 +1,623 @@
+import codecs
+import io
+import os
+import re
+import sys
+import typing as t
+from weakref import WeakKeyDictionary
+
+CYGWIN = sys.platform.startswith("cygwin")
+WIN = sys.platform.startswith("win")
+auto_wrap_for_ansi: t.Optional[t.Callable[[t.TextIO], t.TextIO]] = None
+_ansi_re = re.compile(r"\033\[[;?0-9]*[a-zA-Z]")
+
+
+def _make_text_stream(
+    stream: t.BinaryIO,
+    encoding: t.Optional[str],
+    errors: t.Optional[str],
+    force_readable: bool = False,
+    force_writable: bool = False,
+) -> t.TextIO:
+    if encoding is None:
+        encoding = get_best_encoding(stream)
+    if errors is None:
+        errors = "replace"
+    return _NonClosingTextIOWrapper(
+        stream,
+        encoding,
+        errors,
+        line_buffering=True,
+        force_readable=force_readable,
+        force_writable=force_writable,
+    )
+
+
+def is_ascii_encoding(encoding: str) -> bool:
+    """Checks if a given encoding is ascii."""
+    try:
+        return codecs.lookup(encoding).name == "ascii"
+    except LookupError:
+        return False
+
+
+def get_best_encoding(stream: t.IO[t.Any]) -> str:
+    """Returns the default stream encoding if not found."""
+    rv = getattr(stream, "encoding", None) or sys.getdefaultencoding()
+    if is_ascii_encoding(rv):
+        return "utf-8"
+    return rv
+
+
+class _NonClosingTextIOWrapper(io.TextIOWrapper):
+    def __init__(
+        self,
+        stream: t.BinaryIO,
+        encoding: t.Optional[str],
+        errors: t.Optional[str],
+        force_readable: bool = False,
+        force_writable: bool = False,
+        **extra: t.Any,
+    ) -> None:
+        self._stream = stream = t.cast(
+            t.BinaryIO, _FixupStream(stream, force_readable, force_writable)
+        )
+        super().__init__(stream, encoding, errors, **extra)
+
+    def __del__(self) -> None:
+        try:
+            self.detach()
+        except Exception:
+            pass
+
+    def isatty(self) -> bool:
+        # https://bitbucket.org/pypy/pypy/issue/1803
+        return self._stream.isatty()
+
+
+class _FixupStream:
+    """The new io interface needs more from streams than streams
+    traditionally implement.  As such, this fix-up code is necessary in
+    some circumstances.
+
+    The forcing of readable and writable flags are there because some tools
+    put badly patched objects on sys (one such offender are certain version
+    of jupyter notebook).
+    """
+
+    def __init__(
+        self,
+        stream: t.BinaryIO,
+        force_readable: bool = False,
+        force_writable: bool = False,
+    ):
+        self._stream = stream
+        self._force_readable = force_readable
+        self._force_writable = force_writable
+
+    def __getattr__(self, name: str) -> t.Any:
+        return getattr(self._stream, name)
+
+    def read1(self, size: int) -> bytes:
+        f = getattr(self._stream, "read1", None)
+
+        if f is not None:
+            return t.cast(bytes, f(size))
+
+        return self._stream.read(size)
+
+    def readable(self) -> bool:
+        if self._force_readable:
+            return True
+        x = getattr(self._stream, "readable", None)
+        if x is not None:
+            return t.cast(bool, x())
+        try:
+            self._stream.read(0)
+        except Exception:
+            return False
+        return True
+
+    def writable(self) -> bool:
+        if self._force_writable:
+            return True
+        x = getattr(self._stream, "writable", None)
+        if x is not None:
+            return t.cast(bool, x())
+        try:
+            self._stream.write("")  # type: ignore
+        except Exception:
+            try:
+                self._stream.write(b"")
+            except Exception:
+                return False
+        return True
+
+    def seekable(self) -> bool:
+        x = getattr(self._stream, "seekable", None)
+        if x is not None:
+            return t.cast(bool, x())
+        try:
+            self._stream.seek(self._stream.tell())
+        except Exception:
+            return False
+        return True
+
+
+def _is_binary_reader(stream: t.IO[t.Any], default: bool = False) -> bool:
+    try:
+        return isinstance(stream.read(0), bytes)
+    except Exception:
+        return default
+        # This happens in some cases where the stream was already
+        # closed.  In this case, we assume the default.
+
+
+def _is_binary_writer(stream: t.IO[t.Any], default: bool = False) -> bool:
+    try:
+        stream.write(b"")
+    except Exception:
+        try:
+            stream.write("")
+            return False
+        except Exception:
+            pass
+        return default
+    return True
+
+
+def _find_binary_reader(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
+    # We need to figure out if the given stream is already binary.
+    # This can happen because the official docs recommend detaching
+    # the streams to get binary streams.  Some code might do this, so
+    # we need to deal with this case explicitly.
+    if _is_binary_reader(stream, False):
+        return t.cast(t.BinaryIO, stream)
+
+    buf = getattr(stream, "buffer", None)
+
+    # Same situation here; this time we assume that the buffer is
+    # actually binary in case it's closed.
+    if buf is not None and _is_binary_reader(buf, True):
+        return t.cast(t.BinaryIO, buf)
+
+    return None
+
+
+def _find_binary_writer(stream: t.IO[t.Any]) -> t.Optional[t.BinaryIO]:
+    # We need to figure out if the given stream is already binary.
+    # This can happen because the official docs recommend detaching
+    # the streams to get binary streams.  Some code might do this, so
+    # we need to deal with this case explicitly.
+    if _is_binary_writer(stream, False):
+        return t.cast(t.BinaryIO, stream)
+
+    buf = getattr(stream, "buffer", None)
+
+    # Same situation here; this time we assume that the buffer is
+    # actually binary in case it's closed.
+    if buf is not None and _is_binary_writer(buf, True):
+        return t.cast(t.BinaryIO, buf)
+
+    return None
+
+
+def _stream_is_misconfigured(stream: t.TextIO) -> bool:
+    """A stream is misconfigured if its encoding is ASCII."""
+    # If the stream does not have an encoding set, we assume it's set
+    # to ASCII.  This appears to happen in certain unittest
+    # environments.  It's not quite clear what the correct behavior is
+    # but this at least will force Click to recover somehow.
+    return is_ascii_encoding(getattr(stream, "encoding", None) or "ascii")
+
+
+def _is_compat_stream_attr(stream: t.TextIO, attr: str, value: t.Optional[str]) -> bool:
+    """A stream attribute is compatible if it is equal to the
+    desired value or the desired value is unset and the attribute
+    has a value.
+    """
+    stream_value = getattr(stream, attr, None)
+    return stream_value == value or (value is None and stream_value is not None)
+
+
+def _is_compatible_text_stream(
+    stream: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
+) -> bool:
+    """Check if a stream's encoding and errors attributes are
+    compatible with the desired values.
+    """
+    return _is_compat_stream_attr(
+        stream, "encoding", encoding
+    ) and _is_compat_stream_attr(stream, "errors", errors)
+
+
+def _force_correct_text_stream(
+    text_stream: t.IO[t.Any],
+    encoding: t.Optional[str],
+    errors: t.Optional[str],
+    is_binary: t.Callable[[t.IO[t.Any], bool], bool],
+    find_binary: t.Callable[[t.IO[t.Any]], t.Optional[t.BinaryIO]],
+    force_readable: bool = False,
+    force_writable: bool = False,
+) -> t.TextIO:
+    if is_binary(text_stream, False):
+        binary_reader = t.cast(t.BinaryIO, text_stream)
+    else:
+        text_stream = t.cast(t.TextIO, text_stream)
+        # If the stream looks compatible, and won't default to a
+        # misconfigured ascii encoding, return it as-is.
+        if _is_compatible_text_stream(text_stream, encoding, errors) and not (
+            encoding is None and _stream_is_misconfigured(text_stream)
+        ):
+            return text_stream
+
+        # Otherwise, get the underlying binary reader.
+        possible_binary_reader = find_binary(text_stream)
+
+        # If that's not possible, silently use the original reader
+        # and get mojibake instead of exceptions.
+        if possible_binary_reader is None:
+            return text_stream
+
+        binary_reader = possible_binary_reader
+
+    # Default errors to replace instead of strict in order to get
+    # something that works.
+    if errors is None:
+        errors = "replace"
+
+    # Wrap the binary stream in a text stream with the correct
+    # encoding parameters.
+    return _make_text_stream(
+        binary_reader,
+        encoding,
+        errors,
+        force_readable=force_readable,
+        force_writable=force_writable,
+    )
+
+
+def _force_correct_text_reader(
+    text_reader: t.IO[t.Any],
+    encoding: t.Optional[str],
+    errors: t.Optional[str],
+    force_readable: bool = False,
+) -> t.TextIO:
+    return _force_correct_text_stream(
+        text_reader,
+        encoding,
+        errors,
+        _is_binary_reader,
+        _find_binary_reader,
+        force_readable=force_readable,
+    )
+
+
+def _force_correct_text_writer(
+    text_writer: t.IO[t.Any],
+    encoding: t.Optional[str],
+    errors: t.Optional[str],
+    force_writable: bool = False,
+) -> t.TextIO:
+    return _force_correct_text_stream(
+        text_writer,
+        encoding,
+        errors,
+        _is_binary_writer,
+        _find_binary_writer,
+        force_writable=force_writable,
+    )
+
+
+def get_binary_stdin() -> t.BinaryIO:
+    reader = _find_binary_reader(sys.stdin)
+    if reader is None:
+        raise RuntimeError("Was not able to determine binary stream for sys.stdin.")
+    return reader
+
+
+def get_binary_stdout() -> t.BinaryIO:
+    writer = _find_binary_writer(sys.stdout)
+    if writer is None:
+        raise RuntimeError("Was not able to determine binary stream for sys.stdout.")
+    return writer
+
+
+def get_binary_stderr() -> t.BinaryIO:
+    writer = _find_binary_writer(sys.stderr)
+    if writer is None:
+        raise RuntimeError("Was not able to determine binary stream for sys.stderr.")
+    return writer
+
+
+def get_text_stdin(
+    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
+) -> t.TextIO:
+    rv = _get_windows_console_stream(sys.stdin, encoding, errors)
+    if rv is not None:
+        return rv
+    return _force_correct_text_reader(sys.stdin, encoding, errors, force_readable=True)
+
+
+def get_text_stdout(
+    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
+) -> t.TextIO:
+    rv = _get_windows_console_stream(sys.stdout, encoding, errors)
+    if rv is not None:
+        return rv
+    return _force_correct_text_writer(sys.stdout, encoding, errors, force_writable=True)
+
+
+def get_text_stderr(
+    encoding: t.Optional[str] = None, errors: t.Optional[str] = None
+) -> t.TextIO:
+    rv = _get_windows_console_stream(sys.stderr, encoding, errors)
+    if rv is not None:
+        return rv
+    return _force_correct_text_writer(sys.stderr, encoding, errors, force_writable=True)
+
+
+def _wrap_io_open(
+    file: t.Union[str, "os.PathLike[str]", int],
+    mode: str,
+    encoding: t.Optional[str],
+    errors: t.Optional[str],
+) -> t.IO[t.Any]:
+    """Handles not passing ``encoding`` and ``errors`` in binary mode."""
+    if "b" in mode:
+        return open(file, mode)
+
+    return open(file, mode, encoding=encoding, errors=errors)
+
+
+def open_stream(
+    filename: "t.Union[str, os.PathLike[str]]",
+    mode: str = "r",
+    encoding: t.Optional[str] = None,
+    errors: t.Optional[str] = "strict",
+    atomic: bool = False,
+) -> t.Tuple[t.IO[t.Any], bool]:
+    binary = "b" in mode
+    filename = os.fspath(filename)
+
+    # Standard streams first. These are simple because they ignore the
+    # atomic flag. Use fsdecode to handle Path("-").
+    if os.fsdecode(filename) == "-":
+        if any(m in mode for m in ["w", "a", "x"]):
+            if binary:
+                return get_binary_stdout(), False
+            return get_text_stdout(encoding=encoding, errors=errors), False
+        if binary:
+            return get_binary_stdin(), False
+        return get_text_stdin(encoding=encoding, errors=errors), False
+
+    # Non-atomic writes directly go out through the regular open functions.
+    if not atomic:
+        return _wrap_io_open(filename, mode, encoding, errors), True
+
+    # Some usability stuff for atomic writes
+    if "a" in mode:
+        raise ValueError(
+            "Appending to an existing file is not supported, because that"
+            " would involve an expensive `copy`-operation to a temporary"
+            " file. Open the file in normal `w`-mode and copy explicitly"
+            " if that's what you're after."
+        )
+    if "x" in mode:
+        raise ValueError("Use the `overwrite`-parameter instead.")
+    if "w" not in mode:
+        raise ValueError("Atomic writes only make sense with `w`-mode.")
+
+    # Atomic writes are more complicated.  They work by opening a file
+    # as a proxy in the same folder and then using the fdopen
+    # functionality to wrap it in a Python file.  Then we wrap it in an
+    # atomic file that moves the file over on close.
+    import errno
+    import random
+
+    try:
+        perm: t.Optional[int] = os.stat(filename).st_mode
+    except OSError:
+        perm = None
+
+    flags = os.O_RDWR | os.O_CREAT | os.O_EXCL
+
+    if binary:
+        flags |= getattr(os, "O_BINARY", 0)
+
+    while True:
+        tmp_filename = os.path.join(
+            os.path.dirname(filename),
+            f".__atomic-write{random.randrange(1 << 32):08x}",
+        )
+        try:
+            fd = os.open(tmp_filename, flags, 0o666 if perm is None else perm)
+            break
+        except OSError as e:
+            if e.errno == errno.EEXIST or (
+                os.name == "nt"
+                and e.errno == errno.EACCES
+                and os.path.isdir(e.filename)
+                and os.access(e.filename, os.W_OK)
+            ):
+                continue
+            raise
+
+    if perm is not None:
+        os.chmod(tmp_filename, perm)  # in case perm includes bits in umask
+
+    f = _wrap_io_open(fd, mode, encoding, errors)
+    af = _AtomicFile(f, tmp_filename, os.path.realpath(filename))
+    return t.cast(t.IO[t.Any], af), True
+
+
+class _AtomicFile:
+    def __init__(self, f: t.IO[t.Any], tmp_filename: str, real_filename: str) -> None:
+        self._f = f
+        self._tmp_filename = tmp_filename
+        self._real_filename = real_filename
+        self.closed = False
+
+    @property
+    def name(self) -> str:
+        return self._real_filename
+
+    def close(self, delete: bool = False) -> None:
+        if self.closed:
+            return
+        self._f.close()
+        os.replace(self._tmp_filename, self._real_filename)
+        self.closed = True
+
+    def __getattr__(self, name: str) -> t.Any:
+        return getattr(self._f, name)
+
+    def __enter__(self) -> "_AtomicFile":
+        return self
+
+    def __exit__(self, exc_type: t.Optional[t.Type[BaseException]], *_: t.Any) -> None:
+        self.close(delete=exc_type is not None)
+
+    def __repr__(self) -> str:
+        return repr(self._f)
+
+
+def strip_ansi(value: str) -> str:
+    return _ansi_re.sub("", value)
+
+
+def _is_jupyter_kernel_output(stream: t.IO[t.Any]) -> bool:
+    while isinstance(stream, (_FixupStream, _NonClosingTextIOWrapper)):
+        stream = stream._stream
+
+    return stream.__class__.__module__.startswith("ipykernel.")
+
+
+def should_strip_ansi(
+    stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
+) -> bool:
+    if color is None:
+        if stream is None:
+            stream = sys.stdin
+        return not isatty(stream) and not _is_jupyter_kernel_output(stream)
+    return not color
+
+
+# On Windows, wrap the output streams with colorama to support ANSI
+# color codes.
+# NOTE: double check is needed so mypy does not analyze this on Linux
+if sys.platform.startswith("win") and WIN:
+    from ._winconsole import _get_windows_console_stream
+
+    def _get_argv_encoding() -> str:
+        import locale
+
+        return locale.getpreferredencoding()
+
+    _ansi_stream_wrappers: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
+
+    def auto_wrap_for_ansi(
+        stream: t.TextIO, color: t.Optional[bool] = None
+    ) -> t.TextIO:
+        """Support ANSI color and style codes on Windows by wrapping a
+        stream with colorama.
+        """
+        try:
+            cached = _ansi_stream_wrappers.get(stream)
+        except Exception:
+            cached = None
+
+        if cached is not None:
+            return cached
+
+        import colorama
+
+        strip = should_strip_ansi(stream, color)
+        ansi_wrapper = colorama.AnsiToWin32(stream, strip=strip)
+        rv = t.cast(t.TextIO, ansi_wrapper.stream)
+        _write = rv.write
+
+        def _safe_write(s):
+            try:
+                return _write(s)
+            except BaseException:
+                ansi_wrapper.reset_all()
+                raise
+
+        rv.write = _safe_write
+
+        try:
+            _ansi_stream_wrappers[stream] = rv
+        except Exception:
+            pass
+
+        return rv
+
+else:
+
+    def _get_argv_encoding() -> str:
+        return getattr(sys.stdin, "encoding", None) or sys.getfilesystemencoding()
+
+    def _get_windows_console_stream(
+        f: t.TextIO, encoding: t.Optional[str], errors: t.Optional[str]
+    ) -> t.Optional[t.TextIO]:
+        return None
+
+
+def term_len(x: str) -> int:
+    return len(strip_ansi(x))
+
+
+def isatty(stream: t.IO[t.Any]) -> bool:
+    try:
+        return stream.isatty()
+    except Exception:
+        return False
+
+
+def _make_cached_stream_func(
+    src_func: t.Callable[[], t.Optional[t.TextIO]],
+    wrapper_func: t.Callable[[], t.TextIO],
+) -> t.Callable[[], t.Optional[t.TextIO]]:
+    cache: t.MutableMapping[t.TextIO, t.TextIO] = WeakKeyDictionary()
+
+    def func() -> t.Optional[t.TextIO]:
+        stream = src_func()
+
+        if stream is None:
+            return None
+
+        try:
+            rv = cache.get(stream)
+        except Exception:
+            rv = None
+        if rv is not None:
+            return rv
+        rv = wrapper_func()
+        try:
+            cache[stream] = rv
+        except Exception:
+            pass
+        return rv
+
+    return func
+
+
+_default_text_stdin = _make_cached_stream_func(lambda: sys.stdin, get_text_stdin)
+_default_text_stdout = _make_cached_stream_func(lambda: sys.stdout, get_text_stdout)
+_default_text_stderr = _make_cached_stream_func(lambda: sys.stderr, get_text_stderr)
+
+
+binary_streams: t.Mapping[str, t.Callable[[], t.BinaryIO]] = {
+    "stdin": get_binary_stdin,
+    "stdout": get_binary_stdout,
+    "stderr": get_binary_stderr,
+}
+
+text_streams: t.Mapping[
+    str, t.Callable[[t.Optional[str], t.Optional[str]], t.TextIO]
+] = {
+    "stdin": get_text_stdin,
+    "stdout": get_text_stdout,
+    "stderr": get_text_stderr,
+}