 .venv/lib/python3.12/site-packages/fsspec/spec.py | 2242 ++++++++++++++++++++
 1 file changed, 2242 insertions(+)
diff --git a/.venv/lib/python3.12/site-packages/fsspec/spec.py b/.venv/lib/python3.12/site-packages/fsspec/spec.py
new file mode 100644
index 00000000..7daf850e
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/spec.py
@@ -0,0 +1,2242 @@
+from __future__ import annotations
+
+import io
+import json
+import logging
+import os
+import threading
+import warnings
+import weakref
+from errno import ESPIPE
+from glob import has_magic
+from hashlib import sha256
+from typing import Any, ClassVar
+
+from .callbacks import DEFAULT_CALLBACK
+from .config import apply_config, conf
+from .dircache import DirCache
+from .transaction import Transaction
+from .utils import (
+    _unstrip_protocol,
+    glob_translate,
+    isfilelike,
+    other_paths,
+    read_block,
+    stringify_path,
+    tokenize,
+)
+
+logger = logging.getLogger("fsspec")
+
+
+def make_instance(cls, args, kwargs):
+    return cls(*args, **kwargs)
+
+
+class _Cached(type):
+    """
+    Metaclass for caching file system instances.
+
+    Notes
+    -----
+    Instances are cached according to
+
+    * The values of the class attributes listed in `_extra_tokenize_attributes`
+    * The arguments passed to ``__init__``.
+
+    This creates an additional reference to the filesystem, which prevents the
+    filesystem from being garbage collected when all *user* references go away.
+    A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
+    be made for a filesystem instance to be garbage collected.
+    """
+
+    def __init__(cls, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        # Note: we intentionally create a reference here, to avoid garbage
+        # collecting instances when all other references are gone. To really
+        # delete a FileSystem, the cache must be cleared.
+        if conf.get("weakref_instance_cache"):  # pragma: no cover
+            # debug option for analysing fork/spawn conditions
+            cls._cache = weakref.WeakValueDictionary()
+        else:
+            cls._cache = {}
+        cls._pid = os.getpid()
+
+    def __call__(cls, *args, **kwargs):
+        kwargs = apply_config(cls, kwargs)
+        extra_tokens = tuple(
+            getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
+        )
+        token = tokenize(
+            cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
+        )
+        skip = kwargs.pop("skip_instance_cache", False)
+        if os.getpid() != cls._pid:
+            cls._cache.clear()
+            cls._pid = os.getpid()
+        if not skip and cls.cachable and token in cls._cache:
+            cls._latest = token
+            return cls._cache[token]
+        else:
+            obj = super().__call__(*args, **kwargs)
+            # Setting _fs_token here causes some static linters to complain.
+            obj._fs_token_ = token
+            obj.storage_args = args
+            obj.storage_options = kwargs
+            if obj.async_impl and obj.mirror_sync_methods:
+                from .asyn import mirror_sync_methods
+
+                mirror_sync_methods(obj)
+
+            if cls.cachable and not skip:
+                cls._latest = token
+                cls._cache[token] = obj
+            return obj
+
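+# Illustrative sketch (editor's addition, not part of the upstream file): the
+# instance cache above means equal constructor arguments return the same
+# object, and ``skip_instance_cache`` bypasses it.
+#
+#     import fsspec
+#     fs_a = fsspec.filesystem("memory")
+#     fs_b = fsspec.filesystem("memory")
+#     assert fs_a is fs_b                  # token matched; cached instance
+#     fs_c = fsspec.filesystem("memory", skip_instance_cache=True)
+#     assert fs_c is not fs_a              # fresh instance, never stored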
+
+class AbstractFileSystem(metaclass=_Cached):
+    """
+    An abstract super-class for pythonic file-systems
+
+    Implementations are expected to be compatible with or, better, subclass
+    from here.
+    """
+
+    cachable = True  # this class can be cached, instances reused
+    _cached = False
+    blocksize = 2**22
+    sep = "/"
+    protocol: ClassVar[str | tuple[str, ...]] = "abstract"
+    _latest = None
+    async_impl = False
+    mirror_sync_methods = False
+    root_marker = ""  # For some FSs, may require leading '/' or other character
+    transaction_type = Transaction
+
+    #: Extra *class attributes* that should be considered when hashing.
+    _extra_tokenize_attributes = ()
+
+    # Set by _Cached metaclass
+    storage_args: tuple[Any, ...]
+    storage_options: dict[str, Any]
+
+    def __init__(self, *args, **storage_options):
+        """Create and configure file-system instance
+
+        Instances may be cachable, so if similar enough arguments are seen
+        a new instance is not required. The token attribute exists to allow
+        implementations to cache instances if they wish.
+
+        A reasonable default should be provided if there are no arguments.
+
+        Subclasses should call this method.
+
+        Parameters
+        ----------
+        use_listings_cache, listings_expiry_time, max_paths:
+            passed to ``DirCache``, if the implementation supports
+            directory listing caching. Pass use_listings_cache=False
+            to disable such caching.
+        skip_instance_cache: bool
+            If this is a cachable implementation, pass True here to force
+            creating a new instance even if a matching instance exists, and prevent
+            storing this instance.
+        asynchronous: bool
+        loop: asyncio-compatible IOLoop or None
+        """
+        if self._cached:
+            # reusing instance, don't change
+            return
+        self._cached = True
+        self._intrans = False
+        self._transaction = None
+        self._invalidated_caches_in_transaction = []
+        self.dircache = DirCache(**storage_options)
+
+        if storage_options.pop("add_docs", None):
+            warnings.warn("add_docs is no longer supported.", FutureWarning)
+
+        if storage_options.pop("add_aliases", None):
+            warnings.warn("add_aliases has been removed.", FutureWarning)
+        # This is set in _Cached
+        self._fs_token_ = None
+
+    @property
+    def fsid(self):
+        """Persistent filesystem id that can be used to compare filesystems
+        across sessions.
+        """
+        raise NotImplementedError
+
+    @property
+    def _fs_token(self):
+        return self._fs_token_
+
+    def __dask_tokenize__(self):
+        return self._fs_token
+
+    def __hash__(self):
+        return int(self._fs_token, 16)
+
+    def __eq__(self, other):
+        return isinstance(other, type(self)) and self._fs_token == other._fs_token
+
+    def __reduce__(self):
+        return make_instance, (type(self), self.storage_args, self.storage_options)
+
+    @classmethod
+    def _strip_protocol(cls, path):
+        """Turn path from fully-qualified to file-system-specific
+
+        May require FS-specific handling, e.g., for relative paths or links.
+        """
+        if isinstance(path, list):
+            return [cls._strip_protocol(p) for p in path]
+        path = stringify_path(path)
+        protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
+        for protocol in protos:
+            if path.startswith(protocol + "://"):
+                path = path[len(protocol) + 3 :]
+            elif path.startswith(protocol + "::"):
+                path = path[len(protocol) + 2 :]
+        path = path.rstrip("/")
+        # use of root_marker to make minimum required path, e.g., "/"
+        return path or cls.root_marker
+
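+    # Editor's note: with protocol "myproto" (hypothetical) and the default
+    # root_marker "", _strip_protocol("myproto://bucket/key/") -> "bucket/key"
+    # and _strip_protocol("myproto://") -> "" (the root marker).
+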
+    def unstrip_protocol(self, name: str) -> str:
+        """Format FS-specific path to generic, including protocol"""
+        protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
+        for protocol in protos:
+            if name.startswith(f"{protocol}://"):
+                return name
+        return f"{protos[0]}://{name}"
+
+    @staticmethod
+    def _get_kwargs_from_urls(path):
+        """If kwargs can be encoded in the paths, extract them here
+
+        This should happen before instantiation of the class; incoming paths
+        then should be amended to strip the options in methods.
+
+        Examples may look like an sftp path "sftp://user@host:/my/path", where
+        the user and host should become kwargs and later get stripped.
+        """
+        # by default, nothing happens
+        return {}
+
+    @classmethod
+    def current(cls):
+        """Return the most recently instantiated FileSystem
+
+        If no instance has been created, then create one with defaults
+        """
+        if cls._latest in cls._cache:
+            return cls._cache[cls._latest]
+        return cls()
+
+    @property
+    def transaction(self):
+        """A context within which files are committed together upon exit
+
+        Requires the file class to implement `.commit()` and `.discard()`
+        for the normal and exception cases.
+        """
+        if self._transaction is None:
+            self._transaction = self.transaction_type(self)
+        return self._transaction
+
+    def start_transaction(self):
+        """Begin write transaction for deferring files, non-context version"""
+        self._intrans = True
+        self._transaction = self.transaction_type(self)
+        return self.transaction
+
+    def end_transaction(self):
+        """Finish write transaction, non-context version"""
+        self.transaction.complete()
+        self._transaction = None
+        # Invalidated caches must be cleared once the transaction completes.
+        for path in self._invalidated_caches_in_transaction:
+            self.invalidate_cache(path)
+        self._invalidated_caches_in_transaction.clear()
+
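+    # Editor's sketch (not upstream code): typical transactional use goes via
+    # the ``transaction`` context manager, which drives the two methods above.
+    #
+    #     with fs.transaction:
+    #         with fs.open("/out/part-0", "wb") as f:
+    #             f.write(b"...")          # committed only on clean exit
+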
+    def invalidate_cache(self, path=None):
+        """
+        Discard any cached directory information
+
+        Parameters
+        ----------
+        path: string or None
+            If None, clear all listings cached else listings at or under given
+            path.
+        """
+        # Not necessary to implement an invalidation mechanism; there may be no
+        # cache. But if there is one, you should call this parent-class method
+        # from your subclass to ensure caches expire correctly after
+        # transactions. See the implementation of FTPFileSystem in ftp.py.
+        if self._intrans:
+            self._invalidated_caches_in_transaction.append(path)
+
+    def mkdir(self, path, create_parents=True, **kwargs):
+        """
+        Create directory entry at path
+
+        For systems that don't have true directories, may create an entry for
+        this instance only and not touch the real filesystem
+
+        Parameters
+        ----------
+        path: str
+            location
+        create_parents: bool
+            if True, this is equivalent to ``makedirs``
+        kwargs:
+            may be permissions, etc.
+        """
+        pass  # not necessary to implement, may not have directories
+
+    def makedirs(self, path, exist_ok=False):
+        """Recursively make directories
+
+        Creates directory at path and any intervening required directories.
+        Raises exception if, for instance, the path already exists but is a
+        file.
+
+        Parameters
+        ----------
+        path: str
+            leaf directory name
+        exist_ok: bool (False)
+            If False, will error if the target already exists
+        """
+        pass  # not necessary to implement, may not have directories
+
+    def rmdir(self, path):
+        """Remove a directory, if empty"""
+        pass  # not necessary to implement, may not have directories
+
+    def ls(self, path, detail=True, **kwargs):
+        """List objects at path.
+
+        This should include subdirectories and files at that location. The
+        difference between a file and a directory must be clear when details
+        are requested.
+
+        The specific keys, or perhaps a FileInfo class or similar, are TBD,
+        but must be consistent across implementations.
+        Must include:
+
+        - full path to the entry (without protocol)
+        - size of the entry, in bytes. If the value cannot be determined, will
+          be ``None``.
+        - type of entry, "file", "directory" or other
+
+        Additional information
+        may be present, appropriate to the file-system, e.g., generation,
+        checksum, etc.
+
+        May use refresh=True|False to allow use of self._ls_from_cache to
+        check for a saved listing and avoid calling the backend. This would be
+        common where listing may be expensive.
+
+        Parameters
+        ----------
+        path: str
+        detail: bool
+            if True, gives a list of dictionaries, where each is the same as
+            the result of ``info(path)``. If False, gives a list of paths
+            (str).
+        kwargs: may have additional backend-specific options, such as version
+            information
+
+        Returns
+        -------
+        List of strings if detail is False, or list of directory information
+        dicts if detail is True.
+        """
+        raise NotImplementedError
+
+    def _ls_from_cache(self, path):
+        """Check cache for listing
+
+        Returns listing, if found (may be an empty list for a directory that exists
+        but contains nothing), None if not in cache.
+        """
+        parent = self._parent(path)
+        try:
+            return self.dircache[path.rstrip("/")]
+        except KeyError:
+            pass
+        try:
+            files = [
+                f
+                for f in self.dircache[parent]
+                if f["name"] == path
+                or (f["name"] == path.rstrip("/") and f["type"] == "directory")
+            ]
+            if len(files) == 0:
+                # parent dir was listed but did not contain this file
+                raise FileNotFoundError(path)
+            return files
+        except KeyError:
+            pass
+
+    def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
+        """Return all files belows path
+
+        List all files, recursing into subdirectories; output is iterator-style,
+        like ``os.walk()``. For a simple list of files, ``find()`` is available.
+
+        When topdown is True, the caller can modify the dirnames list in-place (perhaps
+        using del or slice assignment), and walk() will
+        only recurse into the subdirectories whose names remain in dirnames;
+        this can be used to prune the search, impose a specific order of visiting,
+        or even to inform walk() about directories the caller creates or renames before
+        it resumes walk() again.
+        Modifying dirnames when topdown is False has no effect. (see os.walk)
+
+        Note that the "files" outputted will include anything that is not
+        a directory, such as links.
+
+        Parameters
+        ----------
+        path: str
+            Root to recurse into
+        maxdepth: int
+            Maximum recursion depth. None means limitless, but not recommended
+            on link-based file-systems.
+        topdown: bool (True)
+            Whether to walk the directory tree from the top downwards or from
+            the bottom upwards.
+        on_error: "omit", "raise", a callable
+            if omit (default), path with exception will simply be empty;
+            If raise, an underlying exception will be raised;
+            if callable, it will be called with a single OSError instance as argument
+        kwargs: passed to ``ls``
+        """
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        path = self._strip_protocol(path)
+        full_dirs = {}
+        dirs = {}
+        files = {}
+
+        detail = kwargs.pop("detail", False)
+        try:
+            listing = self.ls(path, detail=True, **kwargs)
+        except (FileNotFoundError, OSError) as e:
+            if on_error == "raise":
+                raise
+            if callable(on_error):
+                on_error(e)
+            return
+
+        for info in listing:
+            # each info name must be at least [path]/part, but here
+            # we also check for names like [path]/part/
+            pathname = info["name"].rstrip("/")
+            name = pathname.rsplit("/", 1)[-1]
+            if info["type"] == "directory" and pathname != path:
+                # do not include "self" path
+                full_dirs[name] = pathname
+                dirs[name] = info
+            elif pathname == path:
+                # file-like with same name as given path
+                files[""] = info
+            else:
+                files[name] = info
+
+        if not detail:
+            dirs = list(dirs)
+            files = list(files)
+
+        if topdown:
+            # Yield before recursion if walking top down
+            yield path, dirs, files
+
+        if maxdepth is not None:
+            maxdepth -= 1
+            if maxdepth < 1:
+                if not topdown:
+                    yield path, dirs, files
+                return
+
+        for d in dirs:
+            yield from self.walk(
+                full_dirs[d],
+                maxdepth=maxdepth,
+                detail=detail,
+                topdown=topdown,
+                **kwargs,
+            )
+
+        if not topdown:
+            # Yield after recursion if walking bottom up
+            yield path, dirs, files
+
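+    # Editor's sketch (assumed layout: /data/a.csv and /data/sub/b.csv):
+    #
+    #     for root, dirs, files in fs.walk("/data"):
+    #         print(root, dirs, files)
+    #     # /data      ['sub']  ['a.csv']
+    #     # /data/sub  []       ['b.csv']
+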
+    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
+        """List all files below path.
+
+        Like the POSIX ``find`` command, without conditions.
+
+        Parameters
+        ----------
+        path : str
+        maxdepth: int or None
+            If not None, the maximum number of levels to descend
+        withdirs: bool
+            Whether to include directory paths in the output. This is True
+            when used by glob, but users usually only want files.
+        kwargs are passed to ``ls``.
+        """
+        # TODO: allow equivalent of -name parameter
+        path = self._strip_protocol(path)
+        out = {}
+
+        # Add the root directory if withdirs is requested
+        # This is needed for posix glob compliance
+        if withdirs and path != "" and self.isdir(path):
+            out[path] = self.info(path)
+
+        for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
+            if withdirs:
+                files.update(dirs)
+            out.update({info["name"]: info for name, info in files.items()})
+        if not out and self.isfile(path):
+            # walk works on directories, but find should also return [path]
+            # when path happens to be a file
+            out[path] = {}
+        names = sorted(out)
+        if not detail:
+            return names
+        else:
+            return {name: out[name] for name in names}
+
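+    # Editor's sketch (same assumed layout as the walk example):
+    #
+    #     fs.find("/data")                 # ['/data/a.csv', '/data/sub/b.csv']
+    #     fs.find("/data", withdirs=True)  # adds '/data' and '/data/sub'
+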
+    def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
+        """Space used by files and optionally directories within a path
+
+        Directory size does not include the size of its contents.
+
+        Parameters
+        ----------
+        path: str
+        total: bool
+            Whether to sum all the file sizes
+        maxdepth: int or None
+            Maximum number of directory levels to descend, None for unlimited.
+        withdirs: bool
+            Whether to include directory paths in the output.
+        kwargs: passed to ``find``
+
+        Returns
+        -------
+        Dict of {path: size} if total=False, or int otherwise, where numbers
+        refer to bytes used.
+        """
+        sizes = {}
+        if withdirs and self.isdir(path):
+            # Include top-level directory in output
+            info = self.info(path)
+            sizes[info["name"]] = info["size"]
+        for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
+            info = self.info(f)
+            sizes[info["name"]] = info["size"]
+        if total:
+            return sum(sizes.values())
+        else:
+            return sizes
+
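+    # Editor's sketch (assuming the two files hold 10 and 20 bytes):
+    #
+    #     fs.du("/data")                # 30
+    #     fs.du("/data", total=False)   # {'/data/a.csv': 10, '/data/sub/b.csv': 20}
+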
+    def glob(self, path, maxdepth=None, **kwargs):
+        """
+        Find files by glob-matching.
+
+        If the path ends with '/', only folders are returned.
+
+        We support ``"**"``,
+        ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.
+
+        The ``maxdepth`` option is applied on the first ``**`` found in the path.
+
+        kwargs are passed to ``ls``.
+        """
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        import re
+
+        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
+        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
+        path = self._strip_protocol(path)
+        append_slash_to_dirname = ends_with_sep or path.endswith(
+            tuple(sep + "**" for sep in seps)
+        )
+        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
+        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
+        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
+
+        min_idx = min(idx_star, idx_qmark, idx_brace)
+
+        detail = kwargs.pop("detail", False)
+
+        if not has_magic(path):
+            if self.exists(path, **kwargs):
+                if not detail:
+                    return [path]
+                else:
+                    return {path: self.info(path, **kwargs)}
+            else:
+                if not detail:
+                    return []  # glob of non-existent returns empty
+                else:
+                    return {}
+        elif "/" in path[:min_idx]:
+            min_idx = path[:min_idx].rindex("/")
+            root = path[: min_idx + 1]
+            depth = path[min_idx + 1 :].count("/") + 1
+        else:
+            root = ""
+            depth = path[min_idx + 1 :].count("/") + 1
+
+        if "**" in path:
+            if maxdepth is not None:
+                idx_double_stars = path.find("**")
+                depth_double_stars = path[idx_double_stars:].count("/") + 1
+                depth = depth - depth_double_stars + maxdepth
+            else:
+                depth = None
+
+        allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
+
+        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
+        pattern = re.compile(pattern)
+
+        out = {
+            p: info
+            for p, info in sorted(allpaths.items())
+            if pattern.match(
+                p + "/"
+                if append_slash_to_dirname and info["type"] == "directory"
+                else p
+            )
+        }
+
+        if detail:
+            return out
+        else:
+            return list(out)
+
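+    # Editor's sketch of the semantics above (same assumed layout):
+    #
+    #     fs.glob("/data/*.csv")       # ['/data/a.csv']
+    #     fs.glob("/data/**/*.csv")    # ['/data/a.csv', '/data/sub/b.csv']
+    #     fs.glob("/data/*/")          # trailing "/": directories only
+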
+    def exists(self, path, **kwargs):
+        """Is there a file at the given path"""
+        try:
+            self.info(path, **kwargs)
+            return True
+        except:  # noqa: E722
+            # any exception allowed bar FileNotFoundError?
+            return False
+
+    def lexists(self, path, **kwargs):
+        """If there is a file at the given path (including
+        broken links)"""
+        return self.exists(path)
+
+    def info(self, path, **kwargs):
+        """Give details of entry at path
+
+        Returns a single dictionary, with exactly the same information as ``ls``
+        would with ``detail=True``.
+
+        The default implementation calls ``ls`` and could be overridden by a
+        shortcut. kwargs are passed on to ``ls()``.
+
+        Some file systems might not be able to measure the file's size, in
+        which case, the returned dict will include ``'size': None``.
+
+        Returns
+        -------
+        dict with keys: name (full path in the FS), size (in bytes), type (file,
+        directory, or something else) and other FS-specific keys.
+        """
+        path = self._strip_protocol(path)
+        out = self.ls(self._parent(path), detail=True, **kwargs)
+        out = [o for o in out if o["name"].rstrip("/") == path]
+        if out:
+            return out[0]
+        out = self.ls(path, detail=True, **kwargs)
+        path = path.rstrip("/")
+        out1 = [o for o in out if o["name"].rstrip("/") == path]
+        if len(out1) == 1:
+            if "size" not in out1[0]:
+                out1[0]["size"] = None
+            return out1[0]
+        elif len(out1) > 1 or out:
+            return {"name": path, "size": 0, "type": "directory"}
+        else:
+            raise FileNotFoundError(path)
+
+    def checksum(self, path):
+        """Unique value for current version of file
+
+        If the checksum is the same from one moment to another, the contents
+        are guaranteed to be the same. If the checksum changes, the contents
+        *might* have changed.
+
+        This should normally be overridden; default will probably capture
+        creation/modification timestamp (which would be good) or maybe
+        access timestamp (which would be bad)
+        """
+        return int(tokenize(self.info(path)), 16)
+
+    def size(self, path):
+        """Size in bytes of file"""
+        return self.info(path).get("size", None)
+
+    def sizes(self, paths):
+        """Size in bytes of each file in a list of paths"""
+        return [self.size(p) for p in paths]
+
+    def isdir(self, path):
+        """Is this entry directory-like?"""
+        try:
+            return self.info(path)["type"] == "directory"
+        except OSError:
+            return False
+
+    def isfile(self, path):
+        """Is this entry file-like?"""
+        try:
+            return self.info(path)["type"] == "file"
+        except:  # noqa: E722
+            return False
+
+    def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
+        """Get the contents of the file as a string.
+
+        Parameters
+        ----------
+        path: str
+            URL of file on this filesystem
+        encoding, errors, newline: same as `open`.
+        """
+        with self.open(
+            path,
+            mode="r",
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            **kwargs,
+        ) as f:
+            return f.read()
+
+    def write_text(
+        self, path, value, encoding=None, errors=None, newline=None, **kwargs
+    ):
+        """Write the text to the given file.
+
+        An existing file will be overwritten.
+
+        Parameters
+        ----------
+        path: str
+            URL of file on this filesystem
+        value: str
+            Text to write.
+        encoding, errors, newline: same as `open`.
+        """
+        with self.open(
+            path,
+            mode="w",
+            encoding=encoding,
+            errors=errors,
+            newline=newline,
+            **kwargs,
+        ) as f:
+            return f.write(value)
+
+    def cat_file(self, path, start=None, end=None, **kwargs):
+        """Get the content of a file
+
+        Parameters
+        ----------
+        path: URL of file on this filesystem
+        start, end: int
+            Bytes limits of the read. If negative, backwards from end,
+            like usual python slices. Either can be None for start or
+            end of file, respectively
+        kwargs: passed to ``open()``.
+        """
+        # explicitly set buffering off?
+        with self.open(path, "rb", **kwargs) as f:
+            if start is not None:
+                if start >= 0:
+                    f.seek(start)
+                else:
+                    f.seek(max(0, f.size + start))
+            if end is not None:
+                if end < 0:
+                    end = f.size + end
+                return f.read(end - f.tell())
+            return f.read()
+
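+    # Editor's sketch: start/end follow Python slice conventions. For a file
+    # "/nums" (hypothetical) containing b"0123456789":
+    #
+    #     fs.cat_file("/nums", start=2, end=5)   # b"234"
+    #     fs.cat_file("/nums", start=-3)         # b"789"
+    #     fs.cat_file("/nums", end=-5)           # b"01234"
+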
+    def pipe_file(self, path, value, mode="overwrite", **kwargs):
+        """Set the bytes of given file"""
+        if mode == "create" and self.exists(path):
+            # non-atomic but simple way; or could use "xb" in open(), which is likely
+            # not as well supported
+            raise FileExistsError
+        with self.open(path, "wb", **kwargs) as f:
+            f.write(value)
+
+    def pipe(self, path, value=None, **kwargs):
+        """Put value into path
+
+        (counterpart to ``cat``)
+
+        Parameters
+        ----------
+        path: string or dict(str, bytes)
+            If a string, a single remote location to put ``value`` bytes; if a dict,
+            a mapping of {path: bytesvalue}.
+        value: bytes, optional
+            If using a single path, these are the bytes to put there. Ignored if
+            ``path`` is a dict
+        """
+        if isinstance(path, str):
+            self.pipe_file(self._strip_protocol(path), value, **kwargs)
+        elif isinstance(path, dict):
+            for k, v in path.items():
+                self.pipe_file(self._strip_protocol(k), v, **kwargs)
+        else:
+            raise ValueError("path must be str or dict")
+
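+    # Editor's sketch: pipe is the write-side counterpart of cat.
+    #
+    #     fs.pipe({"/a": b"one", "/b": b"two"})  # write several files at once
+    #     fs.cat(["/a", "/b"])                   # {'/a': b'one', '/b': b'two'}
+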
+    def cat_ranges(
+        self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
+    ):
+        """Get the contents of byte ranges from one or more files
+
+        Parameters
+        ----------
+        paths: list
+            A list of filepaths on this filesystem
+        starts, ends: int or list
+            Bytes limits of the read. If using a single int, the same value will be
+            used to read all the specified files.
+        """
+        if max_gap is not None:
+            raise NotImplementedError
+        if not isinstance(paths, list):
+            raise TypeError
+        if not isinstance(starts, list):
+            starts = [starts] * len(paths)
+        if not isinstance(ends, list):
+            ends = [ends] * len(paths)
+        if len(starts) != len(paths) or len(ends) != len(paths):
+            raise ValueError
+        out = []
+        for p, s, e in zip(paths, starts, ends):
+            try:
+                out.append(self.cat_file(p, s, e))
+            except Exception as e:
+                if on_error == "return":
+                    out.append(e)
+                else:
+                    raise
+        return out
+
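+    # Editor's sketch (with "/a" holding b"one", as piped above):
+    #
+    #     fs.cat_ranges(["/a", "/a"], [0, 1], [2, 3])   # [b'on', b'ne']
+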
+    def cat(self, path, recursive=False, on_error="raise", **kwargs):
+        """Fetch (potentially multiple) paths' contents
+
+        Parameters
+        ----------
+        recursive: bool
+            If True, assume the path(s) are directories, and get all the
+            contained files
+        on_error : "raise", "omit", "return"
+            If raise, an underlying exception will be raised (converted to KeyError
+            if the type is in self.missing_exceptions); if omit, keys with exception
+            will simply not be included in the output; if "return", all keys are
+            included in the output, but the value will be bytes or an exception
+            instance.
+        kwargs: passed to cat_file
+
+        Returns
+        -------
+        dict of {path: contents} if there are multiple paths
+        or the path has been otherwise expanded
+        """
+        paths = self.expand_path(path, recursive=recursive)
+        if (
+            len(paths) > 1
+            or isinstance(path, list)
+            or paths[0] != self._strip_protocol(path)
+        ):
+            out = {}
+            for path in paths:
+                try:
+                    out[path] = self.cat_file(path, **kwargs)
+                except Exception as e:
+                    if on_error == "raise":
+                        raise
+                    if on_error == "return":
+                        out[path] = e
+            return out
+        else:
+            return self.cat_file(paths[0], **kwargs)
+
+    def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
+        """Copy single remote file to local"""
+        from .implementations.local import LocalFileSystem
+
+        if isfilelike(lpath):
+            outfile = lpath
+        elif self.isdir(rpath):
+            os.makedirs(lpath, exist_ok=True)
+            return None
+
+        fs = LocalFileSystem(auto_mkdir=True)
+        fs.makedirs(fs._parent(lpath), exist_ok=True)
+
+        with self.open(rpath, "rb", **kwargs) as f1:
+            if outfile is None:
+                outfile = open(lpath, "wb")
+
+            try:
+                callback.set_size(getattr(f1, "size", None))
+                data = True
+                while data:
+                    data = f1.read(self.blocksize)
+                    segment_len = outfile.write(data)
+                    if segment_len is None:
+                        segment_len = len(data)
+                    callback.relative_update(segment_len)
+            finally:
+                if not isfilelike(lpath):
+                    outfile.close()
+
+    def get(
+        self,
+        rpath,
+        lpath,
+        recursive=False,
+        callback=DEFAULT_CALLBACK,
+        maxdepth=None,
+        **kwargs,
+    ):
+        """Copy file(s) to local.
+
+        Copies a specific file or tree of files (if recursive=True). If lpath
+        ends with a "/", it will be assumed to be a directory, and target files
+        will go within. Can submit a list of paths, which may be glob-patterns
+        and will be expanded.
+
+        Calls get_file for each source.
+        """
+        if isinstance(lpath, list) and isinstance(rpath, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            rpaths = rpath
+            lpaths = lpath
+        else:
+            from .implementations.local import (
+                LocalFileSystem,
+                make_path_posix,
+                trailing_sep,
+            )
+
+            source_is_str = isinstance(rpath, str)
+            rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
+                if not rpaths:
+                    return
+
+            if isinstance(lpath, str):
+                lpath = make_path_posix(lpath)
+
+            source_is_file = len(rpaths) == 1
+            dest_is_dir = isinstance(lpath, str) and (
+                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
+            )
+
+            exists = source_is_str and (
+                (has_magic(rpath) and source_is_file)
+                or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
+            )
+            lpaths = other_paths(
+                rpaths,
+                lpath,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        callback.set_size(len(lpaths))
+        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
+            with callback.branched(rpath, lpath) as child:
+                self.get_file(rpath, lpath, callback=child, **kwargs)
+
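+    # Editor's sketch: a trailing "/" marks the local target as a directory.
+    #
+    #     fs.get("/data/*.csv", "dl/")               # files land inside dl/
+    #     fs.get("/data", "dl_copy", recursive=True)
+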
+    def put_file(
+        self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
+    ):
+        """Copy single file to remote"""
+        if mode == "create" and self.exists(rpath):
+            raise FileExistsError
+        if os.path.isdir(lpath):
+            self.makedirs(rpath, exist_ok=True)
+            return None
+
+        with open(lpath, "rb") as f1:
+            size = f1.seek(0, 2)
+            callback.set_size(size)
+            f1.seek(0)
+
+            self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
+            with self.open(rpath, "wb", **kwargs) as f2:
+                while f1.tell() < size:
+                    data = f1.read(self.blocksize)
+                    segment_len = f2.write(data)
+                    if segment_len is None:
+                        segment_len = len(data)
+                    callback.relative_update(segment_len)
+
+    def put(
+        self,
+        lpath,
+        rpath,
+        recursive=False,
+        callback=DEFAULT_CALLBACK,
+        maxdepth=None,
+        **kwargs,
+    ):
+        """Copy file(s) from local.
+
+        Copies a specific file or tree of files (if recursive=True). If rpath
+        ends with a "/", it will be assumed to be a directory, and target files
+        will go within.
+
+        Calls put_file for each source.
+        """
+        if isinstance(lpath, list) and isinstance(rpath, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            rpaths = rpath
+            lpaths = lpath
+        else:
+            from .implementations.local import (
+                LocalFileSystem,
+                make_path_posix,
+                trailing_sep,
+            )
+
+            source_is_str = isinstance(lpath, str)
+            if source_is_str:
+                lpath = make_path_posix(lpath)
+            fs = LocalFileSystem()
+            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
+                if not lpaths:
+                    return
+
+            source_is_file = len(lpaths) == 1
+            dest_is_dir = isinstance(rpath, str) and (
+                trailing_sep(rpath) or self.isdir(rpath)
+            )
+
+            rpath = (
+                self._strip_protocol(rpath)
+                if isinstance(rpath, str)
+                else [self._strip_protocol(p) for p in rpath]
+            )
+            exists = source_is_str and (
+                (has_magic(lpath) and source_is_file)
+                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
+            )
+            rpaths = other_paths(
+                lpaths,
+                rpath,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        callback.set_size(len(rpaths))
+        for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
+            with callback.branched(lpath, rpath) as child:
+                self.put_file(lpath, rpath, callback=child, **kwargs)
+
+    def head(self, path, size=1024):
+        """Get the first ``size`` bytes from file"""
+        with self.open(path, "rb") as f:
+            return f.read(size)
+
+    def tail(self, path, size=1024):
+        """Get the last ``size`` bytes from file"""
+        with self.open(path, "rb") as f:
+            f.seek(max(-size, -f.size), 2)
+            return f.read()
+
+    def cp_file(self, path1, path2, **kwargs):
+        raise NotImplementedError
+
+    def copy(
+        self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
+    ):
+        """Copy within two locations in the filesystem
+
+        on_error : "raise", "ignore"
+            If "raise", any not-found exceptions will be raised; if "ignore",
+            any not-found exceptions will cause the path to be skipped; defaults
+            to "raise" unless recursive is true, in which case the default is
+            "ignore"
+        """
+        if on_error is None and recursive:
+            on_error = "ignore"
+        elif on_error is None:
+            on_error = "raise"
+
+        if isinstance(path1, list) and isinstance(path2, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            paths1 = path1
+            paths2 = path2
+        else:
+            from .implementations.local import trailing_sep
+
+            source_is_str = isinstance(path1, str)
+            paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
+                if not paths1:
+                    return
+
+            source_is_file = len(paths1) == 1
+            dest_is_dir = isinstance(path2, str) and (
+                trailing_sep(path2) or self.isdir(path2)
+            )
+
+            exists = source_is_str and (
+                (has_magic(path1) and source_is_file)
+                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
+            )
+            paths2 = other_paths(
+                paths1,
+                path2,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        for p1, p2 in zip(paths1, paths2):
+            try:
+                self.cp_file(p1, p2, **kwargs)
+            except FileNotFoundError:
+                if on_error == "raise":
+                    raise
+
+    def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
+        """Turn one or more globs or directories into a list of all matching paths
+        to files or directories.
+
+        kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
+        """
+
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        if isinstance(path, (str, os.PathLike)):
+            out = self.expand_path([path], recursive, maxdepth)
+        else:
+            out = set()
+            path = [self._strip_protocol(p) for p in path]
+            for p in path:
+                if has_magic(p):
+                    bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
+                    out |= bit
+                    if recursive:
+                        # glob call above expanded one depth so if maxdepth is defined
+                        # then decrement it in expand_path call below. If it is zero
+                        # after decrementing then avoid expand_path call.
+                        if maxdepth is not None and maxdepth <= 1:
+                            continue
+                        out |= set(
+                            self.expand_path(
+                                list(bit),
+                                recursive=recursive,
+                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
+                                **kwargs,
+                            )
+                        )
+                    continue
+                elif recursive:
+                    rec = set(
+                        self.find(
+                            p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
+                        )
+                    )
+                    out |= rec
+                if p not in out and (recursive is False or self.exists(p)):
+                    # should only check once, for the root
+                    out.add(p)
+        if not out:
+            raise FileNotFoundError(path)
+        return sorted(out)
+
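+    # Editor's sketch (same assumed layout as the walk example):
+    #
+    #     fs.expand_path("/data", recursive=True)
+    #     # ['/data', '/data/a.csv', '/data/sub', '/data/sub/b.csv']
+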
+    def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
+        """Move file(s) from one location to another"""
+        if path1 == path2:
+            logger.debug("%s mv: The paths are the same, so no files were moved.", self)
+        else:
+            # explicitly raise exception to prevent data corruption
+            self.copy(
+                path1, path2, recursive=recursive, maxdepth=maxdepth, on_error="raise"
+            )
+            self.rm(path1, recursive=recursive)
+
+    def rm_file(self, path):
+        """Delete a file"""
+        self._rm(path)
+
+    def _rm(self, path):
+        """Delete one file"""
+        # this is the old name for the method, prefer rm_file
+        raise NotImplementedError
+
+    def rm(self, path, recursive=False, maxdepth=None):
+        """Delete files.
+
+        Parameters
+        ----------
+        path: str or list of str
+            File(s) to delete.
+        recursive: bool
+            If file(s) are directories, recursively delete contents and then
+            also remove the directory
+        maxdepth: int or None
+            Depth to pass to walk for finding files to delete, if recursive.
+            If None, there will be no limit and infinite recursion may be
+            possible.
+        """
+        path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
+        for p in reversed(path):
+            self.rm_file(p)
+
+    @classmethod
+    def _parent(cls, path):
+        path = cls._strip_protocol(path)
+        if "/" in path:
+            parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
+            return cls.root_marker + parent
+        else:
+            return cls.root_marker
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
+    ):
+        """Return raw bytes-mode file-like from the file-system"""
+        return AbstractBufferedFile(
+            self,
+            path,
+            mode,
+            block_size,
+            autocommit,
+            cache_options=cache_options,
+            **kwargs,
+        )
+
+    def open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        cache_options=None,
+        compression=None,
+        **kwargs,
+    ):
+        """
+        Return a file-like object from the filesystem
+
+        The resultant instance must function correctly in a context ``with``
+        block.
+
+        Parameters
+        ----------
+        path: str
+            Target file
+        mode: str like 'rb', 'w'
+            See builtin ``open()``
+            Mode "x" (exclusive write) may be implemented by the backend. Even if
+            it is, whether it is checked up front or on commit, and whether it is
+            atomic is implementation-dependent.
+        block_size: int
+            Some indication of buffering - this is a value in bytes
+        cache_options : dict, optional
+            Extra arguments to pass through to the cache.
+        compression: string or None
+            If given, open file using compression codec. Can either be a compression
+            name (a key in ``fsspec.compression.compr``) or "infer" to guess the
+            compression from the filename suffix.
+        encoding, errors, newline: passed on to TextIOWrapper for text mode
+        """
+        path = self._strip_protocol(path)
+        if "b" not in mode:
+            mode = mode.replace("t", "") + "b"
+
+            text_kwargs = {
+                k: kwargs.pop(k)
+                for k in ["encoding", "errors", "newline"]
+                if k in kwargs
+            }
+            return io.TextIOWrapper(
+                self.open(
+                    path,
+                    mode,
+                    block_size=block_size,
+                    cache_options=cache_options,
+                    compression=compression,
+                    **kwargs,
+                ),
+                **text_kwargs,
+            )
+        else:
+            ac = kwargs.pop("autocommit", not self._intrans)
+            f = self._open(
+                path,
+                mode=mode,
+                block_size=block_size,
+                autocommit=ac,
+                cache_options=cache_options,
+                **kwargs,
+            )
+            if compression is not None:
+                from fsspec.compression import compr
+                from fsspec.core import get_compression
+
+                compression = get_compression(path, compression)
+                compress = compr[compression]
+                f = compress(f, mode=mode[0])
+
+            if not ac and "r" not in mode:
+                self.transaction.files.append(f)
+            return f
+
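+    # Editor's sketch: text mode is a TextIOWrapper over the binary file, so
+    # these are equivalent ways to write text:
+    #
+    #     with fs.open("/a.txt", "w", encoding="utf-8") as f:
+    #         f.write("hello")
+    #     # ~ io.TextIOWrapper(fs.open("/a.txt", "wb"), encoding="utf-8")
+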
+    def touch(self, path, truncate=True, **kwargs):
+        """Create empty file, or update timestamp
+
+        Parameters
+        ----------
+        path: str
+            file location
+        truncate: bool
+            If True, always set file size to 0; if False, update timestamp and
+            leave file unchanged, if backend allows this
+        """
+        if truncate or not self.exists(path):
+            with self.open(path, "wb", **kwargs):
+                pass
+        else:
+            raise NotImplementedError  # update timestamp, if possible
+
+    def ukey(self, path):
+        """Hash of file properties, to tell if it has changed"""
+        return sha256(str(self.info(path)).encode()).hexdigest()
+
+    def read_block(self, fn, offset, length, delimiter=None):
+        """Read a block of bytes from
+
+        Starting at ``offset`` of the file, read ``length`` bytes.  If
+        ``delimiter`` is set then we ensure that the read starts and stops at
+        delimiter boundaries that follow the locations ``offset`` and ``offset
+        + length``.  If ``offset`` is zero then we start at zero.  The
+        bytestring returned WILL include the end delimiter string.
+
+        If offset+length is beyond the eof, reads to eof.
+
+        Parameters
+        ----------
+        fn: string
+            Path to filename
+        offset: int
+            Byte offset to start read
+        length: int
+            Number of bytes to read. If None, read to end.
+        delimiter: bytes (optional)
+            Ensure reading starts and stops at delimiter bytestring
+
+        Examples
+        --------
+        >>> fs.read_block('data/file.csv', 0, 13)  # doctest: +SKIP
+        b'Alice, 100\\nBo'
+        >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n')  # doctest: +SKIP
+        b'Alice, 100\\nBob, 200\\n'
+
+        Use ``length=None`` to read to the end of the file.
+
+        >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n')  # doctest: +SKIP
+        b'Alice, 100\\nBob, 200\\nCharlie, 300'
+
+        See Also
+        --------
+        :func:`fsspec.utils.read_block`
+        """
+        with self.open(fn, "rb") as f:
+            size = f.size
+            if length is None:
+                length = size
+            if size is not None and offset + length > size:
+                length = size - offset
+            return read_block(f, offset, length, delimiter)
+
+    def to_json(self, *, include_password: bool = True) -> str:
+        """
+        JSON representation of this filesystem instance.
+
+        Parameters
+        ----------
+        include_password: bool, default True
+            Whether to include the password (if any) in the output.
+
+        Returns
+        -------
+        JSON string with keys ``cls`` (the python location of this class),
+        ``protocol`` (text name of this class's protocol, first one in case of
+        multiple), ``args`` (positional args, usually empty), and all other
+        keyword arguments as their own keys.
+
+        Warnings
+        --------
+        Serialized filesystems may contain sensitive information which has been
+        passed to the constructor, such as passwords and tokens. Make sure you
+        store and send them in a secure environment!
+        """
+        from .json import FilesystemJSONEncoder
+
+        return json.dumps(
+            self,
+            cls=type(
+                "_FilesystemJSONEncoder",
+                (FilesystemJSONEncoder,),
+                {"include_password": include_password},
+            ),
+        )
+
+    @staticmethod
+    def from_json(blob: str) -> AbstractFileSystem:
+        """
+        Recreate a filesystem instance from JSON representation.
+
+        See ``.to_json()`` for the expected structure of the input.
+
+        Parameters
+        ----------
+        blob: str
+
+        Returns
+        -------
+        file system instance, not necessarily of this particular class.
+
+        Warnings
+        --------
+        This can import arbitrary modules (as determined by the ``cls`` key).
+        Make sure you haven't installed any modules that may execute malicious code
+        at import time.
+        """
+        from .json import FilesystemJSONDecoder
+
+        return json.loads(blob, cls=FilesystemJSONDecoder)
+
+    def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
+        """
+        JSON-serializable dictionary representation of this filesystem instance.
+
+        Parameters
+        ----------
+        include_password: bool, default True
+            Whether to include the password (if any) in the output.
+
+        Returns
+        -------
+        Dictionary with keys ``cls`` (the python location of this class),
+        ``protocol`` (text name of this class's protocol, first one in case of
+        multiple), ``args`` (positional args, usually empty), and all other
+        keyword arguments as their own keys.
+
+        Warnings
+        --------
+        Serialized filesystems may contain sensitive information which has been
+        passed to the constructor, such as passwords and tokens. Make sure you
+        store and send them in a secure environment!
+        """
+        from .json import FilesystemJSONEncoder
+
+        json_encoder = FilesystemJSONEncoder()
+
+        cls = type(self)
+        proto = self.protocol
+
+        storage_options = dict(self.storage_options)
+        if not include_password:
+            storage_options.pop("password", None)
+
+        return dict(
+            cls=f"{cls.__module__}:{cls.__name__}",
+            protocol=proto[0] if isinstance(proto, (tuple, list)) else proto,
+            args=json_encoder.make_serializable(self.storage_args),
+            **json_encoder.make_serializable(storage_options),
+        )
+
+    @staticmethod
+    def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
+        """
+        Recreate a filesystem instance from dictionary representation.
+
+        See ``.to_dict()`` for the expected structure of the input.
+
+        Parameters
+        ----------
+        dct: Dict[str, Any]
+
+        Returns
+        -------
+        file system instance, not necessarily of this particular class.
+
+        Warnings
+        --------
+        This can import arbitrary modules (as determined by the ``cls`` key).
+        Make sure you haven't installed any modules that may execute malicious code
+        at import time.
+        """
+        from .json import FilesystemJSONDecoder
+
+        json_decoder = FilesystemJSONDecoder()
+
+        dct = dict(dct)  # Defensive copy
+
+        cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct)
+        if cls is None:
+            raise ValueError("Not a serialized AbstractFileSystem")
+
+        dct.pop("cls", None)
+        dct.pop("protocol", None)
+
+        return cls(
+            *json_decoder.unmake_serializable(dct.pop("args", ())),
+            **json_decoder.unmake_serializable(dct),
+        )
+
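+    # Editor's sketch: to_dict/from_dict round-trip an instance; identical
+    # options reproduce the same token for cachable classes.
+    #
+    #     fs2 = AbstractFileSystem.from_dict(fs.to_dict())
+    #     assert fs2 == fs   # equal token; often the very same cached object
+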
+    def _get_pyarrow_filesystem(self):
+        """
+        Make a version of the FS instance which will be acceptable to pyarrow
+        """
+        # all instances already also derive from pyarrow
+        return self
+
+    def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
+        """Create key/value store based on this file-system
+
+        Makes a MutableMapping interface to the FS at the given root path.
+        See ``fsspec.mapping.FSMap`` for further details.
+        """
+        from .mapping import FSMap
+
+        return FSMap(
+            root,
+            self,
+            check=check,
+            create=create,
+            missing_exceptions=missing_exceptions,
+        )
+
+    @classmethod
+    def clear_instance_cache(cls):
+        """
+        Clear the cache of filesystem instances.
+
+        Notes
+        -----
+        Unless overridden by setting the ``cachable`` class attribute to False,
+        the filesystem class stores a reference to newly created instances. This
+        prevents Python's normal rules around garbage collection from working,
+        since the instance's refcount will not drop to zero until
+        ``clear_instance_cache`` is called.
+        """
+        cls._cache.clear()
+
+    def created(self, path):
+        """Return the created timestamp of a file as a datetime.datetime"""
+        raise NotImplementedError
+
+    def modified(self, path):
+        """Return the modified timestamp of a file as a datetime.datetime"""
+        raise NotImplementedError
+
+    def tree(
+        self,
+        path: str = "/",
+        recursion_limit: int = 2,
+        max_display: int = 25,
+        display_size: bool = False,
+        prefix: str = "",
+        is_last: bool = True,
+        first: bool = True,
+        indent_size: int = 4,
+    ) -> str:
+        """
+        Return a tree-like structure of the filesystem starting from the given path as a string.
+
+        Parameters
+        ----------
+        path: str
+            Root path to start traversal from
+        recursion_limit: int
+            Maximum depth of directory traversal
+        max_display: int
+            Maximum number of items to display per directory
+        display_size: bool
+            Whether to display file sizes
+        prefix: str
+            Current line prefix for the visual tree structure
+        is_last: bool
+            Whether the current item is the last in its level
+        first: bool
+            Whether this is the first call (displays the root path)
+        indent_size: int
+            Number of spaces per indentation level
+
+        Returns
+        -------
+        str
+            A string representing the tree structure.
+
+        Examples
+        --------
+        >>> from fsspec import filesystem
+
+        >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
+        >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
+        >>> print(tree)
+        """
+
+        def format_bytes(n: int) -> str:
+            """Format bytes as text."""
+            for unit, k in (
+                ("P", 2**50),
+                ("T", 2**40),
+                ("G", 2**30),
+                ("M", 2**20),
+                ("k", 2**10),
+            ):
+                if n >= 0.9 * k:
+                    return f"{n / k:.2f} {prefix}b"
+            return f"{n}B"
+
+        result = []
+
+        if first:
+            result.append(path)
+
+        if recursion_limit:
+            indent = " " * indent_size
+            contents = self.ls(path, detail=True)
+            contents.sort(
+                key=lambda x: (x.get("type") != "directory", x.get("name", ""))
+            )
+
+            if max_display is not None and len(contents) > max_display:
+                displayed_contents = contents[:max_display]
+                remaining_count = len(contents) - max_display
+            else:
+                displayed_contents = contents
+                remaining_count = 0
+
+            for i, item in enumerate(displayed_contents):
+                is_last_item = (i == len(displayed_contents) - 1) and (
+                    remaining_count == 0
+                )
+
+                branch = (
+                    "└" + ("─" * (indent_size - 2))
+                    if is_last_item
+                    else "├" + ("─" * (indent_size - 2))
+                )
+                branch += " "
+                new_prefix = prefix + (
+                    indent if is_last_item else "│" + " " * (indent_size - 1)
+                )
+
+                name = os.path.basename(item.get("name", ""))
+
+                if display_size and item.get("type") == "directory":
+                    sub_contents = self.ls(item.get("name", ""), detail=True)
+                    num_files = sum(
+                        1 for sub_item in sub_contents if sub_item.get("type") == "file"
+                    )
+                    num_folders = sum(
+                        1
+                        for sub_item in sub_contents
+                        if sub_item.get("type") == "directory"
+                    )
+
+                    if num_files == 0 and num_folders == 0:
+                        size = " (empty folder)"
+                    elif num_files == 0:
+                        size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
+                    elif num_folders == 0:
+                        size = f" ({num_files} file{'s' if num_files > 1 else ''})"
+                    else:
+                        size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
+                elif display_size and item.get("type") == "file":
+                    size = f" ({format_bytes(item.get('size', 0))})"
+                else:
+                    size = ""
+
+                result.append(f"{prefix}{branch}{name}{size}")
+
+                if item.get("type") == "directory" and recursion_limit > 0:
+                    result.append(
+                        self.tree(
+                            path=item.get("name", ""),
+                            recursion_limit=recursion_limit - 1,
+                            max_display=max_display,
+                            display_size=display_size,
+                            prefix=new_prefix,
+                            is_last=is_last_item,
+                            first=False,
+                            indent_size=indent_size,
+                        )
+                    )
+
+            if remaining_count > 0:
+                more_message = f"{remaining_count} more item(s) not displayed."
+                result.append(
+                    f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
+                )
+
+        return "\n".join(_ for _ in result if _)
+
+    # ------------------------------------------------------------------------
+    # Aliases
+
+    def read_bytes(self, path, start=None, end=None, **kwargs):
+        """Alias of `AbstractFileSystem.cat_file`."""
+        return self.cat_file(path, start=start, end=end, **kwargs)
+
+    def write_bytes(self, path, value, **kwargs):
+        """Alias of `AbstractFileSystem.pipe_file`."""
+        self.pipe_file(path, value, **kwargs)
+
+    def makedir(self, path, create_parents=True, **kwargs):
+        """Alias of `AbstractFileSystem.mkdir`."""
+        return self.mkdir(path, create_parents=create_parents, **kwargs)
+
+    def mkdirs(self, path, exist_ok=False):
+        """Alias of `AbstractFileSystem.makedirs`."""
+        return self.makedirs(path, exist_ok=exist_ok)
+
+    def listdir(self, path, detail=True, **kwargs):
+        """Alias of `AbstractFileSystem.ls`."""
+        return self.ls(path, detail=detail, **kwargs)
+
+    def cp(self, path1, path2, **kwargs):
+        """Alias of `AbstractFileSystem.copy`."""
+        return self.copy(path1, path2, **kwargs)
+
+    def move(self, path1, path2, **kwargs):
+        """Alias of `AbstractFileSystem.mv`."""
+        return self.mv(path1, path2, **kwargs)
+
+    def stat(self, path, **kwargs):
+        """Alias of `AbstractFileSystem.info`."""
+        return self.info(path, **kwargs)
+
+    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
+        """Alias of `AbstractFileSystem.du`."""
+        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)
+
+    def rename(self, path1, path2, **kwargs):
+        """Alias of `AbstractFileSystem.mv`."""
+        return self.mv(path1, path2, **kwargs)
+
+    def delete(self, path, recursive=False, maxdepth=None):
+        """Alias of `AbstractFileSystem.rm`."""
+        return self.rm(path, recursive=recursive, maxdepth=maxdepth)
+
+    def upload(self, lpath, rpath, recursive=False, **kwargs):
+        """Alias of `AbstractFileSystem.put`."""
+        return self.put(lpath, rpath, recursive=recursive, **kwargs)
+
+    def download(self, rpath, lpath, recursive=False, **kwargs):
+        """Alias of `AbstractFileSystem.get`."""
+        return self.get(rpath, lpath, recursive=recursive, **kwargs)
+
+    def sign(self, path, expiration=100, **kwargs):
+        """Create a signed URL representing the given path
+
+        Some implementations allow temporary URLs to be generated, as a
+        way of delegating credentials.
+
+        Parameters
+        ----------
+        path : str
+             The path on the filesystem
+        expiration : int
+            Number of seconds to enable the URL for (if supported)
+
+        Returns
+        -------
+        URL : str
+            The signed URL
+
+        Raises
+        ------
+        NotImplementedError : if method is not implemented for a filesystem
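+
+        Examples
+        --------
+        A sketch for a backend that supports signing (the path and expiry
+        are illustrative):
+
+        >>> url = fs.sign("bucket/key.csv", expiration=3600)  # doctest: +SKIP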
+        """
+        raise NotImplementedError("Sign is not implemented for this filesystem")
+
+    def _isfilestore(self):
+        # Originally inherited from pyarrow DaskFileSystem. Keeping this
+        # here for backwards compatibility as long as pyarrow uses its
+        # legacy fsspec-compatible filesystems and thus accepts fsspec
+        # filesystems as well
+        return False
+
+
+class AbstractBufferedFile(io.IOBase):
+    """Convenient class to derive from to provide buffering
+
+    In the case that the backend does not provide a pythonic file-like object
+    already, this class contains much of the logic to build one. The only
+    methods that need to be overridden are ``_upload_chunk``,
+    ``_initiate_upload`` and ``_fetch_range``.
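+
+    A minimal subclass sketch (the backend calls are placeholders, not a
+    real implementation)::
+
+        class MyBufferedFile(AbstractBufferedFile):
+            def _initiate_upload(self):
+                # e.g. start a multipart upload on the backend
+                ...
+
+            def _upload_chunk(self, final=False):
+                # push self.buffer.getvalue() to the backend; ``final``
+                # marks the last chunk of the upload
+                ...
+
+            def _fetch_range(self, start, end):
+                # return the bytes in [start, end) from the backend
+                return b""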
+    """
+
+    DEFAULT_BLOCK_SIZE = 5 * 2**20
+    _details = None
+
+    def __init__(
+        self,
+        fs,
+        path,
+        mode="rb",
+        block_size="default",
+        autocommit=True,
+        cache_type="readahead",
+        cache_options=None,
+        size=None,
+        **kwargs,
+    ):
+        """
+        Template for files with buffered reading and writing
+
+        Parameters
+        ----------
+        fs: instance of FileSystem
+        path: str
+            location in file-system
+        mode: str
+            Normal file modes. Currently only 'rb', 'wb', 'xb' or 'ab'. Some file
+            systems may be read-only, and some may not support append.
+        block_size: int
+            Buffer size for reading or writing, 'default' for class default
+        autocommit: bool
+            Whether to write to final destination; may only impact what
+            happens when file is being closed.
+        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
+            Caching policy in read mode. See the definitions in ``core``.
+        cache_options : dict
+            Additional options passed to the constructor for the cache specified
+            by `cache_type`.
+        size: int
+            If given and in read mode, suppresses having to look up the file size
+        kwargs:
+            Gets stored as self.kwargs
+        """
+        from .core import caches
+
+        self.path = path
+        self.fs = fs
+        self.mode = mode
+        self.blocksize = (
+            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
+        )
+        self.loc = 0
+        self.autocommit = autocommit
+        self.end = None
+        self.start = None
+        self.closed = False
+
+        if cache_options is None:
+            cache_options = {}
+
+        if "trim" in kwargs:
+            warnings.warn(
+                "Passing 'trim' to control the cache behavior has been deprecated. "
+                "Specify it within the 'cache_options' argument instead.",
+                FutureWarning,
+            )
+            cache_options["trim"] = kwargs.pop("trim")
+
+        self.kwargs = kwargs
+
+        if mode not in {"ab", "rb", "wb", "xb"}:
+            raise NotImplementedError("File mode not supported")
+        if mode == "rb":
+            if size is not None:
+                self.size = size
+            else:
+                self.size = self.details["size"]
+            self.cache = caches[cache_type](
+                self.blocksize, self._fetch_range, self.size, **cache_options
+            )
+        else:
+            self.buffer = io.BytesIO()
+            self.offset = None
+            self.forced = False
+            self.location = None
+
+    @property
+    def details(self):
+        if self._details is None:
+            self._details = self.fs.info(self.path)
+        return self._details
+
+    @details.setter
+    def details(self, value):
+        self._details = value
+        self.size = value["size"]
+
+    @property
+    def full_name(self):
+        return _unstrip_protocol(self.path, self.fs)
+
+    @property
+    def closed(self):
+        # get around this attr being read-only in IOBase
+        # use getattr here, since this can be called during del
+        return getattr(self, "_closed", True)
+
+    @closed.setter
+    def closed(self, c):
+        self._closed = c
+
+    def __hash__(self):
+        if "w" in self.mode:
+            return id(self)
+        else:
+            return int(tokenize(self.details), 16)
+
+    def __eq__(self, other):
+        """Files are equal if they have the same checksum, only in read mode"""
+        if self is other:
+            return True
+        return (
+            isinstance(other, type(self))
+            and self.mode == "rb"
+            and other.mode == "rb"
+            and hash(self) == hash(other)
+        )
+
+    def commit(self):
+        """Move from temp to final destination"""
+
+    def discard(self):
+        """Throw away temporary file"""
+
+    def info(self):
+        """File information about this path"""
+        if self.readable():
+            return self.details
+        else:
+            raise ValueError("Info not available while writing")
+
+    def tell(self):
+        """Current file location"""
+        return self.loc
+
+    def seek(self, loc, whence=0):
+        """Set current file location
+
+        Parameters
+        ----------
+        loc: int
+            byte location
+        whence: {0, 1, 2}
+            from start of file, current location or end of file, resp.
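+
+        Examples
+        --------
+        A sketch, assuming ``f`` is a file opened in read mode:
+
+        >>> f.seek(10)      # 10 bytes from the start   # doctest: +SKIP
+        >>> f.seek(-5, 2)   # 5 bytes before the end    # doctest: +SKIP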
+        """
+        loc = int(loc)
+        if not self.mode == "rb":
+            raise OSError(ESPIPE, "Seek only available in read mode")
+        if whence == 0:
+            nloc = loc
+        elif whence == 1:
+            nloc = self.loc + loc
+        elif whence == 2:
+            nloc = self.size + loc
+        else:
+            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
+        if nloc < 0:
+            raise ValueError("Seek before start of file")
+        self.loc = nloc
+        return self.loc
+
+    def write(self, data):
+        """
+        Write data to buffer.
+
+        Buffer only sent on flush() or if buffer is greater than
+        or equal to blocksize.
+
+        Parameters
+        ----------
+        data: bytes
+            Set of bytes to be written.
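+
+        Examples
+        --------
+        A sketch; data is buffered and only sent to the backend once the
+        buffer reaches ``blocksize`` or the file is closed:
+
+        >>> with fs.open("bucket/out.bin", "wb") as f:  # doctest: +SKIP
+        ...     f.write(b"abc")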
+        """
+        if not self.writable():
+            raise ValueError("File not in write mode")
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if self.forced:
+            raise ValueError("This file has been force-flushed, can only close")
+        out = self.buffer.write(data)
+        self.loc += out
+        if self.buffer.tell() >= self.blocksize:
+            self.flush()
+        return out
+
+    def flush(self, force=False):
+        """
+        Write buffered data to backend store.
+
+        Writes the current buffer, if it is larger than the block-size, or if
+        the file is being closed.
+
+        Parameters
+        ----------
+        force: bool
+            When closing, write the last block even if it is smaller than
+            blocks are allowed to be. Disallows further writing to this file.
+        """
+
+        if self.closed:
+            raise ValueError("Flush on closed file")
+        if force and self.forced:
+            raise ValueError("Force flush cannot be called more than once")
+        if force:
+            self.forced = True
+
+        if self.readable():
+            # flushing in read mode is a no-op
+            return
+
+        if not force and self.buffer.tell() < self.blocksize:
+            # Defer write on small block
+            return
+
+        if self.offset is None:
+            # Initialize a multipart upload
+            self.offset = 0
+            try:
+                self._initiate_upload()
+            except BaseException:
+                self.closed = True
+                raise
+
+        if self._upload_chunk(final=force) is not False:
+            self.offset += self.buffer.seek(0, 2)
+            self.buffer = io.BytesIO()
+
+    def _upload_chunk(self, final=False):
+        """Write one part of a multi-block file upload
+
+        Parameters
+        ----------
+        final: bool
+            This is the last block, so should complete file, if
+            self.autocommit is True.
+        """
+        # may not yet have been initialized; may need to call _initiate_upload
+
+    def _initiate_upload(self):
+        """Create remote file/upload"""
+        pass
+
+    def _fetch_range(self, start, end):
+        """Get the specified set of bytes from remote"""
+        return self.fs.cat_file(self.path, start=start, end=end)
+
+    def read(self, length=-1):
+        """
+        Return data from cache, or fetch pieces as necessary
+
+        Parameters
+        ----------
+        length: int (-1)
+            Number of bytes to read; if <0, all remaining bytes.
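+
+        Examples
+        --------
+        A sketch, assuming ``f`` is a file opened in read mode:
+
+        >>> f.read(4)  # doctest: +SKIP
+        b'abcd'
+        >>> f.read()   # everything from here to the end  # doctest: +SKIP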
+        """
+        length = -1 if length is None else int(length)
+        if self.mode != "rb":
+            raise ValueError("File not in read mode")
+        if length < 0:
+            length = self.size - self.loc
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if length == 0:
+            # don't even bother calling fetch
+            return b""
+        out = self.cache._fetch(self.loc, self.loc + length)
+
+        logger.debug(
+            "%s read: %i - %i %s",
+            self,
+            self.loc,
+            self.loc + length,
+            self.cache._log_stats(),
+        )
+        self.loc += len(out)
+        return out
+
+    def readinto(self, b):
+        """mirrors builtin file's readinto method
+
+        https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
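+
+        A sketch, assuming ``f`` is a file opened in read mode:
+
+        >>> buf = bytearray(1024)   # doctest: +SKIP
+        >>> n = f.readinto(buf)     # doctest: +SKIP
+        >>> data = bytes(buf[:n])   # doctest: +SKIP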
+        """
+        out = memoryview(b).cast("B")
+        data = self.read(out.nbytes)
+        out[: len(data)] = data
+        return len(data)
+
+    def readuntil(self, char=b"\n", blocks=None):
+        """Return data between current position and first occurrence of char
+
+        char is included in the output, except if the end of the file is
+        encountered first.
+
+        Parameters
+        ----------
+        char: bytes
+            The byte sequence to find
+        blocks: None or int
+            How many bytes to read in each iteration. Defaults to the file
+            blocksize, which may mean a new fetch on every call.
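+
+        Examples
+        --------
+        A sketch, assuming ``f`` is a file opened in read mode whose
+        contents contain b";":
+
+        >>> f.readuntil(b";")  # doctest: +SKIP
+        b'first record;'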
+        """
+        out = []
+        while True:
+            start = self.tell()
+            part = self.read(blocks or self.blocksize)
+            if len(part) == 0:
+                break
+            found = part.find(char)
+            if found > -1:
+                out.append(part[: found + len(char)])
+                self.seek(start + found + len(char))
+                break
+            out.append(part)
+        return b"".join(out)
+
+    def readline(self):
+        """Read until first occurrence of newline character
+
+        Note that, because of character encoding, this is not necessarily a
+        true line ending.
+        """
+        return self.readuntil(b"\n")
+
+    def __next__(self):
+        out = self.readline()
+        if out:
+            return out
+        raise StopIteration
+
+    def __iter__(self):
+        return self
+
+    def readlines(self):
+        """Return all data, split by the newline character"""
+        data = self.read()
+        lines = data.split(b"\n")
+        out = [line + b"\n" for line in lines[:-1]]
+        if data.endswith(b"\n"):
+            return out
+        else:
+            return out + [lines[-1]]
+
+    def readinto1(self, b):
+        return self.readinto(b)
+
+    def close(self):
+        """Close file
+
+        Finalizes writes, discards cache
+        """
+        if getattr(self, "_unclosable", False):
+            return
+        if self.closed:
+            return
+        try:
+            if self.mode == "rb":
+                self.cache = None
+            else:
+                if not self.forced:
+                    self.flush(force=True)
+
+                if self.fs is not None:
+                    self.fs.invalidate_cache(self.path)
+                    self.fs.invalidate_cache(self.fs._parent(self.path))
+        finally:
+            self.closed = True
+
+    def readable(self):
+        """Whether opened for reading"""
+        return "r" in self.mode and not self.closed
+
+    def seekable(self):
+        """Whether is seekable (only in read mode)"""
+        return self.readable()
+
+    def writable(self):
+        """Whether opened for writing"""
+        return self.mode in {"wb", "ab", "xb"} and not self.closed
+
+    def __reduce__(self):
+        if self.mode != "rb":
+            raise RuntimeError("Pickling a writeable file is not supported")
+
+        return reopen, (
+            self.fs,
+            self.path,
+            self.mode,
+            self.blocksize,
+            self.loc,
+            self.size,
+            self.autocommit,
+            self.cache.name if self.cache else "none",
+            self.kwargs,
+        )
+
+    def __del__(self):
+        if not self.closed:
+            self.close()
+
+    def __str__(self):
+        return f"<File-like object {type(self.fs).__name__}, {self.path}>"
+
+    __repr__ = __str__
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        self.close()
+
+
+def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
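+    """Re-open a file at a stored location.
+
+    Used by ``AbstractBufferedFile.__reduce__`` to support pickling of
+    read-mode files.
+    """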
+    file = fs.open(
+        path,
+        mode=mode,
+        block_size=blocksize,
+        autocommit=autocommit,
+        cache_type=cache_type,
+        size=size,
+        **kwargs,
+    )
+    if loc > 0:
+        file.seek(loc)
+    return file