diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/spec.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/spec.py | 2242 |
1 files changed, 2242 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/spec.py b/.venv/lib/python3.12/site-packages/fsspec/spec.py new file mode 100644 index 00000000..7daf850e --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/spec.py @@ -0,0 +1,2242 @@ +from __future__ import annotations + +import io +import json +import logging +import os +import threading +import warnings +import weakref +from errno import ESPIPE +from glob import has_magic +from hashlib import sha256 +from typing import Any, ClassVar + +from .callbacks import DEFAULT_CALLBACK +from .config import apply_config, conf +from .dircache import DirCache +from .transaction import Transaction +from .utils import ( + _unstrip_protocol, + glob_translate, + isfilelike, + other_paths, + read_block, + stringify_path, + tokenize, +) + +logger = logging.getLogger("fsspec") + + +def make_instance(cls, args, kwargs): + return cls(*args, **kwargs) + + +class _Cached(type): + """ + Metaclass for caching file system instances. + + Notes + ----- + Instances are cached according to + + * The values of the class attributes listed in `_extra_tokenize_attributes` + * The arguments passed to ``__init__``. + + This creates an additional reference to the filesystem, which prevents the + filesystem from being garbage collected when all *user* references go away. + A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also* + be made for a filesystem instance to be garbage collected. + """ + + def __init__(cls, *args, **kwargs): + super().__init__(*args, **kwargs) + # Note: we intentionally create a reference here, to avoid garbage + # collecting instances when all other references are gone. To really + # delete a FileSystem, the cache must be cleared. + if conf.get("weakref_instance_cache"): # pragma: no cover + # debug option for analysing fork/spawn conditions + cls._cache = weakref.WeakValueDictionary() + else: + cls._cache = {} + cls._pid = os.getpid() + + def __call__(cls, *args, **kwargs): + kwargs = apply_config(cls, kwargs) + extra_tokens = tuple( + getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes + ) + token = tokenize( + cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs + ) + skip = kwargs.pop("skip_instance_cache", False) + if os.getpid() != cls._pid: + cls._cache.clear() + cls._pid = os.getpid() + if not skip and cls.cachable and token in cls._cache: + cls._latest = token + return cls._cache[token] + else: + obj = super().__call__(*args, **kwargs) + # Setting _fs_token here causes some static linters to complain. + obj._fs_token_ = token + obj.storage_args = args + obj.storage_options = kwargs + if obj.async_impl and obj.mirror_sync_methods: + from .asyn import mirror_sync_methods + + mirror_sync_methods(obj) + + if cls.cachable and not skip: + cls._latest = token + cls._cache[token] = obj + return obj + + +class AbstractFileSystem(metaclass=_Cached): + """ + An abstract super-class for pythonic file-systems + + Implementations are expected to be compatible with or, better, subclass + from here. + """ + + cachable = True # this class can be cached, instances reused + _cached = False + blocksize = 2**22 + sep = "/" + protocol: ClassVar[str | tuple[str, ...]] = "abstract" + _latest = None + async_impl = False + mirror_sync_methods = False + root_marker = "" # For some FSs, may require leading '/' or other character + transaction_type = Transaction + + #: Extra *class attributes* that should be considered when hashing. + _extra_tokenize_attributes = () + + # Set by _Cached metaclass + storage_args: tuple[Any, ...] + storage_options: dict[str, Any] + + def __init__(self, *args, **storage_options): + """Create and configure file-system instance + + Instances may be cachable, so if similar enough arguments are seen + a new instance is not required. The token attribute exists to allow + implementations to cache instances if they wish. + + A reasonable default should be provided if there are no arguments. + + Subclasses should call this method. + + Parameters + ---------- + use_listings_cache, listings_expiry_time, max_paths: + passed to ``DirCache``, if the implementation supports + directory listing caching. Pass use_listings_cache=False + to disable such caching. + skip_instance_cache: bool + If this is a cachable implementation, pass True here to force + creating a new instance even if a matching instance exists, and prevent + storing this instance. + asynchronous: bool + loop: asyncio-compatible IOLoop or None + """ + if self._cached: + # reusing instance, don't change + return + self._cached = True + self._intrans = False + self._transaction = None + self._invalidated_caches_in_transaction = [] + self.dircache = DirCache(**storage_options) + + if storage_options.pop("add_docs", None): + warnings.warn("add_docs is no longer supported.", FutureWarning) + + if storage_options.pop("add_aliases", None): + warnings.warn("add_aliases has been removed.", FutureWarning) + # This is set in _Cached + self._fs_token_ = None + + @property + def fsid(self): + """Persistent filesystem id that can be used to compare filesystems + across sessions. + """ + raise NotImplementedError + + @property + def _fs_token(self): + return self._fs_token_ + + def __dask_tokenize__(self): + return self._fs_token + + def __hash__(self): + return int(self._fs_token, 16) + + def __eq__(self, other): + return isinstance(other, type(self)) and self._fs_token == other._fs_token + + def __reduce__(self): + return make_instance, (type(self), self.storage_args, self.storage_options) + + @classmethod + def _strip_protocol(cls, path): + """Turn path from fully-qualified to file-system-specific + + May require FS-specific handling, e.g., for relative paths or links. + """ + if isinstance(path, list): + return [cls._strip_protocol(p) for p in path] + path = stringify_path(path) + protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol + for protocol in protos: + if path.startswith(protocol + "://"): + path = path[len(protocol) + 3 :] + elif path.startswith(protocol + "::"): + path = path[len(protocol) + 2 :] + path = path.rstrip("/") + # use of root_marker to make minimum required path, e.g., "/" + return path or cls.root_marker + + def unstrip_protocol(self, name: str) -> str: + """Format FS-specific path to generic, including protocol""" + protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol + for protocol in protos: + if name.startswith(f"{protocol}://"): + return name + return f"{protos[0]}://{name}" + + @staticmethod + def _get_kwargs_from_urls(path): + """If kwargs can be encoded in the paths, extract them here + + This should happen before instantiation of the class; incoming paths + then should be amended to strip the options in methods. + + Examples may look like an sftp path "sftp://user@host:/my/path", where + the user and host should become kwargs and later get stripped. + """ + # by default, nothing happens + return {} + + @classmethod + def current(cls): + """Return the most recently instantiated FileSystem + + If no instance has been created, then create one with defaults + """ + if cls._latest in cls._cache: + return cls._cache[cls._latest] + return cls() + + @property + def transaction(self): + """A context within which files are committed together upon exit + + Requires the file class to implement `.commit()` and `.discard()` + for the normal and exception cases. + """ + if self._transaction is None: + self._transaction = self.transaction_type(self) + return self._transaction + + def start_transaction(self): + """Begin write transaction for deferring files, non-context version""" + self._intrans = True + self._transaction = self.transaction_type(self) + return self.transaction + + def end_transaction(self): + """Finish write transaction, non-context version""" + self.transaction.complete() + self._transaction = None + # The invalid cache must be cleared after the transaction is completed. + for path in self._invalidated_caches_in_transaction: + self.invalidate_cache(path) + self._invalidated_caches_in_transaction.clear() + + def invalidate_cache(self, path=None): + """ + Discard any cached directory information + + Parameters + ---------- + path: string or None + If None, clear all listings cached else listings at or under given + path. + """ + # Not necessary to implement invalidation mechanism, may have no cache. + # But if have, you should call this method of parent class from your + # subclass to ensure expiring caches after transacations correctly. + # See the implementation of FTPFileSystem in ftp.py + if self._intrans: + self._invalidated_caches_in_transaction.append(path) + + def mkdir(self, path, create_parents=True, **kwargs): + """ + Create directory entry at path + + For systems that don't have true directories, may create an for + this instance only and not touch the real filesystem + + Parameters + ---------- + path: str + location + create_parents: bool + if True, this is equivalent to ``makedirs`` + kwargs: + may be permissions, etc. + """ + pass # not necessary to implement, may not have directories + + def makedirs(self, path, exist_ok=False): + """Recursively make directories + + Creates directory at path and any intervening required directories. + Raises exception if, for instance, the path already exists but is a + file. + + Parameters + ---------- + path: str + leaf directory name + exist_ok: bool (False) + If False, will error if the target already exists + """ + pass # not necessary to implement, may not have directories + + def rmdir(self, path): + """Remove a directory, if empty""" + pass # not necessary to implement, may not have directories + + def ls(self, path, detail=True, **kwargs): + """List objects at path. + + This should include subdirectories and files at that location. The + difference between a file and a directory must be clear when details + are requested. + + The specific keys, or perhaps a FileInfo class, or similar, is TBD, + but must be consistent across implementations. + Must include: + + - full path to the entry (without protocol) + - size of the entry, in bytes. If the value cannot be determined, will + be ``None``. + - type of entry, "file", "directory" or other + + Additional information + may be present, appropriate to the file-system, e.g., generation, + checksum, etc. + + May use refresh=True|False to allow use of self._ls_from_cache to + check for a saved listing and avoid calling the backend. This would be + common where listing may be expensive. + + Parameters + ---------- + path: str + detail: bool + if True, gives a list of dictionaries, where each is the same as + the result of ``info(path)``. If False, gives a list of paths + (str). + kwargs: may have additional backend-specific options, such as version + information + + Returns + ------- + List of strings if detail is False, or list of directory information + dicts if detail is True. + """ + raise NotImplementedError + + def _ls_from_cache(self, path): + """Check cache for listing + + Returns listing, if found (may be empty list for a directly that exists + but contains nothing), None if not in cache. + """ + parent = self._parent(path) + try: + return self.dircache[path.rstrip("/")] + except KeyError: + pass + try: + files = [ + f + for f in self.dircache[parent] + if f["name"] == path + or (f["name"] == path.rstrip("/") and f["type"] == "directory") + ] + if len(files) == 0: + # parent dir was listed but did not contain this file + raise FileNotFoundError(path) + return files + except KeyError: + pass + + def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs): + """Return all files belows path + + List all files, recursing into subdirectories; output is iterator-style, + like ``os.walk()``. For a simple list of files, ``find()`` is available. + + When topdown is True, the caller can modify the dirnames list in-place (perhaps + using del or slice assignment), and walk() will + only recurse into the subdirectories whose names remain in dirnames; + this can be used to prune the search, impose a specific order of visiting, + or even to inform walk() about directories the caller creates or renames before + it resumes walk() again. + Modifying dirnames when topdown is False has no effect. (see os.walk) + + Note that the "files" outputted will include anything that is not + a directory, such as links. + + Parameters + ---------- + path: str + Root to recurse into + maxdepth: int + Maximum recursion depth. None means limitless, but not recommended + on link-based file-systems. + topdown: bool (True) + Whether to walk the directory tree from the top downwards or from + the bottom upwards. + on_error: "omit", "raise", a callable + if omit (default), path with exception will simply be empty; + If raise, an underlying exception will be raised; + if callable, it will be called with a single OSError instance as argument + kwargs: passed to ``ls`` + """ + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + path = self._strip_protocol(path) + full_dirs = {} + dirs = {} + files = {} + + detail = kwargs.pop("detail", False) + try: + listing = self.ls(path, detail=True, **kwargs) + except (FileNotFoundError, OSError) as e: + if on_error == "raise": + raise + if callable(on_error): + on_error(e) + return + + for info in listing: + # each info name must be at least [path]/part , but here + # we check also for names like [path]/part/ + pathname = info["name"].rstrip("/") + name = pathname.rsplit("/", 1)[-1] + if info["type"] == "directory" and pathname != path: + # do not include "self" path + full_dirs[name] = pathname + dirs[name] = info + elif pathname == path: + # file-like with same name as give path + files[""] = info + else: + files[name] = info + + if not detail: + dirs = list(dirs) + files = list(files) + + if topdown: + # Yield before recursion if walking top down + yield path, dirs, files + + if maxdepth is not None: + maxdepth -= 1 + if maxdepth < 1: + if not topdown: + yield path, dirs, files + return + + for d in dirs: + yield from self.walk( + full_dirs[d], + maxdepth=maxdepth, + detail=detail, + topdown=topdown, + **kwargs, + ) + + if not topdown: + # Yield after recursion if walking bottom up + yield path, dirs, files + + def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): + """List all files below path. + + Like posix ``find`` command without conditions + + Parameters + ---------- + path : str + maxdepth: int or None + If not None, the maximum number of levels to descend + withdirs: bool + Whether to include directory paths in the output. This is True + when used by glob, but users usually only want files. + kwargs are passed to ``ls``. + """ + # TODO: allow equivalent of -name parameter + path = self._strip_protocol(path) + out = {} + + # Add the root directory if withdirs is requested + # This is needed for posix glob compliance + if withdirs and path != "" and self.isdir(path): + out[path] = self.info(path) + + for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs): + if withdirs: + files.update(dirs) + out.update({info["name"]: info for name, info in files.items()}) + if not out and self.isfile(path): + # walk works on directories, but find should also return [path] + # when path happens to be a file + out[path] = {} + names = sorted(out) + if not detail: + return names + else: + return {name: out[name] for name in names} + + def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs): + """Space used by files and optionally directories within a path + + Directory size does not include the size of its contents. + + Parameters + ---------- + path: str + total: bool + Whether to sum all the file sizes + maxdepth: int or None + Maximum number of directory levels to descend, None for unlimited. + withdirs: bool + Whether to include directory paths in the output. + kwargs: passed to ``find`` + + Returns + ------- + Dict of {path: size} if total=False, or int otherwise, where numbers + refer to bytes used. + """ + sizes = {} + if withdirs and self.isdir(path): + # Include top-level directory in output + info = self.info(path) + sizes[info["name"]] = info["size"] + for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs): + info = self.info(f) + sizes[info["name"]] = info["size"] + if total: + return sum(sizes.values()) + else: + return sizes + + def glob(self, path, maxdepth=None, **kwargs): + """ + Find files by glob-matching. + + If the path ends with '/', only folders are returned. + + We support ``"**"``, + ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation. + + The `maxdepth` option is applied on the first `**` found in the path. + + kwargs are passed to ``ls``. + """ + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + import re + + seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,) + ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash + path = self._strip_protocol(path) + append_slash_to_dirname = ends_with_sep or path.endswith( + tuple(sep + "**" for sep in seps) + ) + idx_star = path.find("*") if path.find("*") >= 0 else len(path) + idx_qmark = path.find("?") if path.find("?") >= 0 else len(path) + idx_brace = path.find("[") if path.find("[") >= 0 else len(path) + + min_idx = min(idx_star, idx_qmark, idx_brace) + + detail = kwargs.pop("detail", False) + + if not has_magic(path): + if self.exists(path, **kwargs): + if not detail: + return [path] + else: + return {path: self.info(path, **kwargs)} + else: + if not detail: + return [] # glob of non-existent returns empty + else: + return {} + elif "/" in path[:min_idx]: + min_idx = path[:min_idx].rindex("/") + root = path[: min_idx + 1] + depth = path[min_idx + 1 :].count("/") + 1 + else: + root = "" + depth = path[min_idx + 1 :].count("/") + 1 + + if "**" in path: + if maxdepth is not None: + idx_double_stars = path.find("**") + depth_double_stars = path[idx_double_stars:].count("/") + 1 + depth = depth - depth_double_stars + maxdepth + else: + depth = None + + allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs) + + pattern = glob_translate(path + ("/" if ends_with_sep else "")) + pattern = re.compile(pattern) + + out = { + p: info + for p, info in sorted(allpaths.items()) + if pattern.match( + p + "/" + if append_slash_to_dirname and info["type"] == "directory" + else p + ) + } + + if detail: + return out + else: + return list(out) + + def exists(self, path, **kwargs): + """Is there a file at the given path""" + try: + self.info(path, **kwargs) + return True + except: # noqa: E722 + # any exception allowed bar FileNotFoundError? + return False + + def lexists(self, path, **kwargs): + """If there is a file at the given path (including + broken links)""" + return self.exists(path) + + def info(self, path, **kwargs): + """Give details of entry at path + + Returns a single dictionary, with exactly the same information as ``ls`` + would with ``detail=True``. + + The default implementation calls ls and could be overridden by a + shortcut. kwargs are passed on to ```ls()``. + + Some file systems might not be able to measure the file's size, in + which case, the returned dict will include ``'size': None``. + + Returns + ------- + dict with keys: name (full path in the FS), size (in bytes), type (file, + directory, or something else) and other FS-specific keys. + """ + path = self._strip_protocol(path) + out = self.ls(self._parent(path), detail=True, **kwargs) + out = [o for o in out if o["name"].rstrip("/") == path] + if out: + return out[0] + out = self.ls(path, detail=True, **kwargs) + path = path.rstrip("/") + out1 = [o for o in out if o["name"].rstrip("/") == path] + if len(out1) == 1: + if "size" not in out1[0]: + out1[0]["size"] = None + return out1[0] + elif len(out1) > 1 or out: + return {"name": path, "size": 0, "type": "directory"} + else: + raise FileNotFoundError(path) + + def checksum(self, path): + """Unique value for current version of file + + If the checksum is the same from one moment to another, the contents + are guaranteed to be the same. If the checksum changes, the contents + *might* have changed. + + This should normally be overridden; default will probably capture + creation/modification timestamp (which would be good) or maybe + access timestamp (which would be bad) + """ + return int(tokenize(self.info(path)), 16) + + def size(self, path): + """Size in bytes of file""" + return self.info(path).get("size", None) + + def sizes(self, paths): + """Size in bytes of each file in a list of paths""" + return [self.size(p) for p in paths] + + def isdir(self, path): + """Is this entry directory-like?""" + try: + return self.info(path)["type"] == "directory" + except OSError: + return False + + def isfile(self, path): + """Is this entry file-like?""" + try: + return self.info(path)["type"] == "file" + except: # noqa: E722 + return False + + def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs): + """Get the contents of the file as a string. + + Parameters + ---------- + path: str + URL of file on this filesystems + encoding, errors, newline: same as `open`. + """ + with self.open( + path, + mode="r", + encoding=encoding, + errors=errors, + newline=newline, + **kwargs, + ) as f: + return f.read() + + def write_text( + self, path, value, encoding=None, errors=None, newline=None, **kwargs + ): + """Write the text to the given file. + + An existing file will be overwritten. + + Parameters + ---------- + path: str + URL of file on this filesystems + value: str + Text to write. + encoding, errors, newline: same as `open`. + """ + with self.open( + path, + mode="w", + encoding=encoding, + errors=errors, + newline=newline, + **kwargs, + ) as f: + return f.write(value) + + def cat_file(self, path, start=None, end=None, **kwargs): + """Get the content of a file + + Parameters + ---------- + path: URL of file on this filesystems + start, end: int + Bytes limits of the read. If negative, backwards from end, + like usual python slices. Either can be None for start or + end of file, respectively + kwargs: passed to ``open()``. + """ + # explicitly set buffering off? + with self.open(path, "rb", **kwargs) as f: + if start is not None: + if start >= 0: + f.seek(start) + else: + f.seek(max(0, f.size + start)) + if end is not None: + if end < 0: + end = f.size + end + return f.read(end - f.tell()) + return f.read() + + def pipe_file(self, path, value, mode="overwrite", **kwargs): + """Set the bytes of given file""" + if mode == "create" and self.exists(path): + # non-atomic but simple way; or could use "xb" in open(), which is likely + # not as well supported + raise FileExistsError + with self.open(path, "wb", **kwargs) as f: + f.write(value) + + def pipe(self, path, value=None, **kwargs): + """Put value into path + + (counterpart to ``cat``) + + Parameters + ---------- + path: string or dict(str, bytes) + If a string, a single remote location to put ``value`` bytes; if a dict, + a mapping of {path: bytesvalue}. + value: bytes, optional + If using a single path, these are the bytes to put there. Ignored if + ``path`` is a dict + """ + if isinstance(path, str): + self.pipe_file(self._strip_protocol(path), value, **kwargs) + elif isinstance(path, dict): + for k, v in path.items(): + self.pipe_file(self._strip_protocol(k), v, **kwargs) + else: + raise ValueError("path must be str or dict") + + def cat_ranges( + self, paths, starts, ends, max_gap=None, on_error="return", **kwargs + ): + """Get the contents of byte ranges from one or more files + + Parameters + ---------- + paths: list + A list of of filepaths on this filesystems + starts, ends: int or list + Bytes limits of the read. If using a single int, the same value will be + used to read all the specified files. + """ + if max_gap is not None: + raise NotImplementedError + if not isinstance(paths, list): + raise TypeError + if not isinstance(starts, list): + starts = [starts] * len(paths) + if not isinstance(ends, list): + ends = [ends] * len(paths) + if len(starts) != len(paths) or len(ends) != len(paths): + raise ValueError + out = [] + for p, s, e in zip(paths, starts, ends): + try: + out.append(self.cat_file(p, s, e)) + except Exception as e: + if on_error == "return": + out.append(e) + else: + raise + return out + + def cat(self, path, recursive=False, on_error="raise", **kwargs): + """Fetch (potentially multiple) paths' contents + + Parameters + ---------- + recursive: bool + If True, assume the path(s) are directories, and get all the + contained files + on_error : "raise", "omit", "return" + If raise, an underlying exception will be raised (converted to KeyError + if the type is in self.missing_exceptions); if omit, keys with exception + will simply not be included in the output; if "return", all keys are + included in the output, but the value will be bytes or an exception + instance. + kwargs: passed to cat_file + + Returns + ------- + dict of {path: contents} if there are multiple paths + or the path has been otherwise expanded + """ + paths = self.expand_path(path, recursive=recursive) + if ( + len(paths) > 1 + or isinstance(path, list) + or paths[0] != self._strip_protocol(path) + ): + out = {} + for path in paths: + try: + out[path] = self.cat_file(path, **kwargs) + except Exception as e: + if on_error == "raise": + raise + if on_error == "return": + out[path] = e + return out + else: + return self.cat_file(paths[0], **kwargs) + + def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs): + """Copy single remote file to local""" + from .implementations.local import LocalFileSystem + + if isfilelike(lpath): + outfile = lpath + elif self.isdir(rpath): + os.makedirs(lpath, exist_ok=True) + return None + + fs = LocalFileSystem(auto_mkdir=True) + fs.makedirs(fs._parent(lpath), exist_ok=True) + + with self.open(rpath, "rb", **kwargs) as f1: + if outfile is None: + outfile = open(lpath, "wb") + + try: + callback.set_size(getattr(f1, "size", None)) + data = True + while data: + data = f1.read(self.blocksize) + segment_len = outfile.write(data) + if segment_len is None: + segment_len = len(data) + callback.relative_update(segment_len) + finally: + if not isfilelike(lpath): + outfile.close() + + def get( + self, + rpath, + lpath, + recursive=False, + callback=DEFAULT_CALLBACK, + maxdepth=None, + **kwargs, + ): + """Copy file(s) to local. + + Copies a specific file or tree of files (if recursive=True). If lpath + ends with a "/", it will be assumed to be a directory, and target files + will go within. Can submit a list of paths, which may be glob-patterns + and will be expanded. + + Calls get_file for each source. + """ + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as lists + rpaths = rpath + lpaths = lpath + else: + from .implementations.local import ( + LocalFileSystem, + make_path_posix, + trailing_sep, + ) + + source_is_str = isinstance(rpath, str) + rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))] + if not rpaths: + return + + if isinstance(lpath, str): + lpath = make_path_posix(lpath) + + source_is_file = len(rpaths) == 1 + dest_is_dir = isinstance(lpath, str) and ( + trailing_sep(lpath) or LocalFileSystem().isdir(lpath) + ) + + exists = source_is_str and ( + (has_magic(rpath) and source_is_file) + or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath)) + ) + lpaths = other_paths( + rpaths, + lpath, + exists=exists, + flatten=not source_is_str, + ) + + callback.set_size(len(lpaths)) + for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): + with callback.branched(rpath, lpath) as child: + self.get_file(rpath, lpath, callback=child, **kwargs) + + def put_file( + self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs + ): + """Copy single file to remote""" + if mode == "create" and self.exists(rpath): + raise FileExistsError + if os.path.isdir(lpath): + self.makedirs(rpath, exist_ok=True) + return None + + with open(lpath, "rb") as f1: + size = f1.seek(0, 2) + callback.set_size(size) + f1.seek(0) + + self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True) + with self.open(rpath, "wb", **kwargs) as f2: + while f1.tell() < size: + data = f1.read(self.blocksize) + segment_len = f2.write(data) + if segment_len is None: + segment_len = len(data) + callback.relative_update(segment_len) + + def put( + self, + lpath, + rpath, + recursive=False, + callback=DEFAULT_CALLBACK, + maxdepth=None, + **kwargs, + ): + """Copy file(s) from local. + + Copies a specific file or tree of files (if recursive=True). If rpath + ends with a "/", it will be assumed to be a directory, and target files + will go within. + + Calls put_file for each source. + """ + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as lists + rpaths = rpath + lpaths = lpath + else: + from .implementations.local import ( + LocalFileSystem, + make_path_posix, + trailing_sep, + ) + + source_is_str = isinstance(lpath, str) + if source_is_str: + lpath = make_path_posix(lpath) + fs = LocalFileSystem() + lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] + if not lpaths: + return + + source_is_file = len(lpaths) == 1 + dest_is_dir = isinstance(rpath, str) and ( + trailing_sep(rpath) or self.isdir(rpath) + ) + + rpath = ( + self._strip_protocol(rpath) + if isinstance(rpath, str) + else [self._strip_protocol(p) for p in rpath] + ) + exists = source_is_str and ( + (has_magic(lpath) and source_is_file) + or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) + ) + rpaths = other_paths( + lpaths, + rpath, + exists=exists, + flatten=not source_is_str, + ) + + callback.set_size(len(rpaths)) + for lpath, rpath in callback.wrap(zip(lpaths, rpaths)): + with callback.branched(lpath, rpath) as child: + self.put_file(lpath, rpath, callback=child, **kwargs) + + def head(self, path, size=1024): + """Get the first ``size`` bytes from file""" + with self.open(path, "rb") as f: + return f.read(size) + + def tail(self, path, size=1024): + """Get the last ``size`` bytes from file""" + with self.open(path, "rb") as f: + f.seek(max(-size, -f.size), 2) + return f.read() + + def cp_file(self, path1, path2, **kwargs): + raise NotImplementedError + + def copy( + self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs + ): + """Copy within two locations in the filesystem + + on_error : "raise", "ignore" + If raise, any not-found exceptions will be raised; if ignore any + not-found exceptions will cause the path to be skipped; defaults to + raise unless recursive is true, where the default is ignore + """ + if on_error is None and recursive: + on_error = "ignore" + elif on_error is None: + on_error = "raise" + + if isinstance(path1, list) and isinstance(path2, list): + # No need to expand paths when both source and destination + # are provided as lists + paths1 = path1 + paths2 = path2 + else: + from .implementations.local import trailing_sep + + source_is_str = isinstance(path1, str) + paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))] + if not paths1: + return + + source_is_file = len(paths1) == 1 + dest_is_dir = isinstance(path2, str) and ( + trailing_sep(path2) or self.isdir(path2) + ) + + exists = source_is_str and ( + (has_magic(path1) and source_is_file) + or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) + ) + paths2 = other_paths( + paths1, + path2, + exists=exists, + flatten=not source_is_str, + ) + + for p1, p2 in zip(paths1, paths2): + try: + self.cp_file(p1, p2, **kwargs) + except FileNotFoundError: + if on_error == "raise": + raise + + def expand_path(self, path, recursive=False, maxdepth=None, **kwargs): + """Turn one or more globs or directories into a list of all matching paths + to files or directories. + + kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls`` + """ + + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + if isinstance(path, (str, os.PathLike)): + out = self.expand_path([path], recursive, maxdepth) + else: + out = set() + path = [self._strip_protocol(p) for p in path] + for p in path: + if has_magic(p): + bit = set(self.glob(p, maxdepth=maxdepth, **kwargs)) + out |= bit + if recursive: + # glob call above expanded one depth so if maxdepth is defined + # then decrement it in expand_path call below. If it is zero + # after decrementing then avoid expand_path call. + if maxdepth is not None and maxdepth <= 1: + continue + out |= set( + self.expand_path( + list(bit), + recursive=recursive, + maxdepth=maxdepth - 1 if maxdepth is not None else None, + **kwargs, + ) + ) + continue + elif recursive: + rec = set( + self.find( + p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs + ) + ) + out |= rec + if p not in out and (recursive is False or self.exists(p)): + # should only check once, for the root + out.add(p) + if not out: + raise FileNotFoundError(path) + return sorted(out) + + def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs): + """Move file(s) from one location to another""" + if path1 == path2: + logger.debug("%s mv: The paths are the same, so no files were moved.", self) + else: + # explicitly raise exception to prevent data corruption + self.copy( + path1, path2, recursive=recursive, maxdepth=maxdepth, onerror="raise" + ) + self.rm(path1, recursive=recursive) + + def rm_file(self, path): + """Delete a file""" + self._rm(path) + + def _rm(self, path): + """Delete one file""" + # this is the old name for the method, prefer rm_file + raise NotImplementedError + + def rm(self, path, recursive=False, maxdepth=None): + """Delete files. + + Parameters + ---------- + path: str or list of str + File(s) to delete. + recursive: bool + If file(s) are directories, recursively delete contents and then + also remove the directory + maxdepth: int or None + Depth to pass to walk for finding files to delete, if recursive. + If None, there will be no limit and infinite recursion may be + possible. + """ + path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth) + for p in reversed(path): + self.rm_file(p) + + @classmethod + def _parent(cls, path): + path = cls._strip_protocol(path) + if "/" in path: + parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker) + return cls.root_marker + parent + else: + return cls.root_marker + + def _open( + self, + path, + mode="rb", + block_size=None, + autocommit=True, + cache_options=None, + **kwargs, + ): + """Return raw bytes-mode file-like from the file-system""" + return AbstractBufferedFile( + self, + path, + mode, + block_size, + autocommit, + cache_options=cache_options, + **kwargs, + ) + + def open( + self, + path, + mode="rb", + block_size=None, + cache_options=None, + compression=None, + **kwargs, + ): + """ + Return a file-like object from the filesystem + + The resultant instance must function correctly in a context ``with`` + block. + + Parameters + ---------- + path: str + Target file + mode: str like 'rb', 'w' + See builtin ``open()`` + Mode "x" (exclusive write) may be implemented by the backend. Even if + it is, whether it is checked up front or on commit, and whether it is + atomic is implementation-dependent. + block_size: int + Some indication of buffering - this is a value in bytes + cache_options : dict, optional + Extra arguments to pass through to the cache. + compression: string or None + If given, open file using compression codec. Can either be a compression + name (a key in ``fsspec.compression.compr``) or "infer" to guess the + compression from the filename suffix. + encoding, errors, newline: passed on to TextIOWrapper for text mode + """ + import io + + path = self._strip_protocol(path) + if "b" not in mode: + mode = mode.replace("t", "") + "b" + + text_kwargs = { + k: kwargs.pop(k) + for k in ["encoding", "errors", "newline"] + if k in kwargs + } + return io.TextIOWrapper( + self.open( + path, + mode, + block_size=block_size, + cache_options=cache_options, + compression=compression, + **kwargs, + ), + **text_kwargs, + ) + else: + ac = kwargs.pop("autocommit", not self._intrans) + f = self._open( + path, + mode=mode, + block_size=block_size, + autocommit=ac, + cache_options=cache_options, + **kwargs, + ) + if compression is not None: + from fsspec.compression import compr + from fsspec.core import get_compression + + compression = get_compression(path, compression) + compress = compr[compression] + f = compress(f, mode=mode[0]) + + if not ac and "r" not in mode: + self.transaction.files.append(f) + return f + + def touch(self, path, truncate=True, **kwargs): + """Create empty file, or update timestamp + + Parameters + ---------- + path: str + file location + truncate: bool + If True, always set file size to 0; if False, update timestamp and + leave file unchanged, if backend allows this + """ + if truncate or not self.exists(path): + with self.open(path, "wb", **kwargs): + pass + else: + raise NotImplementedError # update timestamp, if possible + + def ukey(self, path): + """Hash of file properties, to tell if it has changed""" + return sha256(str(self.info(path)).encode()).hexdigest() + + def read_block(self, fn, offset, length, delimiter=None): + """Read a block of bytes from + + Starting at ``offset`` of the file, read ``length`` bytes. If + ``delimiter`` is set then we ensure that the read starts and stops at + delimiter boundaries that follow the locations ``offset`` and ``offset + + length``. If ``offset`` is zero then we start at zero. The + bytestring returned WILL include the end delimiter string. + + If offset+length is beyond the eof, reads to eof. + + Parameters + ---------- + fn: string + Path to filename + offset: int + Byte offset to start read + length: int + Number of bytes to read. If None, read to end. + delimiter: bytes (optional) + Ensure reading starts and stops at delimiter bytestring + + Examples + -------- + >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP + b'Alice, 100\\nBo' + >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP + b'Alice, 100\\nBob, 200\\n' + + Use ``length=None`` to read to the end of the file. + >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP + b'Alice, 100\\nBob, 200\\nCharlie, 300' + + See Also + -------- + :func:`fsspec.utils.read_block` + """ + with self.open(fn, "rb") as f: + size = f.size + if length is None: + length = size + if size is not None and offset + length > size: + length = size - offset + return read_block(f, offset, length, delimiter) + + def to_json(self, *, include_password: bool = True) -> str: + """ + JSON representation of this filesystem instance. + + Parameters + ---------- + include_password: bool, default True + Whether to include the password (if any) in the output. + + Returns + ------- + JSON string with keys ``cls`` (the python location of this class), + protocol (text name of this class's protocol, first one in case of + multiple), ``args`` (positional args, usually empty), and all other + keyword arguments as their own keys. + + Warnings + -------- + Serialized filesystems may contain sensitive information which have been + passed to the constructor, such as passwords and tokens. Make sure you + store and send them in a secure environment! + """ + from .json import FilesystemJSONEncoder + + return json.dumps( + self, + cls=type( + "_FilesystemJSONEncoder", + (FilesystemJSONEncoder,), + {"include_password": include_password}, + ), + ) + + @staticmethod + def from_json(blob: str) -> AbstractFileSystem: + """ + Recreate a filesystem instance from JSON representation. + + See ``.to_json()`` for the expected structure of the input. + + Parameters + ---------- + blob: str + + Returns + ------- + file system instance, not necessarily of this particular class. + + Warnings + -------- + This can import arbitrary modules (as determined by the ``cls`` key). + Make sure you haven't installed any modules that may execute malicious code + at import time. + """ + from .json import FilesystemJSONDecoder + + return json.loads(blob, cls=FilesystemJSONDecoder) + + def to_dict(self, *, include_password: bool = True) -> dict[str, Any]: + """ + JSON-serializable dictionary representation of this filesystem instance. + + Parameters + ---------- + include_password: bool, default True + Whether to include the password (if any) in the output. + + Returns + ------- + Dictionary with keys ``cls`` (the python location of this class), + protocol (text name of this class's protocol, first one in case of + multiple), ``args`` (positional args, usually empty), and all other + keyword arguments as their own keys. + + Warnings + -------- + Serialized filesystems may contain sensitive information which have been + passed to the constructor, such as passwords and tokens. Make sure you + store and send them in a secure environment! + """ + from .json import FilesystemJSONEncoder + + json_encoder = FilesystemJSONEncoder() + + cls = type(self) + proto = self.protocol + + storage_options = dict(self.storage_options) + if not include_password: + storage_options.pop("password", None) + + return dict( + cls=f"{cls.__module__}:{cls.__name__}", + protocol=proto[0] if isinstance(proto, (tuple, list)) else proto, + args=json_encoder.make_serializable(self.storage_args), + **json_encoder.make_serializable(storage_options), + ) + + @staticmethod + def from_dict(dct: dict[str, Any]) -> AbstractFileSystem: + """ + Recreate a filesystem instance from dictionary representation. + + See ``.to_dict()`` for the expected structure of the input. + + Parameters + ---------- + dct: Dict[str, Any] + + Returns + ------- + file system instance, not necessarily of this particular class. + + Warnings + -------- + This can import arbitrary modules (as determined by the ``cls`` key). + Make sure you haven't installed any modules that may execute malicious code + at import time. + """ + from .json import FilesystemJSONDecoder + + json_decoder = FilesystemJSONDecoder() + + dct = dict(dct) # Defensive copy + + cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct) + if cls is None: + raise ValueError("Not a serialized AbstractFileSystem") + + dct.pop("cls", None) + dct.pop("protocol", None) + + return cls( + *json_decoder.unmake_serializable(dct.pop("args", ())), + **json_decoder.unmake_serializable(dct), + ) + + def _get_pyarrow_filesystem(self): + """ + Make a version of the FS instance which will be acceptable to pyarrow + """ + # all instances already also derive from pyarrow + return self + + def get_mapper(self, root="", check=False, create=False, missing_exceptions=None): + """Create key/value store based on this file-system + + Makes a MutableMapping interface to the FS at the given root path. + See ``fsspec.mapping.FSMap`` for further details. + """ + from .mapping import FSMap + + return FSMap( + root, + self, + check=check, + create=create, + missing_exceptions=missing_exceptions, + ) + + @classmethod + def clear_instance_cache(cls): + """ + Clear the cache of filesystem instances. + + Notes + ----- + Unless overridden by setting the ``cachable`` class attribute to False, + the filesystem class stores a reference to newly created instances. This + prevents Python's normal rules around garbage collection from working, + since the instances refcount will not drop to zero until + ``clear_instance_cache`` is called. + """ + cls._cache.clear() + + def created(self, path): + """Return the created timestamp of a file as a datetime.datetime""" + raise NotImplementedError + + def modified(self, path): + """Return the modified timestamp of a file as a datetime.datetime""" + raise NotImplementedError + + def tree( + self, + path: str = "/", + recursion_limit: int = 2, + max_display: int = 25, + display_size: bool = False, + prefix: str = "", + is_last: bool = True, + first: bool = True, + indent_size: int = 4, + ) -> str: + """ + Return a tree-like structure of the filesystem starting from the given path as a string. + + Parameters + ---------- + path: Root path to start traversal from + recursion_limit: Maximum depth of directory traversal + max_display: Maximum number of items to display per directory + display_size: Whether to display file sizes + prefix: Current line prefix for visual tree structure + is_last: Whether current item is last in its level + first: Whether this is the first call (displays root path) + indent_size: Number of spaces by indent + + Returns + ------- + str: A string representing the tree structure. + + Example + ------- + >>> from fsspec import filesystem + + >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password') + >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10) + >>> print(tree) + """ + + def format_bytes(n: int) -> str: + """Format bytes as text.""" + for prefix, k in ( + ("P", 2**50), + ("T", 2**40), + ("G", 2**30), + ("M", 2**20), + ("k", 2**10), + ): + if n >= 0.9 * k: + return f"{n / k:.2f} {prefix}b" + return f"{n}B" + + result = [] + + if first: + result.append(path) + + if recursion_limit: + indent = " " * indent_size + contents = self.ls(path, detail=True) + contents.sort( + key=lambda x: (x.get("type") != "directory", x.get("name", "")) + ) + + if max_display is not None and len(contents) > max_display: + displayed_contents = contents[:max_display] + remaining_count = len(contents) - max_display + else: + displayed_contents = contents + remaining_count = 0 + + for i, item in enumerate(displayed_contents): + is_last_item = (i == len(displayed_contents) - 1) and ( + remaining_count == 0 + ) + + branch = ( + "└" + ("─" * (indent_size - 2)) + if is_last_item + else "├" + ("─" * (indent_size - 2)) + ) + branch += " " + new_prefix = prefix + ( + indent if is_last_item else "│" + " " * (indent_size - 1) + ) + + name = os.path.basename(item.get("name", "")) + + if display_size and item.get("type") == "directory": + sub_contents = self.ls(item.get("name", ""), detail=True) + num_files = sum( + 1 for sub_item in sub_contents if sub_item.get("type") == "file" + ) + num_folders = sum( + 1 + for sub_item in sub_contents + if sub_item.get("type") == "directory" + ) + + if num_files == 0 and num_folders == 0: + size = " (empty folder)" + elif num_files == 0: + size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})" + elif num_folders == 0: + size = f" ({num_files} file{'s' if num_files > 1 else ''})" + else: + size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})" + elif display_size and item.get("type") == "file": + size = f" ({format_bytes(item.get('size', 0))})" + else: + size = "" + + result.append(f"{prefix}{branch}{name}{size}") + + if item.get("type") == "directory" and recursion_limit > 0: + result.append( + self.tree( + path=item.get("name", ""), + recursion_limit=recursion_limit - 1, + max_display=max_display, + display_size=display_size, + prefix=new_prefix, + is_last=is_last_item, + first=False, + indent_size=indent_size, + ) + ) + + if remaining_count > 0: + more_message = f"{remaining_count} more item(s) not displayed." + result.append( + f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}" + ) + + return "\n".join(_ for _ in result if _) + + # ------------------------------------------------------------------------ + # Aliases + + def read_bytes(self, path, start=None, end=None, **kwargs): + """Alias of `AbstractFileSystem.cat_file`.""" + return self.cat_file(path, start=start, end=end, **kwargs) + + def write_bytes(self, path, value, **kwargs): + """Alias of `AbstractFileSystem.pipe_file`.""" + self.pipe_file(path, value, **kwargs) + + def makedir(self, path, create_parents=True, **kwargs): + """Alias of `AbstractFileSystem.mkdir`.""" + return self.mkdir(path, create_parents=create_parents, **kwargs) + + def mkdirs(self, path, exist_ok=False): + """Alias of `AbstractFileSystem.makedirs`.""" + return self.makedirs(path, exist_ok=exist_ok) + + def listdir(self, path, detail=True, **kwargs): + """Alias of `AbstractFileSystem.ls`.""" + return self.ls(path, detail=detail, **kwargs) + + def cp(self, path1, path2, **kwargs): + """Alias of `AbstractFileSystem.copy`.""" + return self.copy(path1, path2, **kwargs) + + def move(self, path1, path2, **kwargs): + """Alias of `AbstractFileSystem.mv`.""" + return self.mv(path1, path2, **kwargs) + + def stat(self, path, **kwargs): + """Alias of `AbstractFileSystem.info`.""" + return self.info(path, **kwargs) + + def disk_usage(self, path, total=True, maxdepth=None, **kwargs): + """Alias of `AbstractFileSystem.du`.""" + return self.du(path, total=total, maxdepth=maxdepth, **kwargs) + + def rename(self, path1, path2, **kwargs): + """Alias of `AbstractFileSystem.mv`.""" + return self.mv(path1, path2, **kwargs) + + def delete(self, path, recursive=False, maxdepth=None): + """Alias of `AbstractFileSystem.rm`.""" + return self.rm(path, recursive=recursive, maxdepth=maxdepth) + + def upload(self, lpath, rpath, recursive=False, **kwargs): + """Alias of `AbstractFileSystem.put`.""" + return self.put(lpath, rpath, recursive=recursive, **kwargs) + + def download(self, rpath, lpath, recursive=False, **kwargs): + """Alias of `AbstractFileSystem.get`.""" + return self.get(rpath, lpath, recursive=recursive, **kwargs) + + def sign(self, path, expiration=100, **kwargs): + """Create a signed URL representing the given path + + Some implementations allow temporary URLs to be generated, as a + way of delegating credentials. + + Parameters + ---------- + path : str + The path on the filesystem + expiration : int + Number of seconds to enable the URL for (if supported) + + Returns + ------- + URL : str + The signed URL + + Raises + ------ + NotImplementedError : if method is not implemented for a filesystem + """ + raise NotImplementedError("Sign is not implemented for this filesystem") + + def _isfilestore(self): + # Originally inherited from pyarrow DaskFileSystem. Keeping this + # here for backwards compatibility as long as pyarrow uses its + # legacy fsspec-compatible filesystems and thus accepts fsspec + # filesystems as well + return False + + +class AbstractBufferedFile(io.IOBase): + """Convenient class to derive from to provide buffering + + In the case that the backend does not provide a pythonic file-like object + already, this class contains much of the logic to build one. The only + methods that need to be overridden are ``_upload_chunk``, + ``_initiate_upload`` and ``_fetch_range``. + """ + + DEFAULT_BLOCK_SIZE = 5 * 2**20 + _details = None + + def __init__( + self, + fs, + path, + mode="rb", + block_size="default", + autocommit=True, + cache_type="readahead", + cache_options=None, + size=None, + **kwargs, + ): + """ + Template for files with buffered reading and writing + + Parameters + ---------- + fs: instance of FileSystem + path: str + location in file-system + mode: str + Normal file modes. Currently only 'wb', 'ab' or 'rb'. Some file + systems may be read-only, and some may not support append. + block_size: int + Buffer size for reading or writing, 'default' for class default + autocommit: bool + Whether to write to final destination; may only impact what + happens when file is being closed. + cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead" + Caching policy in read mode. See the definitions in ``core``. + cache_options : dict + Additional options passed to the constructor for the cache specified + by `cache_type`. + size: int + If given and in read mode, suppressed having to look up the file size + kwargs: + Gets stored as self.kwargs + """ + from .core import caches + + self.path = path + self.fs = fs + self.mode = mode + self.blocksize = ( + self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size + ) + self.loc = 0 + self.autocommit = autocommit + self.end = None + self.start = None + self.closed = False + + if cache_options is None: + cache_options = {} + + if "trim" in kwargs: + warnings.warn( + "Passing 'trim' to control the cache behavior has been deprecated. " + "Specify it within the 'cache_options' argument instead.", + FutureWarning, + ) + cache_options["trim"] = kwargs.pop("trim") + + self.kwargs = kwargs + + if mode not in {"ab", "rb", "wb", "xb"}: + raise NotImplementedError("File mode not supported") + if mode == "rb": + if size is not None: + self.size = size + else: + self.size = self.details["size"] + self.cache = caches[cache_type]( + self.blocksize, self._fetch_range, self.size, **cache_options + ) + else: + self.buffer = io.BytesIO() + self.offset = None + self.forced = False + self.location = None + + @property + def details(self): + if self._details is None: + self._details = self.fs.info(self.path) + return self._details + + @details.setter + def details(self, value): + self._details = value + self.size = value["size"] + + @property + def full_name(self): + return _unstrip_protocol(self.path, self.fs) + + @property + def closed(self): + # get around this attr being read-only in IOBase + # use getattr here, since this can be called during del + return getattr(self, "_closed", True) + + @closed.setter + def closed(self, c): + self._closed = c + + def __hash__(self): + if "w" in self.mode: + return id(self) + else: + return int(tokenize(self.details), 16) + + def __eq__(self, other): + """Files are equal if they have the same checksum, only in read mode""" + if self is other: + return True + return ( + isinstance(other, type(self)) + and self.mode == "rb" + and other.mode == "rb" + and hash(self) == hash(other) + ) + + def commit(self): + """Move from temp to final destination""" + + def discard(self): + """Throw away temporary file""" + + def info(self): + """File information about this path""" + if self.readable(): + return self.details + else: + raise ValueError("Info not available while writing") + + def tell(self): + """Current file location""" + return self.loc + + def seek(self, loc, whence=0): + """Set current file location + + Parameters + ---------- + loc: int + byte location + whence: {0, 1, 2} + from start of file, current location or end of file, resp. + """ + loc = int(loc) + if not self.mode == "rb": + raise OSError(ESPIPE, "Seek only available in read mode") + if whence == 0: + nloc = loc + elif whence == 1: + nloc = self.loc + loc + elif whence == 2: + nloc = self.size + loc + else: + raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)") + if nloc < 0: + raise ValueError("Seek before start of file") + self.loc = nloc + return self.loc + + def write(self, data): + """ + Write data to buffer. + + Buffer only sent on flush() or if buffer is greater than + or equal to blocksize. + + Parameters + ---------- + data: bytes + Set of bytes to be written. + """ + if not self.writable(): + raise ValueError("File not in write mode") + if self.closed: + raise ValueError("I/O operation on closed file.") + if self.forced: + raise ValueError("This file has been force-flushed, can only close") + out = self.buffer.write(data) + self.loc += out + if self.buffer.tell() >= self.blocksize: + self.flush() + return out + + def flush(self, force=False): + """ + Write buffered data to backend store. + + Writes the current buffer, if it is larger than the block-size, or if + the file is being closed. + + Parameters + ---------- + force: bool + When closing, write the last block even if it is smaller than + blocks are allowed to be. Disallows further writing to this file. + """ + + if self.closed: + raise ValueError("Flush on closed file") + if force and self.forced: + raise ValueError("Force flush cannot be called more than once") + if force: + self.forced = True + + if self.readable(): + # no-op to flush on read-mode + return + + if not force and self.buffer.tell() < self.blocksize: + # Defer write on small block + return + + if self.offset is None: + # Initialize a multipart upload + self.offset = 0 + try: + self._initiate_upload() + except: + self.closed = True + raise + + if self._upload_chunk(final=force) is not False: + self.offset += self.buffer.seek(0, 2) + self.buffer = io.BytesIO() + + def _upload_chunk(self, final=False): + """Write one part of a multi-block file upload + + Parameters + ========== + final: bool + This is the last block, so should complete file, if + self.autocommit is True. + """ + # may not yet have been initialized, may need to call _initialize_upload + + def _initiate_upload(self): + """Create remote file/upload""" + pass + + def _fetch_range(self, start, end): + """Get the specified set of bytes from remote""" + return self.fs.cat_file(self.path, start=start, end=end) + + def read(self, length=-1): + """ + Return data from cache, or fetch pieces as necessary + + Parameters + ---------- + length: int (-1) + Number of bytes to read; if <0, all remaining bytes. + """ + length = -1 if length is None else int(length) + if self.mode != "rb": + raise ValueError("File not in read mode") + if length < 0: + length = self.size - self.loc + if self.closed: + raise ValueError("I/O operation on closed file.") + if length == 0: + # don't even bother calling fetch + return b"" + out = self.cache._fetch(self.loc, self.loc + length) + + logger.debug( + "%s read: %i - %i %s", + self, + self.loc, + self.loc + length, + self.cache._log_stats(), + ) + self.loc += len(out) + return out + + def readinto(self, b): + """mirrors builtin file's readinto method + + https://docs.python.org/3/library/io.html#io.RawIOBase.readinto + """ + out = memoryview(b).cast("B") + data = self.read(out.nbytes) + out[: len(data)] = data + return len(data) + + def readuntil(self, char=b"\n", blocks=None): + """Return data between current position and first occurrence of char + + char is included in the output, except if the end of the tile is + encountered first. + + Parameters + ---------- + char: bytes + Thing to find + blocks: None or int + How much to read in each go. Defaults to file blocksize - which may + mean a new read on every call. + """ + out = [] + while True: + start = self.tell() + part = self.read(blocks or self.blocksize) + if len(part) == 0: + break + found = part.find(char) + if found > -1: + out.append(part[: found + len(char)]) + self.seek(start + found + len(char)) + break + out.append(part) + return b"".join(out) + + def readline(self): + """Read until first occurrence of newline character + + Note that, because of character encoding, this is not necessarily a + true line ending. + """ + return self.readuntil(b"\n") + + def __next__(self): + out = self.readline() + if out: + return out + raise StopIteration + + def __iter__(self): + return self + + def readlines(self): + """Return all data, split by the newline character""" + data = self.read() + lines = data.split(b"\n") + out = [l + b"\n" for l in lines[:-1]] + if data.endswith(b"\n"): + return out + else: + return out + [lines[-1]] + # return list(self) ??? + + def readinto1(self, b): + return self.readinto(b) + + def close(self): + """Close file + + Finalizes writes, discards cache + """ + if getattr(self, "_unclosable", False): + return + if self.closed: + return + try: + if self.mode == "rb": + self.cache = None + else: + if not self.forced: + self.flush(force=True) + + if self.fs is not None: + self.fs.invalidate_cache(self.path) + self.fs.invalidate_cache(self.fs._parent(self.path)) + finally: + self.closed = True + + def readable(self): + """Whether opened for reading""" + return "r" in self.mode and not self.closed + + def seekable(self): + """Whether is seekable (only in read mode)""" + return self.readable() + + def writable(self): + """Whether opened for writing""" + return self.mode in {"wb", "ab", "xb"} and not self.closed + + def __reduce__(self): + if self.mode != "rb": + raise RuntimeError("Pickling a writeable file is not supported") + + return reopen, ( + self.fs, + self.path, + self.mode, + self.blocksize, + self.loc, + self.size, + self.autocommit, + self.cache.name if self.cache else "none", + self.kwargs, + ) + + def __del__(self): + if not self.closed: + self.close() + + def __str__(self): + return f"<File-like object {type(self.fs).__name__}, {self.path}>" + + __repr__ = __str__ + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() + + +def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs): + file = fs.open( + path, + mode=mode, + block_size=blocksize, + autocommit=autocommit, + cache_type=cache_type, + size=size, + **kwargs, + ) + if loc > 0: + file.seek(loc) + return file |