Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/cached.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/fsspec/implementations/cached.py  929
1 file changed, 929 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/cached.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/cached.py
new file mode 100644
index 00000000..8b8b8329
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/cached.py
@@ -0,0 +1,929 @@
+from __future__ import annotations
+
+import inspect
+import logging
+import os
+import tempfile
+import time
+import weakref
+from shutil import rmtree
+from typing import TYPE_CHECKING, Any, Callable, ClassVar
+
+from fsspec import AbstractFileSystem, filesystem
+from fsspec.callbacks import DEFAULT_CALLBACK
+from fsspec.compression import compr
+from fsspec.core import BaseCache, MMapCache
+from fsspec.exceptions import BlocksizeMismatchError
+from fsspec.implementations.cache_mapper import create_cache_mapper
+from fsspec.implementations.cache_metadata import CacheMetadata
+from fsspec.spec import AbstractBufferedFile
+from fsspec.transaction import Transaction
+from fsspec.utils import infer_compression
+
+if TYPE_CHECKING:
+ from fsspec.implementations.cache_mapper import AbstractCacheMapper
+
+logger = logging.getLogger("fsspec.cached")
+
+
+class WriteCachedTransaction(Transaction):
+ def complete(self, commit=True):
+ rpaths = [f.path for f in self.files]
+ lpaths = [f.fn for f in self.files]
+ if commit:
+ self.fs.put(lpaths, rpaths)
+ self.files.clear()
+ self.fs._intrans = False
+ self.fs._transaction = None
+ self.fs = None # break cycle
+
+
+class CachingFileSystem(AbstractFileSystem):
+ """Locally caching filesystem, layer over any other FS
+
+ This class implements chunk-wise local storage of remote files, for quick
+ access after the initial download. The files are stored in a given
+ directory with hashes of URLs for the filenames. If no directory is given,
+ a temporary one is used, which should be cleaned up by the OS after the
+ process ends. The files themselves are sparse (as implemented in
+ :class:`~fsspec.caching.MMapCache`), so only the data which is accessed
+ takes up space.
+
+ Restrictions:
+
+ - the block-size must be the same for each access of a given file, unless
+ all blocks of the file have already been read
+ - caching can only be applied to file-systems which produce files
+ derived from fsspec.spec.AbstractBufferedFile; LocalFileSystem is also
+ allowed, for testing
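+
+ Example (an illustrative sketch only; the local path and the cache
+ directory shown are hypothetical placeholders, and the "file" target is
+ the LocalFileSystem mentioned above as allowed for testing):
+
+ >>> import fsspec
+ >>> fs = fsspec.filesystem(
+ ...     "blockcache",
+ ...     target_protocol="file",
+ ...     cache_storage="/tmp/fsspec-blockcache",
+ ... )
+ >>> with fs.open("/path/to/some/local/file") as f:
+ ...     header = f.read(2**20)  # only the blocks actually read are cached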
+ """
+
+ protocol: ClassVar[str | tuple[str, ...]] = ("blockcache", "cached")
+
+ def __init__(
+ self,
+ target_protocol=None,
+ cache_storage="TMP",
+ cache_check=10,
+ check_files=False,
+ expiry_time=604800,
+ target_options=None,
+ fs=None,
+ same_names: bool | None = None,
+ compression=None,
+ cache_mapper: AbstractCacheMapper | None = None,
+ **kwargs,
+ ):
+ """
+
+ Parameters
+ ----------
+ target_protocol: str (optional)
+ Target filesystem protocol. Provide either this or ``fs``.
+ cache_storage: str or list(str)
+ Location to store files. If "TMP", this is a temporary directory,
+ and will be cleaned up by the OS when this process ends (or later).
+ If a list, each location will be tried in the order given, but
+ only the last will be considered writable.
+ cache_check: int
+ Number of seconds between reload of cache metadata
+ check_files: bool
+ Whether to explicitly check that the UID of the remote file matches
+ the stored one before using a cached copy. Warning: some file systems
+ such as HTTP cannot reliably give a unique hash of the contents of a
+ path, so be sure to set this option to False for such backends.
+ expiry_time: int
+ The time in seconds after which a local copy is considered useless.
+ Set to a falsy value to prevent expiry. The default is equivalent to one
+ week.
+ target_options: dict or None
+ Passed to the instantiation of the FS, if fs is None.
+ fs: filesystem instance
+ The target filesystem to run against. Provide this or ``target_protocol``.
+ same_names: bool (optional)
+ By default, target URLs are hashed using a ``HashCacheMapper`` so
+ that files from different backends with the same basename do not
+ conflict. If this argument is ``True``, a ``BasenameCacheMapper``
+ is used instead. Other cache mapper options are available by using
+ the ``cache_mapper`` keyword argument. Only one of this and
+ ``cache_mapper`` should be specified.
+ compression: str (optional)
+ To decompress on download. Can be 'infer' (guess from the URL name),
+ one of the entries in ``fsspec.compression.compr``, or None for no
+ decompression.
+ cache_mapper: AbstractCacheMapper (optional)
+ The object used to map from original filenames to cached filenames.
+ Only one of this and ``same_names`` should be specified.
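+
+ For example (an illustrative sketch; the storage paths are hypothetical
+ placeholders), two cache locations of which only the last is writable,
+ with cached files named by basename:
+
+ >>> import fsspec
+ >>> fs = fsspec.filesystem(
+ ...     "blockcache",
+ ...     target_protocol="file",
+ ...     cache_storage=["/mnt/shared-cache", "/tmp/local-cache"],
+ ...     same_names=True,  # use a BasenameCacheMapper instead of hashed URLs
+ ... )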
+ """
+ super().__init__(**kwargs)
+ if fs is None and target_protocol is None:
+ raise ValueError(
+ "Please provide filesystem instance(fs) or target_protocol"
+ )
+ if not (fs is None) ^ (target_protocol is None):
+ raise ValueError(
+ "Both filesystems (fs) and target_protocol may not be both given."
+ )
+ if cache_storage == "TMP":
+ tempdir = tempfile.mkdtemp()
+ storage = [tempdir]
+ weakref.finalize(self, self._remove_tempdir, tempdir)
+ else:
+ if isinstance(cache_storage, str):
+ storage = [cache_storage]
+ else:
+ storage = cache_storage
+ os.makedirs(storage[-1], exist_ok=True)
+ self.storage = storage
+ self.kwargs = target_options or {}
+ self.cache_check = cache_check
+ self.check_files = check_files
+ self.expiry = expiry_time
+ self.compression = compression
+
+ # Size of cache in bytes. If None then the size is unknown and will be
+ # recalculated the next time cache_size() is called. On writes to the
+ # cache this is reset to None.
+ self._cache_size = None
+
+ if same_names is not None and cache_mapper is not None:
+ raise ValueError(
+ "Cannot specify both same_names and cache_mapper in "
+ "CachingFileSystem.__init__"
+ )
+ if cache_mapper is not None:
+ self._mapper = cache_mapper
+ else:
+ self._mapper = create_cache_mapper(
+ same_names if same_names is not None else False
+ )
+
+ self.target_protocol = (
+ target_protocol
+ if isinstance(target_protocol, str)
+ else (fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0])
+ )
+ self._metadata = CacheMetadata(self.storage)
+ self.load_cache()
+ self.fs = fs if fs is not None else filesystem(target_protocol, **self.kwargs)
+
+ def _strip_protocol(path):
+ # acts as a method, since each instance has a different target
+ return self.fs._strip_protocol(type(self)._strip_protocol(path))
+
+ self._strip_protocol: Callable = _strip_protocol
+
+ @staticmethod
+ def _remove_tempdir(tempdir):
+ try:
+ rmtree(tempdir)
+ except Exception:
+ pass
+
+ def _mkcache(self):
+ os.makedirs(self.storage[-1], exist_ok=True)
+
+ def cache_size(self):
+ """Return size of cache in bytes.
+
+ If more than one cache directory is in use, only the size of the last
+ one (the writable cache directory) is returned.
+ """
+ if self._cache_size is None:
+ cache_dir = self.storage[-1]
+ self._cache_size = filesystem("file").du(cache_dir, withdirs=True)
+ return self._cache_size
+
+ def load_cache(self):
+ """Read set of stored blocks from file"""
+ self._metadata.load()
+ self._mkcache()
+ self.last_cache = time.time()
+
+ def save_cache(self):
+ """Save set of stored blocks from file"""
+ self._mkcache()
+ self._metadata.save()
+ self.last_cache = time.time()
+ self._cache_size = None
+
+ def _check_cache(self):
+ """Reload caches if time elapsed or any disappeared"""
+ self._mkcache()
+ if not self.cache_check:
+ # explicitly told not to bother checking
+ return
+ timecond = time.time() - self.last_cache > self.cache_check
+ existcond = all(os.path.exists(storage) for storage in self.storage)
+ if timecond or not existcond:
+ self.load_cache()
+
+ def _check_file(self, path):
+ """Is path in cache and still valid"""
+ path = self._strip_protocol(path)
+ self._check_cache()
+ return self._metadata.check_file(path, self)
+
+ def clear_cache(self):
+ """Remove all files and metadata from the cache
+
+ In the case of multiple cache locations, this clears only the last one,
+ which is assumed to be the read/write one.
+ """
+ rmtree(self.storage[-1])
+ self.load_cache()
+ self._cache_size = None
+
+ def clear_expired_cache(self, expiry_time=None):
+ """Remove all expired files and metadata from the cache
+
+ In the case of multiple cache locations, this clears only the last one,
+ which is assumed to be the read/write one.
+
+ Parameters
+ ----------
+ expiry_time: int
+ The time in seconds after which a local copy is considered useless.
+ If not given, the ``expiry_time`` value set at instantiation of the
+ caching filesystem is used.
+ """
+
+ if not expiry_time:
+ expiry_time = self.expiry
+
+ self._check_cache()
+
+ expired_files, writable_cache_empty = self._metadata.clear_expired(expiry_time)
+ for fn in expired_files:
+ if os.path.exists(fn):
+ os.remove(fn)
+
+ if writable_cache_empty:
+ rmtree(self.storage[-1])
+ self.load_cache()
+
+ self._cache_size = None
+
+ def pop_from_cache(self, path):
+ """Remove cached version of given file
+
+ Deletes the local copy of the given (remote) path. If it is found in a
+ cache location which is not the last one, that location is assumed to
+ be read-only, and PermissionError is raised.
+ """
+ path = self._strip_protocol(path)
+ fn = self._metadata.pop_file(path)
+ if fn is not None:
+ os.remove(fn)
+ self._cache_size = None
+
+ def _open(
+ self,
+ path,
+ mode="rb",
+ block_size=None,
+ autocommit=True,
+ cache_options=None,
+ **kwargs,
+ ):
+ """Wrap the target _open
+
+ If the whole file exists in the cache, just open it locally and
+ return that.
+
+ Otherwise, open the file on the target FS, and attach an mmap cache
+ pointing to a location that we determine, in our cache storage.
+ The ``blocks`` instance is shared, so as the mmap cache instance
+ updates, so does the entry in our ``cached_files`` attribute.
+ We monkey-patch this file, so that when it closes, we call
+ ``close_and_update`` to save the state of the blocks.
+ """
+ path = self._strip_protocol(path)
+
+ path = self.fs._strip_protocol(path)
+ if "r" not in mode:
+ return self.fs._open(
+ path,
+ mode=mode,
+ block_size=block_size,
+ autocommit=autocommit,
+ cache_options=cache_options,
+ **kwargs,
+ )
+ detail = self._check_file(path)
+ if detail:
+ # file is in cache
+ detail, fn = detail
+ hash, blocks = detail["fn"], detail["blocks"]
+ if blocks is True:
+ # stored file is complete
+ logger.debug("Opening local copy of %s", path)
+ return open(fn, mode)
+ # TODO: action where partial file exists in read-only cache
+ logger.debug("Opening partially cached copy of %s", path)
+ else:
+ hash = self._mapper(path)
+ fn = os.path.join(self.storage[-1], hash)
+ blocks = set()
+ detail = {
+ "original": path,
+ "fn": hash,
+ "blocks": blocks,
+ "time": time.time(),
+ "uid": self.fs.ukey(path),
+ }
+ self._metadata.update_file(path, detail)
+ logger.debug("Creating local sparse file for %s", path)
+
+ # call the target filesystem's open
+ self._mkcache()
+ f = self.fs._open(
+ path,
+ mode=mode,
+ block_size=block_size,
+ autocommit=autocommit,
+ cache_options=cache_options,
+ cache_type="none",
+ **kwargs,
+ )
+ if self.compression:
+ comp = (
+ infer_compression(path)
+ if self.compression == "infer"
+ else self.compression
+ )
+ f = compr[comp](f, mode="rb")
+ if "blocksize" in detail:
+ if detail["blocksize"] != f.blocksize:
+ raise BlocksizeMismatchError(
+ f"Cached file must be reopened with same block"
+ f" size as original (old: {detail['blocksize']},"
+ f" new {f.blocksize})"
+ )
+ else:
+ detail["blocksize"] = f.blocksize
+ f.cache = MMapCache(f.blocksize, f._fetch_range, f.size, fn, blocks)
+ close = f.close
+ f.close = lambda: self.close_and_update(f, close)
+ self.save_cache()
+ return f
+
+ def _parent(self, path):
+ return self.fs._parent(path)
+
+ def hash_name(self, path: str, *args: Any) -> str:
+ # Kept for backward compatibility with downstream libraries.
+ # Ignores extra arguments, previously same_name boolean.
+ return self._mapper(path)
+
+ def close_and_update(self, f, close):
+ """Called when a file is closing, so store the set of blocks"""
+ if f.closed:
+ return
+ path = self._strip_protocol(f.path)
+ self._metadata.on_close_cached_file(f, path)
+ try:
+ logger.debug("going to save")
+ self.save_cache()
+ logger.debug("saved")
+ except OSError:
+ logger.debug("Cache saving failed while closing file")
+ except NameError:
+ logger.debug("Cache save failed due to interpreter shutdown")
+ close()
+ f.closed = True
+
+ def ls(self, path, detail=True):
+ return self.fs.ls(path, detail)
+
+ def __getattribute__(self, item):
+ if item in {
+ "load_cache",
+ "_open",
+ "save_cache",
+ "close_and_update",
+ "__init__",
+ "__getattribute__",
+ "__reduce__",
+ "_make_local_details",
+ "open",
+ "cat",
+ "cat_file",
+ "cat_ranges",
+ "get",
+ "read_block",
+ "tail",
+ "head",
+ "info",
+ "ls",
+ "exists",
+ "isfile",
+ "isdir",
+ "_check_file",
+ "_check_cache",
+ "_mkcache",
+ "clear_cache",
+ "clear_expired_cache",
+ "pop_from_cache",
+ "local_file",
+ "_paths_from_path",
+ "get_mapper",
+ "open_many",
+ "commit_many",
+ "hash_name",
+ "__hash__",
+ "__eq__",
+ "to_json",
+ "to_dict",
+ "cache_size",
+ "pipe_file",
+ "pipe",
+ "start_transaction",
+ "end_transaction",
+ }:
+ # all the methods defined in this class. Note that `open` is listed
+ # here: it calls `_open`, but is actually defined in the superclass
+ return lambda *args, **kw: getattr(type(self), item).__get__(self)(
+ *args, **kw
+ )
+ if item in ["__reduce_ex__"]:
+ raise AttributeError
+ if item in ["transaction"]:
+ # property
+ return type(self).transaction.__get__(self)
+ if item in ["_cache", "transaction_type"]:
+ # class attributes
+ return getattr(type(self), item)
+ if item == "__class__":
+ return type(self)
+ d = object.__getattribute__(self, "__dict__")
+ fs = d.get("fs", None) # fs is not immediately defined
+ if item in d:
+ return d[item]
+ elif fs is not None:
+ if item in fs.__dict__:
+ # attribute of instance
+ return fs.__dict__[item]
+ # attribute belonging to the target filesystem
+ cls = type(fs)
+ m = getattr(cls, item)
+ if (inspect.isfunction(m) or inspect.isdatadescriptor(m)) and (
+ not hasattr(m, "__self__") or m.__self__ is None
+ ):
+ # instance method
+ return m.__get__(fs, cls)
+ return m # class method or attribute
+ else:
+ # attributes of the superclass, while target is being set up
+ return super().__getattribute__(item)
+
+ def __eq__(self, other):
+ """Test for equality."""
+ if self is other:
+ return True
+ if not isinstance(other, type(self)):
+ return False
+ return (
+ self.storage == other.storage
+ and self.kwargs == other.kwargs
+ and self.cache_check == other.cache_check
+ and self.check_files == other.check_files
+ and self.expiry == other.expiry
+ and self.compression == other.compression
+ and self._mapper == other._mapper
+ and self.target_protocol == other.target_protocol
+ )
+
+ def __hash__(self):
+ """Calculate hash."""
+ return (
+ hash(tuple(self.storage))
+ ^ hash(str(self.kwargs))
+ ^ hash(self.cache_check)
+ ^ hash(self.check_files)
+ ^ hash(self.expiry)
+ ^ hash(self.compression)
+ ^ hash(self._mapper)
+ ^ hash(self.target_protocol)
+ )
+
+
+class WholeFileCacheFileSystem(CachingFileSystem):
+ """Caches whole remote files on first access
+
+ This class is intended as a layer over any other file system, and
+ will make a local copy of each file accessed, so that all subsequent
+ reads are local. This is similar to ``CachingFileSystem``, but without
+ the block-wise functionality and so can work even when sparse files
+ are not allowed. See its docstring for definition of the init
+ arguments.
+
+ The class still needs access to the remote store for listing files,
+ and may refresh cached files.
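+
+ Example (an illustrative sketch; the HTTPS target, URL and cache
+ directory are hypothetical placeholders):
+
+ >>> import fsspec
+ >>> fs = fsspec.filesystem(
+ ...     "filecache",
+ ...     target_protocol="https",
+ ...     cache_storage="/tmp/fsspec-filecache",
+ ... )
+ >>> with fs.open("https://example.com/data.csv") as f:
+ ...     data = f.read()  # whole file downloaded on first access, read locally after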
+ """
+
+ protocol = "filecache"
+ local_file = True
+
+ def open_many(self, open_files, **kwargs):
+ paths = [of.path for of in open_files]
+ if "r" in open_files.mode:
+ self._mkcache()
+ else:
+ return [
+ LocalTempFile(
+ self.fs,
+ path,
+ mode=open_files.mode,
+ fn=os.path.join(self.storage[-1], self._mapper(path)),
+ **kwargs,
+ )
+ for path in paths
+ ]
+
+ if self.compression:
+ raise NotImplementedError
+ details = [self._check_file(sp) for sp in paths]
+ downpath = [p for p, d in zip(paths, details) if not d]
+ downfn0 = [
+ os.path.join(self.storage[-1], self._mapper(p))
+ for p, d in zip(paths, details)
+ ] # keep these path names for opening later
+ downfn = [fn for fn, d in zip(downfn0, details) if not d]
+ if downpath:
+ # skip if all files are already cached and up to date
+ self.fs.get(downpath, downfn)
+
+ # update metadata - only happens when downloads are successful
+ newdetail = [
+ {
+ "original": path,
+ "fn": self._mapper(path),
+ "blocks": True,
+ "time": time.time(),
+ "uid": self.fs.ukey(path),
+ }
+ for path in downpath
+ ]
+ for path, detail in zip(downpath, newdetail):
+ self._metadata.update_file(path, detail)
+ self.save_cache()
+
+ def firstpart(fn):
+ # helper to adapt both whole-file and simple-cache
+ return fn[1] if isinstance(fn, tuple) else fn
+
+ return [
+ open(firstpart(fn0) if fn0 else fn1, mode=open_files.mode)
+ for fn0, fn1 in zip(details, downfn0)
+ ]
+
+ def commit_many(self, open_files):
+ self.fs.put([f.fn for f in open_files], [f.path for f in open_files])
+ [f.close() for f in open_files]
+ for f in open_files:
+ # in case autocommit is off, and so close did not already delete
+ try:
+ os.remove(f.name)
+ except FileNotFoundError:
+ pass
+ self._cache_size = None
+
+ def _make_local_details(self, path):
+ hash = self._mapper(path)
+ fn = os.path.join(self.storage[-1], hash)
+ detail = {
+ "original": path,
+ "fn": hash,
+ "blocks": True,
+ "time": time.time(),
+ "uid": self.fs.ukey(path),
+ }
+ self._metadata.update_file(path, detail)
+ logger.debug("Copying %s to local cache", path)
+ return fn
+
+ def cat(
+ self,
+ path,
+ recursive=False,
+ on_error="raise",
+ callback=DEFAULT_CALLBACK,
+ **kwargs,
+ ):
+ paths = self.expand_path(
+ path, recursive=recursive, maxdepth=kwargs.get("maxdepth")
+ )
+ getpaths = []
+ storepaths = []
+ fns = []
+ out = {}
+ for p in paths.copy():
+ try:
+ detail = self._check_file(p)
+ if not detail:
+ fn = self._make_local_details(p)
+ getpaths.append(p)
+ storepaths.append(fn)
+ else:
+ detail, fn = detail if isinstance(detail, tuple) else (None, detail)
+ fns.append(fn)
+ except Exception as e:
+ if on_error == "raise":
+ raise
+ if on_error == "return":
+ out[p] = e
+ paths.remove(p)
+
+ if getpaths:
+ self.fs.get(getpaths, storepaths)
+ self.save_cache()
+
+ callback.set_size(len(paths))
+ for p, fn in zip(paths, fns):
+ with open(fn, "rb") as f:
+ out[p] = f.read()
+ callback.relative_update(1)
+ if isinstance(path, str) and len(paths) == 1 and recursive is False:
+ out = out[paths[0]]
+ return out
+
+ def _open(self, path, mode="rb", **kwargs):
+ path = self._strip_protocol(path)
+ if "r" not in mode:
+ hash = self._mapper(path)
+ fn = os.path.join(self.storage[-1], hash)
+ user_specified_kwargs = {
+ k: v
+ for k, v in kwargs.items()
+ # these kwargs were added by open(); we don't want them
+ if k not in ["autocommit", "block_size", "cache_options"]
+ }
+ return LocalTempFile(self, path, mode=mode, fn=fn, **user_specified_kwargs)
+ detail = self._check_file(path)
+ if detail:
+ detail, fn = detail
+ _, blocks = detail["fn"], detail["blocks"]
+ if blocks is True:
+ logger.debug("Opening local copy of %s", path)
+
+ # In order to support downstream filesystems to be able to
+ # infer the compression from the original filename, like
+ # the `TarFileSystem`, let's extend the `io.BufferedReader`
+ # fileobject protocol by adding a dedicated attribute
+ # `original`.
+ f = open(fn, mode)
+ f.original = detail.get("original")
+ return f
+ else:
+ raise ValueError(
+ f"Attempt to open partially cached file {path}"
+ f" as a wholly cached file"
+ )
+ else:
+ fn = self._make_local_details(path)
+ kwargs["mode"] = mode
+
+ # call the target filesystem's open
+ self._mkcache()
+ if self.compression:
+ with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
+ if isinstance(f, AbstractBufferedFile):
+ # want no type of caching if just downloading whole thing
+ f.cache = BaseCache(0, f.cache.fetcher, f.size)
+ comp = (
+ infer_compression(path)
+ if self.compression == "infer"
+ else self.compression
+ )
+ f = compr[comp](f, mode="rb")
+ data = True
+ while data:
+ block = getattr(f, "blocksize", 5 * 2**20)
+ data = f.read(block)
+ f2.write(data)
+ else:
+ self.fs.get_file(path, fn)
+ self.save_cache()
+ return self._open(path, mode)
+
+
+class SimpleCacheFileSystem(WholeFileCacheFileSystem):
+ """Caches whole remote files on first access
+
+ This class is intended as a layer over any other file system, and
+ will make a local copy of each file accessed, so that all subsequent
+ reads are local. This implementation only copies whole files, and
+ does not keep any metadata about the download time or file details.
+ It is therefore safer to use in multi-threaded/concurrent situations.
+
+ This is the only one of the caching filesystems that supports writing:
+ you will be given a real local open file, and upon close and commit, it
+ will be uploaded to the target filesystem; the writability of the target
+ URL is not checked until that time.
+
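+ Example of the write path (an illustrative sketch; the "memory" target
+ and the path are hypothetical placeholders):
+
+ >>> import fsspec
+ >>> fs = fsspec.filesystem("simplecache", target_protocol="memory")
+ >>> with fs.open("/staging/output.bin", "wb") as f:
+ ...     nbytes = f.write(b"payload")  # writes go to a real local temporary file
+ >>> # on close (with autocommit), the local file is uploaded to the target
+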
+ """
+
+ protocol = "simplecache"
+ local_file = True
+ transaction_type = WriteCachedTransaction
+
+ def __init__(self, **kwargs):
+ kw = kwargs.copy()
+ for key in ["cache_check", "expiry_time", "check_files"]:
+ kw[key] = False
+ super().__init__(**kw)
+ for storage in self.storage:
+ if not os.path.exists(storage):
+ os.makedirs(storage, exist_ok=True)
+
+ def _check_file(self, path):
+ self._check_cache()
+ sha = self._mapper(path)
+ for storage in self.storage:
+ fn = os.path.join(storage, sha)
+ if os.path.exists(fn):
+ return fn
+
+ def save_cache(self):
+ pass
+
+ def load_cache(self):
+ pass
+
+ def pipe_file(self, path, value=None, **kwargs):
+ if self._intrans:
+ with self.open(path, "wb") as f:
+ f.write(value)
+ else:
+ super().pipe_file(path, value)
+
+ def ls(self, path, detail=True, **kwargs):
+ path = self._strip_protocol(path)
+ details = []
+ try:
+ details = self.fs.ls(
+ path, detail=True, **kwargs
+ ).copy() # don't edit original!
+ except FileNotFoundError as e:
+ ex = e
+ else:
+ ex = None
+ if self._intrans:
+ path1 = path.rstrip("/") + "/"
+ for f in self.transaction.files:
+ if f.path == path:
+ details.append(
+ {"name": path, "size": f.size or f.tell(), "type": "file"}
+ )
+ elif f.path.startswith(path1):
+ if f.path.count("/") == path1.count("/"):
+ details.append(
+ {"name": f.path, "size": f.size or f.tell(), "type": "file"}
+ )
+ else:
+ dname = "/".join(f.path.split("/")[: path1.count("/") + 1])
+ details.append({"name": dname, "size": 0, "type": "directory"})
+ if ex is not None and not details:
+ raise ex
+ if detail:
+ return details
+ return sorted(_["name"] for _ in details)
+
+ def info(self, path, **kwargs):
+ path = self._strip_protocol(path)
+ if self._intrans:
+ f = [_ for _ in self.transaction.files if _.path == path]
+ if f:
+ size = os.path.getsize(f[0].fn) if f[0].closed else f[0].tell()
+ return {"name": path, "size": size, "type": "file"}
+ f = any(_.path.startswith(path + "/") for _ in self.transaction.files)
+ if f:
+ return {"name": path, "size": 0, "type": "directory"}
+ return self.fs.info(path, **kwargs)
+
+ def pipe(self, path, value=None, **kwargs):
+ if isinstance(path, str):
+ self.pipe_file(self._strip_protocol(path), value, **kwargs)
+ elif isinstance(path, dict):
+ for k, v in path.items():
+ self.pipe_file(self._strip_protocol(k), v, **kwargs)
+ else:
+ raise ValueError("path must be str or dict")
+
+ def cat_ranges(
+ self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
+ ):
+ lpaths = [self._check_file(p) for p in paths]
+ rpaths = [p for l, p in zip(lpaths, paths) if l is False]
+ lpaths = [l for l, p in zip(lpaths, paths) if l is False]
+ self.fs.get(rpaths, lpaths)
+ return super().cat_ranges(
+ paths, starts, ends, max_gap=max_gap, on_error=on_error, **kwargs
+ )
+
+ def _open(self, path, mode="rb", **kwargs):
+ path = self._strip_protocol(path)
+ sha = self._mapper(path)
+
+ if "r" not in mode:
+ fn = os.path.join(self.storage[-1], sha)
+ user_specified_kwargs = {
+ k: v
+ for k, v in kwargs.items()
+ if k not in ["autocommit", "block_size", "cache_options"]
+ } # those were added by open()
+ return LocalTempFile(
+ self,
+ path,
+ mode=mode,
+ autocommit=not self._intrans,
+ fn=fn,
+ **user_specified_kwargs,
+ )
+ fn = self._check_file(path)
+ if fn:
+ return open(fn, mode)
+
+ fn = os.path.join(self.storage[-1], sha)
+ logger.debug("Copying %s to local cache", path)
+ kwargs["mode"] = mode
+
+ self._mkcache()
+ self._cache_size = None
+ if self.compression:
+ with self.fs._open(path, **kwargs) as f, open(fn, "wb") as f2:
+ if isinstance(f, AbstractBufferedFile):
+ # want no type of caching if just downloading whole thing
+ f.cache = BaseCache(0, f.cache.fetcher, f.size)
+ comp = (
+ infer_compression(path)
+ if self.compression == "infer"
+ else self.compression
+ )
+ f = compr[comp](f, mode="rb")
+ data = True
+ while data:
+ block = getattr(f, "blocksize", 5 * 2**20)
+ data = f.read(block)
+ f2.write(data)
+ else:
+ self.fs.get_file(path, fn)
+ return self._open(path, mode)
+
+
+class LocalTempFile:
+ """A temporary local file, which will be uploaded on commit"""
+
+ def __init__(self, fs, path, fn, mode="wb", autocommit=True, seek=0, **kwargs):
+ self.fn = fn
+ self.fh = open(fn, mode)
+ self.mode = mode
+ if seek:
+ self.fh.seek(seek)
+ self.path = path
+ self.size = None
+ self.fs = fs
+ self.closed = False
+ self.autocommit = autocommit
+ self.kwargs = kwargs
+
+ def __reduce__(self):
+ # always open in r+b to allow continuing writing at a location
+ return (
+ LocalTempFile,
+ (self.fs, self.path, self.fn, "r+b", self.autocommit, self.tell()),
+ )
+
+ def __enter__(self):
+ return self.fh
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+
+ def close(self):
+ # self.size = self.fh.tell()
+ if self.closed:
+ return
+ self.fh.close()
+ self.closed = True
+ if self.autocommit:
+ self.commit()
+
+ def discard(self):
+ self.fh.close()
+ os.remove(self.fn)
+
+ def commit(self):
+ self.fs.put(self.fn, self.path, **self.kwargs)
+ # we do not delete local copy - it's still in the cache
+
+ @property
+ def name(self):
+ return self.fn
+
+ def __repr__(self) -> str:
+ return f"LocalTempFile: {self.path}"
+
+ def __getattr__(self, item):
+ return getattr(self.fh, item)