about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/utils.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/utils.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/utils.py739
1 files changed, 739 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/utils.py b/.venv/lib/python3.12/site-packages/fsspec/utils.py
new file mode 100644
index 00000000..917af8d1
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/utils.py
@@ -0,0 +1,739 @@
+from __future__ import annotations
+
+import contextlib
+import logging
+import math
+import os
+import re
+import sys
+import tempfile
+from functools import partial
+from hashlib import md5
+from importlib.metadata import version
+from typing import (
+    IO,
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Iterable,
+    Iterator,
+    Sequence,
+    TypeVar,
+)
+from urllib.parse import urlsplit
+
+if TYPE_CHECKING:
+    import pathlib
+
+    from typing_extensions import TypeGuard
+
+    from fsspec.spec import AbstractFileSystem
+
+
+DEFAULT_BLOCK_SIZE = 5 * 2**20
+
+T = TypeVar("T")
+
+
+def infer_storage_options(
+    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
+) -> dict[str, Any]:
+    """Infer storage options from URL path and merge it with existing storage
+    options.
+
+    Parameters
+    ----------
+    urlpath: str or unicode
+        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
+    inherit_storage_options: dict (optional)
+        Its contents will get merged with the inferred information from the
+        given path
+
+    Returns
+    -------
+    Storage options dict.
+
+    Examples
+    --------
+    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
+    {"protocol": "file", "path", "/mnt/datasets/test.csv"}
+    >>> infer_storage_options(
+    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
+    ...     inherit_storage_options={'extra': 'value'},
+    ... )  # doctest: +SKIP
+    {"protocol": "hdfs", "username": "username", "password": "pwd",
+    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
+    "url_query": "q=1", "extra": "value"}
+    """
+    # Handle Windows paths including disk name in this special case
+    if (
+        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
+        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
+    ):
+        return {"protocol": "file", "path": urlpath}
+
+    parsed_path = urlsplit(urlpath)
+    protocol = parsed_path.scheme or "file"
+    if parsed_path.fragment:
+        path = "#".join([parsed_path.path, parsed_path.fragment])
+    else:
+        path = parsed_path.path
+    if protocol == "file":
+        # Special case parsing file protocol URL on Windows according to:
+        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
+        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
+        if windows_path:
+            drive, path = windows_path.groups()
+            path = f"{drive}:{path}"
+
+    if protocol in ["http", "https"]:
+        # for HTTP, we don't want to parse, as requests will anyway
+        return {"protocol": protocol, "path": urlpath}
+
+    options: dict[str, Any] = {"protocol": protocol, "path": path}
+
+    if parsed_path.netloc:
+        # Parse `hostname` from netloc manually because `parsed_path.hostname`
+        # lowercases the hostname which is not always desirable (e.g. in S3):
+        # https://github.com/dask/dask/issues/1417
+        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]
+
+        if protocol in ("s3", "s3a", "gcs", "gs"):
+            options["path"] = options["host"] + options["path"]
+        else:
+            options["host"] = options["host"]
+        if parsed_path.port:
+            options["port"] = parsed_path.port
+        if parsed_path.username:
+            options["username"] = parsed_path.username
+        if parsed_path.password:
+            options["password"] = parsed_path.password
+
+    if parsed_path.query:
+        options["url_query"] = parsed_path.query
+    if parsed_path.fragment:
+        options["url_fragment"] = parsed_path.fragment
+
+    if inherit_storage_options:
+        update_storage_options(options, inherit_storage_options)
+
+    return options
+
+
+def update_storage_options(
+    options: dict[str, Any], inherited: dict[str, Any] | None = None
+) -> None:
+    if not inherited:
+        inherited = {}
+    collisions = set(options) & set(inherited)
+    if collisions:
+        for collision in collisions:
+            if options.get(collision) != inherited.get(collision):
+                raise KeyError(
+                    f"Collision between inferred and specified storage "
+                    f"option:\n{collision}"
+                )
+    options.update(inherited)
+
+
+# Compression extensions registered via fsspec.compression.register_compression
+compressions: dict[str, str] = {}
+
+
+def infer_compression(filename: str) -> str | None:
+    """Infer compression, if available, from filename.
+
+    Infer a named compression type, if registered and available, from filename
+    extension. This includes builtin (gz, bz2, zip) compressions, as well as
+    optional compressions. See fsspec.compression.register_compression.
+    """
+    extension = os.path.splitext(filename)[-1].strip(".").lower()
+    if extension in compressions:
+        return compressions[extension]
+    return None
+
+
+def build_name_function(max_int: float) -> Callable[[int], str]:
+    """Returns a function that receives a single integer
+    and returns it as a string padded by enough zero characters
+    to align with maximum possible integer
+
+    >>> name_f = build_name_function(57)
+
+    >>> name_f(7)
+    '07'
+    >>> name_f(31)
+    '31'
+    >>> build_name_function(1000)(42)
+    '0042'
+    >>> build_name_function(999)(42)
+    '042'
+    >>> build_name_function(0)(0)
+    '0'
+    """
+    # handle corner cases max_int is 0 or exact power of 10
+    max_int += 1e-8
+
+    pad_length = int(math.ceil(math.log10(max_int)))
+
+    def name_function(i: int) -> str:
+        return str(i).zfill(pad_length)
+
+    return name_function
+
+
+def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
+    r"""Seek current file to file start, file end, or byte after delimiter seq.
+
+    Seeks file to next chunk delimiter, where chunks are defined on file start,
+    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
+    Note that file start is a valid split, so must be at offset > 0 to seek for
+    delimiter.
+
+    Parameters
+    ----------
+    file: a file
+    delimiter: bytes
+        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
+    blocksize: int
+        Number of bytes to read from the file at once.
+
+
+    Returns
+    -------
+    Returns True if a delimiter was found, False if at file start or end.
+
+    """
+
+    if file.tell() == 0:
+        # beginning-of-file, return without seek
+        return False
+
+    # Interface is for binary IO, with delimiter as bytes, but initialize last
+    # with result of file.read to preserve compatibility with text IO.
+    last: bytes | None = None
+    while True:
+        current = file.read(blocksize)
+        if not current:
+            # end-of-file without delimiter
+            return False
+        full = last + current if last else current
+        try:
+            if delimiter in full:
+                i = full.index(delimiter)
+                file.seek(file.tell() - (len(full) - i) + len(delimiter))
+                return True
+            elif len(current) < blocksize:
+                # end-of-file without delimiter
+                return False
+        except (OSError, ValueError):
+            pass
+        last = full[-len(delimiter) :]
+
+
+def read_block(
+    f: IO[bytes],
+    offset: int,
+    length: int | None,
+    delimiter: bytes | None = None,
+    split_before: bool = False,
+) -> bytes:
+    """Read a block of bytes from a file
+
+    Parameters
+    ----------
+    f: File
+        Open file
+    offset: int
+        Byte offset to start read
+    length: int
+        Number of bytes to read, read through end of file if None
+    delimiter: bytes (optional)
+        Ensure reading starts and stops at delimiter bytestring
+    split_before: bool (optional)
+        Start/stop read *before* delimiter bytestring.
+
+
+    If using the ``delimiter=`` keyword argument we ensure that the read
+    starts and stops at delimiter boundaries that follow the locations
+    ``offset`` and ``offset + length``.  If ``offset`` is zero then we
+    start at zero, regardless of delimiter.  The bytestring returned WILL
+    include the terminating delimiter string.
+
+    Examples
+    --------
+
+    >>> from io import BytesIO  # doctest: +SKIP
+    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
+    >>> read_block(f, 0, 13)  # doctest: +SKIP
+    b'Alice, 100\\nBo'
+
+    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
+    b'Alice, 100\\nBob, 200\\n'
+
+    >>> read_block(f, 10, 10, delimiter=b'\\n')  # doctest: +SKIP
+    b'Bob, 200\\nCharlie, 300'
+    """
+    if delimiter:
+        f.seek(offset)
+        found_start_delim = seek_delimiter(f, delimiter, 2**16)
+        if length is None:
+            return f.read()
+        start = f.tell()
+        length -= start - offset
+
+        f.seek(start + length)
+        found_end_delim = seek_delimiter(f, delimiter, 2**16)
+        end = f.tell()
+
+        # Adjust split location to before delimiter if seek found the
+        # delimiter sequence, not start or end of file.
+        if found_start_delim and split_before:
+            start -= len(delimiter)
+
+        if found_end_delim and split_before:
+            end -= len(delimiter)
+
+        offset = start
+        length = end - start
+
+    f.seek(offset)
+
+    # TODO: allow length to be None and read to the end of the file?
+    assert length is not None
+    b = f.read(length)
+    return b
+
+
+def tokenize(*args: Any, **kwargs: Any) -> str:
+    """Deterministic token
+
+    (modified from dask.base)
+
+    >>> tokenize([1, 2, '3'])
+    '9d71491b50023b06fc76928e6eddb952'
+
+    >>> tokenize('Hello') == tokenize('Hello')
+    True
+    """
+    if kwargs:
+        args += (kwargs,)
+    try:
+        h = md5(str(args).encode())
+    except ValueError:
+        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
+        h = md5(str(args).encode(), usedforsecurity=False)
+    return h.hexdigest()
+
+
+def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
+    """Attempt to convert a path-like object to a string.
+
+    Parameters
+    ----------
+    filepath: object to be converted
+
+    Returns
+    -------
+    filepath_str: maybe a string version of the object
+
+    Notes
+    -----
+    Objects supporting the fspath protocol are coerced according to its
+    __fspath__ method.
+
+    For backwards compatibility with older Python version, pathlib.Path
+    objects are specially coerced.
+
+    Any other object is passed through unchanged, which includes bytes,
+    strings, buffers, or anything else that's not even path-like.
+    """
+    if isinstance(filepath, str):
+        return filepath
+    elif hasattr(filepath, "__fspath__"):
+        return filepath.__fspath__()
+    elif hasattr(filepath, "path"):
+        return filepath.path
+    else:
+        return filepath  # type: ignore[return-value]
+
+
+def make_instance(
+    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
+) -> T:
+    inst = cls(*args, **kwargs)
+    inst._determine_worker()  # type: ignore[attr-defined]
+    return inst
+
+
+def common_prefix(paths: Iterable[str]) -> str:
+    """For a list of paths, find the shortest prefix common to all"""
+    parts = [p.split("/") for p in paths]
+    lmax = min(len(p) for p in parts)
+    end = 0
+    for i in range(lmax):
+        end = all(p[i] == parts[0][i] for p in parts)
+        if not end:
+            break
+    i += end
+    return "/".join(parts[0][:i])
+
+
+def other_paths(
+    paths: list[str],
+    path2: str | list[str],
+    exists: bool = False,
+    flatten: bool = False,
+) -> list[str]:
+    """In bulk file operations, construct a new file tree from a list of files
+
+    Parameters
+    ----------
+    paths: list of str
+        The input file tree
+    path2: str or list of str
+        Root to construct the new list in. If this is already a list of str, we just
+        assert it has the right number of elements.
+    exists: bool (optional)
+        For a str destination, it is already exists (and is a dir), files should
+        end up inside.
+    flatten: bool (optional)
+        Whether to flatten the input directory tree structure so that the output files
+        are in the same directory.
+
+    Returns
+    -------
+    list of str
+    """
+
+    if isinstance(path2, str):
+        path2 = path2.rstrip("/")
+
+        if flatten:
+            path2 = ["/".join((path2, p.split("/")[-1])) for p in paths]
+        else:
+            cp = common_prefix(paths)
+            if exists:
+                cp = cp.rsplit("/", 1)[0]
+            if not cp and all(not s.startswith("/") for s in paths):
+                path2 = ["/".join([path2, p]) for p in paths]
+            else:
+                path2 = [p.replace(cp, path2, 1) for p in paths]
+    else:
+        assert len(paths) == len(path2)
+    return path2
+
+
+def is_exception(obj: Any) -> bool:
+    return isinstance(obj, BaseException)
+
+
+def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
+    return all(hasattr(f, attr) for attr in ["read", "close", "tell"])
+
+
+def get_protocol(url: str) -> str:
+    url = stringify_path(url)
+    parts = re.split(r"(\:\:|\://)", url, maxsplit=1)
+    if len(parts) > 1:
+        return parts[0]
+    return "file"
+
+
+def can_be_local(path: str) -> bool:
+    """Can the given URL be used with open_local?"""
+    from fsspec import get_filesystem_class
+
+    try:
+        return getattr(get_filesystem_class(get_protocol(path)), "local_file", False)
+    except (ValueError, ImportError):
+        # not in registry or import failed
+        return False
+
+
+def get_package_version_without_import(name: str) -> str | None:
+    """For given package name, try to find the version without importing it
+
+    Import and package.__version__ is still the backup here, so an import
+    *might* happen.
+
+    Returns either the version string, or None if the package
+    or the version was not readily  found.
+    """
+    if name in sys.modules:
+        mod = sys.modules[name]
+        if hasattr(mod, "__version__"):
+            return mod.__version__
+    try:
+        return version(name)
+    except:  # noqa: E722
+        pass
+    try:
+        import importlib
+
+        mod = importlib.import_module(name)
+        return mod.__version__
+    except (ImportError, AttributeError):
+        return None
+
+
+def setup_logging(
+    logger: logging.Logger | None = None,
+    logger_name: str | None = None,
+    level: str = "DEBUG",
+    clear: bool = True,
+) -> logging.Logger:
+    if logger is None and logger_name is None:
+        raise ValueError("Provide either logger object or logger name")
+    logger = logger or logging.getLogger(logger_name)
+    handle = logging.StreamHandler()
+    formatter = logging.Formatter(
+        "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
+    )
+    handle.setFormatter(formatter)
+    if clear:
+        logger.handlers.clear()
+    logger.addHandler(handle)
+    logger.setLevel(level)
+    return logger
+
+
+def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
+    return fs.unstrip_protocol(name)
+
+
+def mirror_from(
+    origin_name: str, methods: Iterable[str]
+) -> Callable[[type[T]], type[T]]:
+    """Mirror attributes and methods from the given
+    origin_name attribute of the instance to the
+    decorated class"""
+
+    def origin_getter(method: str, self: Any) -> Any:
+        origin = getattr(self, origin_name)
+        return getattr(origin, method)
+
+    def wrapper(cls: type[T]) -> type[T]:
+        for method in methods:
+            wrapped_method = partial(origin_getter, method)
+            setattr(cls, method, property(wrapped_method))
+        return cls
+
+    return wrapper
+
+
+@contextlib.contextmanager
+def nullcontext(obj: T) -> Iterator[T]:
+    yield obj
+
+
+def merge_offset_ranges(
+    paths: list[str],
+    starts: list[int] | int,
+    ends: list[int] | int,
+    max_gap: int = 0,
+    max_block: int | None = None,
+    sort: bool = True,
+) -> tuple[list[str], list[int], list[int]]:
+    """Merge adjacent byte-offset ranges when the inter-range
+    gap is <= `max_gap`, and when the merged byte range does not
+    exceed `max_block` (if specified). By default, this function
+    will re-order the input paths and byte ranges to ensure sorted
+    order. If the user can guarantee that the inputs are already
+    sorted, passing `sort=False` will skip the re-ordering.
+    """
+    # Check input
+    if not isinstance(paths, list):
+        raise TypeError
+    if not isinstance(starts, list):
+        starts = [starts] * len(paths)
+    if not isinstance(ends, list):
+        ends = [ends] * len(paths)
+    if len(starts) != len(paths) or len(ends) != len(paths):
+        raise ValueError
+
+    # Early Return
+    if len(starts) <= 1:
+        return paths, starts, ends
+
+    starts = [s or 0 for s in starts]
+    # Sort by paths and then ranges if `sort=True`
+    if sort:
+        paths, starts, ends = (
+            list(v)
+            for v in zip(
+                *sorted(
+                    zip(paths, starts, ends),
+                )
+            )
+        )
+
+    if paths:
+        # Loop through the coupled `paths`, `starts`, and
+        # `ends`, and merge adjacent blocks when appropriate
+        new_paths = paths[:1]
+        new_starts = starts[:1]
+        new_ends = ends[:1]
+        for i in range(1, len(paths)):
+            if paths[i] == paths[i - 1] and new_ends[-1] is None:
+                continue
+            elif (
+                paths[i] != paths[i - 1]
+                or ((starts[i] - new_ends[-1]) > max_gap)
+                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
+            ):
+                # Cannot merge with previous block.
+                # Add new `paths`, `starts`, and `ends` elements
+                new_paths.append(paths[i])
+                new_starts.append(starts[i])
+                new_ends.append(ends[i])
+            else:
+                # Merge with previous block by updating the
+                # last element of `ends`
+                new_ends[-1] = ends[i]
+        return new_paths, new_starts, new_ends
+
+    # `paths` is empty. Just return input lists
+    return paths, starts, ends
+
+
+def file_size(filelike: IO[bytes]) -> int:
+    """Find length of any open read-mode file-like"""
+    pos = filelike.tell()
+    try:
+        return filelike.seek(0, 2)
+    finally:
+        filelike.seek(pos)
+
+
+@contextlib.contextmanager
+def atomic_write(path: str, mode: str = "wb"):
+    """
+    A context manager that opens a temporary file next to `path` and, on exit,
+    replaces `path` with the temporary file, thereby updating `path`
+    atomically.
+    """
+    fd, fn = tempfile.mkstemp(
+        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
+    )
+    try:
+        with open(fd, mode) as fp:
+            yield fp
+    except BaseException:
+        with contextlib.suppress(FileNotFoundError):
+            os.unlink(fn)
+        raise
+    else:
+        os.replace(fn, path)
+
+
+def _translate(pat, STAR, QUESTION_MARK):
+    # Copied from: https://github.com/python/cpython/pull/106703.
+    res: list[str] = []
+    add = res.append
+    i, n = 0, len(pat)
+    while i < n:
+        c = pat[i]
+        i = i + 1
+        if c == "*":
+            # compress consecutive `*` into one
+            if (not res) or res[-1] is not STAR:
+                add(STAR)
+        elif c == "?":
+            add(QUESTION_MARK)
+        elif c == "[":
+            j = i
+            if j < n and pat[j] == "!":
+                j = j + 1
+            if j < n and pat[j] == "]":
+                j = j + 1
+            while j < n and pat[j] != "]":
+                j = j + 1
+            if j >= n:
+                add("\\[")
+            else:
+                stuff = pat[i:j]
+                if "-" not in stuff:
+                    stuff = stuff.replace("\\", r"\\")
+                else:
+                    chunks = []
+                    k = i + 2 if pat[i] == "!" else i + 1
+                    while True:
+                        k = pat.find("-", k, j)
+                        if k < 0:
+                            break
+                        chunks.append(pat[i:k])
+                        i = k + 1
+                        k = k + 3
+                    chunk = pat[i:j]
+                    if chunk:
+                        chunks.append(chunk)
+                    else:
+                        chunks[-1] += "-"
+                    # Remove empty ranges -- invalid in RE.
+                    for k in range(len(chunks) - 1, 0, -1):
+                        if chunks[k - 1][-1] > chunks[k][0]:
+                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
+                            del chunks[k]
+                    # Escape backslashes and hyphens for set difference (--).
+                    # Hyphens that create ranges shouldn't be escaped.
+                    stuff = "-".join(
+                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
+                    )
+                # Escape set operations (&&, ~~ and ||).
+                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
+                i = j + 1
+                if not stuff:
+                    # Empty range: never match.
+                    add("(?!)")
+                elif stuff == "!":
+                    # Negated empty range: match any character.
+                    add(".")
+                else:
+                    if stuff[0] == "!":
+                        stuff = "^" + stuff[1:]
+                    elif stuff[0] in ("^", "["):
+                        stuff = "\\" + stuff
+                    add(f"[{stuff}]")
+        else:
+            add(re.escape(c))
+    assert i == n
+    return res
+
+
+def glob_translate(pat):
+    # Copied from: https://github.com/python/cpython/pull/106703.
+    # The keyword parameters' values are fixed to:
+    # recursive=True, include_hidden=True, seps=None
+    """Translate a pathname with shell wildcards to a regular expression."""
+    if os.path.altsep:
+        seps = os.path.sep + os.path.altsep
+    else:
+        seps = os.path.sep
+    escaped_seps = "".join(map(re.escape, seps))
+    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
+    not_sep = f"[^{escaped_seps}]"
+    one_last_segment = f"{not_sep}+"
+    one_segment = f"{one_last_segment}{any_sep}"
+    any_segments = f"(?:.+{any_sep})?"
+    any_last_segments = ".*"
+    results = []
+    parts = re.split(any_sep, pat)
+    last_part_idx = len(parts) - 1
+    for idx, part in enumerate(parts):
+        if part == "*":
+            results.append(one_segment if idx < last_part_idx else one_last_segment)
+            continue
+        if part == "**":
+            results.append(any_segments if idx < last_part_idx else any_last_segments)
+            continue
+        elif "**" in part:
+            raise ValueError(
+                "Invalid pattern: '**' can only be an entire path component"
+            )
+        if part:
+            results.extend(_translate(part, f"{not_sep}*", not_sep))
+        if idx < last_part_idx:
+            results.append(any_sep)
+    res = "".join(results)
+    return rf"(?s:{res})\Z"