Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/core.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/fsspec/core.py | 743
1 file changed, 743 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/fsspec/core.py b/.venv/lib/python3.12/site-packages/fsspec/core.py
new file mode 100644
index 00000000..1d3c04c6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/core.py
@@ -0,0 +1,743 @@
+from __future__ import annotations
+
+import io
+import logging
+import os
+import re
+from glob import has_magic
+from pathlib import Path
+
+# for backwards compat, we export cache things from here too
+from fsspec.caching import (  # noqa: F401
+    BaseCache,
+    BlockCache,
+    BytesCache,
+    MMapCache,
+    ReadAheadCache,
+    caches,
+)
+from fsspec.compression import compr
+from fsspec.config import conf
+from fsspec.registry import filesystem, get_filesystem_class
+from fsspec.utils import (
+    _unstrip_protocol,
+    build_name_function,
+    infer_compression,
+    stringify_path,
+)
+
+logger = logging.getLogger("fsspec")
+
+
+class OpenFile:
+    """
+    File-like object to be used in a context
+
+    Can layer (buffered) text mode and compression over any file-system,
+    since file-systems are typically binary-only.
+
+    These instances are safe to serialize, as the low-level file object
+    is not created until invoked using ``with``.
+
+    Parameters
+    ----------
+    fs: FileSystem
+        The file system to use for opening the file. Should be a subclass
+        of, or duck-type compatible with, ``fsspec.spec.AbstractFileSystem``
+    path: str
+        Location to open
+    mode: str like 'rb', optional
+        Mode of the opened file
+    compression: str or None, optional
+        Compression to apply
+    encoding: str or None, optional
+        The encoding to use if opened in text mode.
+    errors: str or None, optional
+        How to handle encoding errors if opened in text mode.
+    newline: None or str
+        Passed to TextIOWrapper in text mode, how to handle line endings.
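+
+    Examples
+    --------
+    A minimal sketch, assuming ``fs`` is any instantiated filesystem and the
+    path exists on it:
+
+    >>> of = OpenFile(fs, "data/2015-01-01.csv", mode="rt")  # doctest: +SKIP
+    >>> with of as f:  # doctest: +SKIP
+    ...     header = f.readline()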
+    """
+
+    def __init__(
+        self,
+        fs,
+        path,
+        mode="rb",
+        compression=None,
+        encoding=None,
+        errors=None,
+        newline=None,
+    ):
+        self.fs = fs
+        self.path = path
+        self.mode = mode
+        self.compression = get_compression(path, compression)
+        self.encoding = encoding
+        self.errors = errors
+        self.newline = newline
+        self.fobjects = []
+
+    def __reduce__(self):
+        return (
+            OpenFile,
+            (
+                self.fs,
+                self.path,
+                self.mode,
+                self.compression,
+                self.encoding,
+                self.errors,
+                self.newline,
+            ),
+        )
+
+    def __repr__(self):
+        return f"<OpenFile '{self.path}'>"
+
+    def __enter__(self):
+        mode = self.mode.replace("t", "").replace("b", "") + "b"
+
+        try:
+            f = self.fs.open(self.path, mode=mode)
+        except FileNotFoundError as e:
+            if has_magic(self.path):
+                raise FileNotFoundError(
+                    "%s not found. The URL contains glob characters: you maybe needed\n"
+                    "to pass expand=True in fsspec.open() or the storage_options of \n"
+                    "your library. You can also set the config value 'open_expand'\n"
+                    "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True.",
+                    self.path,
+                ) from e
+            raise
+
+        self.fobjects = [f]
+
+        if self.compression is not None:
+            compress = compr[self.compression]
+            f = compress(f, mode=mode[0])
+            self.fobjects.append(f)
+
+        if "b" not in self.mode:
+            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
+            f = PickleableTextIOWrapper(
+                f, encoding=self.encoding, errors=self.errors, newline=self.newline
+            )
+            self.fobjects.append(f)
+
+        return self.fobjects[-1]
+
+    def __exit__(self, *args):
+        self.close()
+
+    @property
+    def full_name(self):
+        return _unstrip_protocol(self.path, self.fs)
+
+    def open(self):
+        """Materialise this as a real open file without context
+
+        The OpenFile object should be explicitly closed to avoid enclosed file
+        instances persisting. You must, therefore, keep a reference to the OpenFile
+        during the life of the file-like it generates.
+        """
+        return self.__enter__()
+
+    def close(self):
+        """Close all encapsulated file objects"""
+        for f in reversed(self.fobjects):
+            if "r" not in self.mode and not f.closed:
+                f.flush()
+            f.close()
+        self.fobjects.clear()
+
+
+class OpenFiles(list):
+    """List of OpenFile instances
+
+    Can be used in a single context, which opens and closes all of the
+    contained files. Normal list access to get the elements works as
+    normal.
+
+    A special case is made for caching filesystems - the files will
+    be down/uploaded together at the start or end of the context, and
+    this may happen concurrently, if the target filesystem supports it.
+    """
+
+    def __init__(self, *args, mode="rb", fs=None):
+        self.mode = mode
+        self.fs = fs
+        self.files = []
+        super().__init__(*args)
+
+    def __enter__(self):
+        if self.fs is None:
+            raise ValueError("Context has already been used")
+
+        fs = self.fs
+        while True:
+            if hasattr(fs, "open_many"):
+                # check for concurrent cache download; or set up for upload
+                self.files = fs.open_many(self)
+                return self.files
+            if hasattr(fs, "fs") and fs.fs is not None:
+                fs = fs.fs
+            else:
+                break
+        return [s.__enter__() for s in self]
+
+    def __exit__(self, *args):
+        fs = self.fs
+        for s in self:
+            s.__exit__(*args)
+        if "r" not in self.mode:
+            while True:
+                if hasattr(fs, "open_many"):
+                    # check for concurrent cache upload
+                    fs.commit_many(self.files)
+                    return
+                if hasattr(fs, "fs") and fs.fs is not None:
+                    fs = fs.fs
+                else:
+                    break
+
+    def __getitem__(self, item):
+        out = super().__getitem__(item)
+        if isinstance(item, slice):
+            return OpenFiles(out, mode=self.mode, fs=self.fs)
+        return out
+
+    def __repr__(self):
+        return f"<List of {len(self)} OpenFile instances>"
+
+
+def open_files(
+    urlpath,
+    mode="rb",
+    compression=None,
+    encoding="utf8",
+    errors=None,
+    name_function=None,
+    num=1,
+    protocol=None,
+    newline=None,
+    auto_mkdir=True,
+    expand=True,
+    **kwargs,
+):
+    """Given a path or paths, return a list of ``OpenFile`` objects.
+
+    For writing, a str path must contain the "*" character, which will be filled
+    in by increasing numbers, e.g., "part*" -> "part0", "part1" if num=2.
+
+    For either reading or writing, can instead provide explicit list of paths.
+
+    Parameters
+    ----------
+    urlpath: string or list
+        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
+        to read from alternative filesystems. To read from multiple files you
+        can pass a globstring or a list of paths, with the caveat that they
+        must all have the same protocol.
+    mode: 'rb', 'wt', etc.
+    compression: string or None
+        If given, open file using compression codec. Can either be a compression
+        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
+        compression from the filename suffix.
+    encoding: str
+        For text mode only
+    errors: None or str
+        Passed to TextIOWrapper in text mode
+    name_function: function or None
+        if opening a set of files for writing, those files do not yet exist,
+        so we need to generate their names by formatting the urlpath for
+        each sequence number
+    num: int [1]
+        if in writing mode, the number of files we expect to create (passed to
+        ``name_function``)
+    protocol: str or None
+        If given, overrides the protocol found in the URL.
+    newline: str or None
+        Used for line terminator in text mode. If None, uses system default;
+        if blank, uses no translation.
+    auto_mkdir: bool (True)
+        If in write mode, this will ensure the target directory exists before
+        writing, by calling ``fs.mkdirs(exist_ok=True)``.
+    expand: bool
+        Whether to expand glob characters in ``urlpath`` (read mode) or the
+        "*" template (write mode) into a list of concrete paths.
+    **kwargs: dict
+        Extra options that make sense to a particular storage connection, e.g.
+        host, port, username, password, etc.
+
+    Examples
+    --------
+    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
+    >>> files = open_files(
+    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
+    ... )  # doctest: +SKIP
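+
+    Writing with a name template (a sketch; files part-0, part-1 and part-2
+    would be created):
+
+    >>> files = open_files('data/part-*.json', mode='wt', num=3)  # doctest: +SKIP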
+
+    Returns
+    -------
+    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
+    be used as a single context
+
+    Notes
+    -----
+    For a full list of the available protocols and the implementations that
+    they map across to, see the latest online documentation:
+
+    - For implementations built into ``fsspec`` see
+      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
+    - For implementations in separate packages see
+      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
+    """
+    fs, fs_token, paths = get_fs_token_paths(
+        urlpath,
+        mode,
+        num=num,
+        name_function=name_function,
+        storage_options=kwargs,
+        protocol=protocol,
+        expand=expand,
+    )
+    if fs.protocol == "file":
+        fs.auto_mkdir = auto_mkdir
+    elif "r" not in mode and auto_mkdir:
+        parents = {fs._parent(path) for path in paths}
+        for parent in parents:
+            try:
+                fs.makedirs(parent, exist_ok=True)
+            except PermissionError:
+                pass
+    return OpenFiles(
+        [
+            OpenFile(
+                fs,
+                path,
+                mode=mode,
+                compression=compression,
+                encoding=encoding,
+                errors=errors,
+                newline=newline,
+            )
+            for path in paths
+        ],
+        mode=mode,
+        fs=fs,
+    )
+
+
+def _un_chain(path, kwargs):
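+    """Split a chained URL into its constituent layers.
+
+    Given, e.g., ``"filecache::s3://bucket/key"``, return a list of
+    ``(path, protocol, kwargs)`` triples, outermost layer first::
+
+        [("bucket/key", "filecache", {...}), ("bucket/key", "s3", {...})]
+    """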
+    # Avoid a circular import
+    from fsspec.implementations.cached import CachingFileSystem
+
+    if "::" in path:
+        x = re.compile(".*[^a-z]+.*")  # matches anything that is not a bare protocol-like word
+        bits = []
+        for p in path.split("::"):
+            if "://" in p or x.match(p):
+                bits.append(p)
+            else:
+                bits.append(p + "://")
+    else:
+        bits = [path]
+    # [[url, protocol, kwargs], ...]
+    out = []
+    previous_bit = None
+    kwargs = kwargs.copy()
+    for bit in reversed(bits):
+        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
+        cls = get_filesystem_class(protocol)
+        extra_kwargs = cls._get_kwargs_from_urls(bit)
+        kws = kwargs.pop(protocol, {})
+        if bit is bits[0]:
+            kws.update(kwargs)
+        kw = dict(
+            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
+            **kws,
+        )
+        bit = cls._strip_protocol(bit)
+        if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
+            bit = previous_bit
+        out.append((bit, protocol, kw))
+        previous_bit = bit
+    out.reverse()
+    return out
+
+
+def url_to_fs(url, **kwargs):
+    """
+    Turn fully-qualified and potentially chained URL into filesystem instance
+
+    Parameters
+    ----------
+    url : str
+        The fsspec-compatible URL
+    **kwargs: dict
+        Extra options that make sense to a particular storage connection, e.g.
+        host, port, username, password, etc.
+
+    Returns
+    -------
+    filesystem : FileSystem
+        The new filesystem discovered from ``url`` and created with
+        ``**kwargs``.
+    urlpath : str
+        The filesystem-specific URL for ``url``.
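+
+    Examples
+    --------
+    A sketch with and without chaining (``anon`` is an option of the s3
+    backend):
+
+    >>> fs, path = url_to_fs("s3://bucket/key.csv", anon=True)  # doctest: +SKIP
+    >>> fs, path = url_to_fs("filecache::s3://bucket/key.csv")  # doctest: +SKIP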
+    """
+    url = stringify_path(url)
+    # non-FS arguments that appear in fsspec.open()
+    # (using inspect could keep this set in sync with open()'s signature)
+    known_kwargs = {
+        "compression",
+        "encoding",
+        "errors",
+        "expand",
+        "mode",
+        "name_function",
+        "newline",
+        "num",
+    }
+    kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs}
+    chain = _un_chain(url, kwargs)
+    inkwargs = {}
+    # Reverse iterate the chain, creating a nested target_* structure
+    for i, ch in enumerate(reversed(chain)):
+        urls, protocol, kw = ch
+        if i == len(chain) - 1:
+            inkwargs = dict(**kw, **inkwargs)
+            continue
+        inkwargs["target_options"] = dict(**kw, **inkwargs)
+        inkwargs["target_protocol"] = protocol
+        inkwargs["fo"] = urls
+    urlpath, protocol, _ = chain[0]
+    fs = filesystem(protocol, **inkwargs)
+    return fs, urlpath
+
+
+DEFAULT_EXPAND = conf.get("open_expand", False)
+
+
+def open(
+    urlpath,
+    mode="rb",
+    compression=None,
+    encoding="utf8",
+    errors=None,
+    protocol=None,
+    newline=None,
+    expand=None,
+    **kwargs,
+):
+    """Given a path or paths, return one ``OpenFile`` object.
+
+    Parameters
+    ----------
+    urlpath: string or list
+        Absolute or relative filepath. Prefix with a protocol like ``s3://``
+        to read from alternative filesystems. Should not include glob
+        character(s).
+    mode: 'rb', 'wt', etc.
+    compression: string or None
+        If given, open file using compression codec. Can either be a compression
+        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
+        compression from the filename suffix.
+    encoding: str
+        For text mode only
+    errors: None or str
+        Passed to TextIOWrapper in text mode
+    protocol: str or None
+        If given, overrides the protocol found in the URL.
+    newline: str or None
+        Used for line terminator in text mode. If None, uses system default;
+        if blank, uses no translation.
+    expand: bool or None
+        Whether to regard file paths containing special glob characters as
+        needing expansion (finding the first match) or to take them literally.
+        Setting False allows using paths which do embed such characters. If
+        None (default), the value is taken from the ``DEFAULT_EXPAND`` module
+        variable, which is initialised from the "open_expand" config value at
+        import time, defaulting to False.
+    **kwargs: dict
+        Extra options that make sense to a particular storage connection, e.g.
+        host, port, username, password, etc.
+
+    Examples
+    --------
+    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
+    >>> openfile = open(
+    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
+    ... )  # doctest: +SKIP
+    >>> with openfile as f:
+    ...     df = pd.read_csv(f)  # doctest: +SKIP
+    ...
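+
+    Protocol chaining composes here too; for example, reading via a local
+    cache (a sketch, assuming the s3 backend is installed):
+
+    >>> openfile = open(
+    ...     'simplecache::s3://bucket/2015-01-01.csv.gz', compression='infer'
+    ... )  # doctest: +SKIP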
+
+    Returns
+    -------
+    ``OpenFile`` object.
+
+    Notes
+    -----
+    For a full list of the available protocols and the implementations that
+    they map across to, see the latest online documentation:
+
+    - For implementations built into ``fsspec`` see
+      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
+    - For implementations in separate packages see
+      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
+    """
+    expand = DEFAULT_EXPAND if expand is None else expand
+    out = open_files(
+        urlpath=[urlpath],
+        mode=mode,
+        compression=compression,
+        encoding=encoding,
+        errors=errors,
+        protocol=protocol,
+        newline=newline,
+        expand=expand,
+        **kwargs,
+    )
+    if not out:
+        raise FileNotFoundError(urlpath)
+    return out[0]
+
+
+def open_local(
+    url: str | list[str] | Path | list[Path],
+    mode: str = "rb",
+    **storage_options: dict,
+) -> str | list[str]:
+    """Open file(s) which can be resolved to local
+
+    For files which either are local, or get downloaded upon open
+    (e.g., by file caching)
+
+    Parameters
+    ----------
+    url: str or list(str)
+    mode: str
+        Must be a read mode
+    storage_options:
+        passed on to the filesystem constructor, or used by ``open_files``
+        (e.g., compression)
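+
+    Examples
+    --------
+    A sketch using a caching filesystem, which downloads on open and hence
+    resolves to a local path:
+
+    >>> local_path = open_local("simplecache::s3://bucket/file.csv")  # doctest: +SKIP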
+    """
+    if "r" not in mode:
+        raise ValueError("Can only ensure local files when reading")
+    of = open_files(url, mode=mode, **storage_options)
+    if not getattr(of[0].fs, "local_file", False):
+        raise ValueError(
+            "open_local can only be used on a filesystem which"
+            " has attribute local_file=True"
+        )
+    with of as files:
+        paths = [f.name for f in files]
+    if (isinstance(url, str) and not has_magic(url)) or isinstance(url, Path):
+        return paths[0]
+    return paths
+
+
+def get_compression(urlpath, compression):
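+    """Return a validated compression codec name for ``urlpath``.
+
+    With ``compression="infer"``, the codec is guessed from the filename
+    suffix; anything not registered in ``fsspec.compression.compr`` raises
+    ``ValueError``.
+    """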
+    if compression == "infer":
+        compression = infer_compression(urlpath)
+    if compression is not None and compression not in compr:
+        raise ValueError(f"Compression type {compression} not supported")
+    return compression
+
+
+def split_protocol(urlpath):
+    """Return protocol, path pair"""
+    urlpath = stringify_path(urlpath)
+    if "://" in urlpath:
+        protocol, path = urlpath.split("://", 1)
+        if len(protocol) > 1:
+            # excludes Windows paths
+            return protocol, path
+    if urlpath.startswith("data:"):
+        return urlpath.split(":", 1)
+    return None, urlpath
+
+
+def strip_protocol(urlpath):
+    """Return only path part of full URL, according to appropriate backend"""
+    protocol, _ = split_protocol(urlpath)
+    cls = get_filesystem_class(protocol)
+    return cls._strip_protocol(urlpath)
+
+
+def expand_paths_if_needed(paths, mode, num, fs, name_function):
+    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
+    in them (read mode).
+
+    :param paths: list of paths
+    mode: str
+        Mode in which to open files.
+    num: int
+        If opening in writing mode, number of files we expect to create.
+    fs: filesystem object
+    name_function: callable
+        If opening in writing mode, this callable is used to generate path
+        names. Names are generated for each partition by
+        ``urlpath.replace('*', name_function(partition_index))``.
+    :return: list of paths
+    """
+    expanded_paths = []
+    paths = list(paths)
+
+    if "w" in mode:  # read mode
+        if sum(1 for p in paths if "*" in p) > 1:
+            raise ValueError(
+                "When writing data, only one filename mask can be specified."
+            )
+        num = max(num, len(paths))
+
+        for curr_path in paths:
+            if "*" in curr_path:
+                # expand using name_function
+                expanded_paths.extend(_expand_paths(curr_path, name_function, num))
+            else:
+                expanded_paths.append(curr_path)
+        # if we generated more paths than asked for, trim the list
+        if len(expanded_paths) > num:
+            expanded_paths = expanded_paths[:num]
+
+    else:  # read mode
+        for curr_path in paths:
+            if has_magic(curr_path):
+                # expand using glob
+                expanded_paths.extend(fs.glob(curr_path))
+            else:
+                expanded_paths.append(curr_path)
+
+    return expanded_paths
+
+
+def get_fs_token_paths(
+    urlpath,
+    mode="rb",
+    num=1,
+    name_function=None,
+    storage_options=None,
+    protocol=None,
+    expand=True,
+):
+    """Filesystem, deterministic token, and paths from a urlpath and options.
+
+    Parameters
+    ----------
+    urlpath: string or iterable
+        Absolute or relative filepath, URL (may include protocols like
+        ``s3://``), or globstring pointing to data.
+    mode: str, optional
+        Mode in which to open files.
+    num: int, optional
+        If opening in writing mode, number of files we expect to create.
+    name_function: callable, optional
+        If opening in writing mode, this callable is used to generate path
+        names. Names are generated for each partition by
+        ``urlpath.replace('*', name_function(partition_index))``.
+    storage_options: dict, optional
+        Additional keywords to pass to the filesystem class.
+    protocol: str or None
+        To override the protocol specifier in the URL
+    expand: bool
+        Expand string paths for writing, assuming the path is a directory
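+
+    Examples
+    --------
+    A sketch expanding a glob on a remote store:
+
+    >>> fs, token, paths = get_fs_token_paths("s3://bucket/2015-*.csv")  # doctest: +SKIP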
+    """
+    if isinstance(urlpath, (list, tuple, set)):
+        if not urlpath:
+            raise ValueError("empty urlpath sequence")
+        urlpath0 = stringify_path(next(iter(urlpath)))
+    else:
+        urlpath0 = stringify_path(urlpath)
+    storage_options = storage_options or {}
+    if protocol:
+        storage_options["protocol"] = protocol
+    chain = _un_chain(urlpath0, storage_options or {})
+    inkwargs = {}
+    # Reverse iterate the chain, creating a nested target_* structure
+    for i, ch in enumerate(reversed(chain)):
+        urls, nested_protocol, kw = ch
+        if i == len(chain) - 1:
+            inkwargs = dict(**kw, **inkwargs)
+            continue
+        inkwargs["target_options"] = dict(**kw, **inkwargs)
+        inkwargs["target_protocol"] = nested_protocol
+        inkwargs["fo"] = urls
+    paths, protocol, _ = chain[0]
+    fs = filesystem(protocol, **inkwargs)
+    if isinstance(urlpath, (list, tuple, set)):
+        pchains = [
+            _un_chain(stringify_path(u), storage_options or {})[0] for u in urlpath
+        ]
+        if len({pc[1] for pc in pchains}) > 1:
+            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
+        paths = [pc[0] for pc in pchains]
+    else:
+        paths = fs._strip_protocol(paths)
+    if isinstance(paths, (list, tuple, set)):
+        if expand:
+            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
+        elif not isinstance(paths, list):
+            paths = list(paths)
+    else:
+        if ("w" in mode or "x" in mode) and expand:
+            paths = _expand_paths(paths, name_function, num)
+        elif "*" in paths:
+            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
+        else:
+            paths = [paths]
+
+    return fs, fs._fs_token, paths
+
+
+def _expand_paths(path, name_function, num):
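+    """Turn a path template containing "*" into ``num`` concrete paths.
+
+    >>> _expand_paths("data/part-*.csv", None, 2)
+    ['data/part-0.csv', 'data/part-1.csv']
+    """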
+    if isinstance(path, str):
+        if path.count("*") > 1:
+            raise ValueError("Output path spec must contain exactly one '*'.")
+        elif "*" not in path:
+            path = os.path.join(path, "*.part")
+
+        if name_function is None:
+            name_function = build_name_function(num - 1)
+
+        paths = [path.replace("*", name_function(i)) for i in range(num)]
+        if paths != sorted(paths):
+            logger.warning(
+                "In order to preserve order between partitions,"
+                " paths created with ``name_function`` should"
+                " sort to partition order"
+            )
+    elif isinstance(path, (tuple, list)):
+        assert len(path) == num
+        paths = list(path)
+    else:
+        raise ValueError(
+            "Path should be either\n"
+            "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
+            "2. A directory: 'foo/\n"
+            "3. A path with a '*' in it: 'foo.*.json'"
+        )
+    return paths
+
+
+class PickleableTextIOWrapper(io.TextIOWrapper):
+    """TextIOWrapper cannot be pickled. This solves it.
+
+    Requires that ``buffer`` be pickleable, which all instances of
+    AbstractBufferedFile are.
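+
+    A sketch of the round trip this enables, assuming ``buffer`` is an
+    ``AbstractBufferedFile`` (or any pickleable file-like):
+
+    >>> import pickle
+    >>> w = PickleableTextIOWrapper(buffer, encoding="utf8")  # doctest: +SKIP
+    >>> w2 = pickle.loads(pickle.dumps(w))  # doctest: +SKIP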
+    """
+
+    def __init__(
+        self,
+        buffer,
+        encoding=None,
+        errors=None,
+        newline=None,
+        line_buffering=False,
+        write_through=False,
+    ):
+        self.args = buffer, encoding, errors, newline, line_buffering, write_through
+        super().__init__(*self.args)
+
+    def __reduce__(self):
+        return PickleableTextIOWrapper, self.args