diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/fuse.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/fuse.py | 324 |
1 files changed, 324 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/fuse.py b/.venv/lib/python3.12/site-packages/fsspec/fuse.py new file mode 100644 index 00000000..566d520f --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/fuse.py @@ -0,0 +1,324 @@ +import argparse +import logging +import os +import stat +import threading +import time +from errno import EIO, ENOENT + +from fuse import FUSE, FuseOSError, LoggingMixIn, Operations + +from fsspec import __version__ +from fsspec.core import url_to_fs + +logger = logging.getLogger("fsspec.fuse") + + +class FUSEr(Operations): + def __init__(self, fs, path, ready_file=False): + self.fs = fs + self.cache = {} + self.root = path.rstrip("/") + "/" + self.counter = 0 + logger.info("Starting FUSE at %s", path) + self._ready_file = ready_file + + def getattr(self, path, fh=None): + logger.debug("getattr %s", path) + if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]: + return {"type": "file", "st_size": 5} + + path = "".join([self.root, path.lstrip("/")]).rstrip("/") + try: + info = self.fs.info(path) + except FileNotFoundError as exc: + raise FuseOSError(ENOENT) from exc + + data = {"st_uid": info.get("uid", 1000), "st_gid": info.get("gid", 1000)} + perm = info.get("mode", 0o777) + + if info["type"] != "file": + data["st_mode"] = stat.S_IFDIR | perm + data["st_size"] = 0 + data["st_blksize"] = 0 + else: + data["st_mode"] = stat.S_IFREG | perm + data["st_size"] = info["size"] + data["st_blksize"] = 5 * 2**20 + data["st_nlink"] = 1 + data["st_atime"] = info["atime"] if "atime" in info else time.time() + data["st_ctime"] = info["ctime"] if "ctime" in info else time.time() + data["st_mtime"] = info["mtime"] if "mtime" in info else time.time() + return data + + def readdir(self, path, fh): + logger.debug("readdir %s", path) + path = "".join([self.root, path.lstrip("/")]) + files = self.fs.ls(path, False) + files = [os.path.basename(f.rstrip("/")) for f in files] + return [".", ".."] + files + + def mkdir(self, path, mode): + path = "".join([self.root, path.lstrip("/")]) + self.fs.mkdir(path) + return 0 + + def rmdir(self, path): + path = "".join([self.root, path.lstrip("/")]) + self.fs.rmdir(path) + return 0 + + def read(self, path, size, offset, fh): + logger.debug("read %s", (path, size, offset)) + if self._ready_file and path in ["/.fuse_ready", ".fuse_ready"]: + # status indicator + return b"ready" + + f = self.cache[fh] + f.seek(offset) + out = f.read(size) + return out + + def write(self, path, data, offset, fh): + logger.debug("write %s", (path, offset)) + f = self.cache[fh] + f.seek(offset) + f.write(data) + return len(data) + + def create(self, path, flags, fi=None): + logger.debug("create %s", (path, flags)) + fn = "".join([self.root, path.lstrip("/")]) + self.fs.touch(fn) # OS will want to get attributes immediately + f = self.fs.open(fn, "wb") + self.cache[self.counter] = f + self.counter += 1 + return self.counter - 1 + + def open(self, path, flags): + logger.debug("open %s", (path, flags)) + fn = "".join([self.root, path.lstrip("/")]) + if flags % 2 == 0: + # read + mode = "rb" + else: + # write/create + mode = "wb" + self.cache[self.counter] = self.fs.open(fn, mode) + self.counter += 1 + return self.counter - 1 + + def truncate(self, path, length, fh=None): + fn = "".join([self.root, path.lstrip("/")]) + if length != 0: + raise NotImplementedError + # maybe should be no-op since open with write sets size to zero anyway + self.fs.touch(fn) + + def unlink(self, path): + fn = "".join([self.root, path.lstrip("/")]) + try: + self.fs.rm(fn, False) + except (OSError, FileNotFoundError) as exc: + raise FuseOSError(EIO) from exc + + def release(self, path, fh): + try: + if fh in self.cache: + f = self.cache[fh] + f.close() + self.cache.pop(fh) + except Exception as e: + print(e) + return 0 + + def chmod(self, path, mode): + if hasattr(self.fs, "chmod"): + path = "".join([self.root, path.lstrip("/")]) + return self.fs.chmod(path, mode) + raise NotImplementedError + + +def run( + fs, + path, + mount_point, + foreground=True, + threads=False, + ready_file=False, + ops_class=FUSEr, +): + """Mount stuff in a local directory + + This uses fusepy to make it appear as if a given path on an fsspec + instance is in fact resident within the local file-system. + + This requires that fusepy by installed, and that FUSE be available on + the system (typically requiring a package to be installed with + apt, yum, brew, etc.). + + Parameters + ---------- + fs: file-system instance + From one of the compatible implementations + path: str + Location on that file-system to regard as the root directory to + mount. Note that you typically should include the terminating "/" + character. + mount_point: str + An empty directory on the local file-system where the contents of + the remote path will appear. + foreground: bool + Whether or not calling this function will block. Operation will + typically be more stable if True. + threads: bool + Whether or not to create threads when responding to file operations + within the mounter directory. Operation will typically be more + stable if False. + ready_file: bool + Whether the FUSE process is ready. The ``.fuse_ready`` file will + exist in the ``mount_point`` directory if True. Debugging purpose. + ops_class: FUSEr or Subclass of FUSEr + To override the default behavior of FUSEr. For Example, logging + to file. + + """ + func = lambda: FUSE( + ops_class(fs, path, ready_file=ready_file), + mount_point, + nothreads=not threads, + foreground=foreground, + ) + if not foreground: + th = threading.Thread(target=func) + th.daemon = True + th.start() + return th + else: # pragma: no cover + try: + func() + except KeyboardInterrupt: + pass + + +def main(args): + """Mount filesystem from chained URL to MOUNT_POINT. + + Examples: + + python3 -m fsspec.fuse memory /usr/share /tmp/mem + + python3 -m fsspec.fuse local /tmp/source /tmp/local \\ + -l /tmp/fsspecfuse.log + + You can also mount chained-URLs and use special settings: + + python3 -m fsspec.fuse 'filecache::zip::file://data.zip' \\ + / /tmp/zip \\ + -o 'filecache-cache_storage=/tmp/simplecache' + + You can specify the type of the setting by using `[int]` or `[bool]`, + (`true`, `yes`, `1` represents the Boolean value `True`): + + python3 -m fsspec.fuse 'simplecache::ftp://ftp1.at.proftpd.org' \\ + /historic/packages/RPMS /tmp/ftp \\ + -o 'simplecache-cache_storage=/tmp/simplecache' \\ + -o 'simplecache-check_files=false[bool]' \\ + -o 'ftp-listings_expiry_time=60[int]' \\ + -o 'ftp-username=anonymous' \\ + -o 'ftp-password=xieyanbo' + """ + + class RawDescriptionArgumentParser(argparse.ArgumentParser): + def format_help(self): + usage = super().format_help() + parts = usage.split("\n\n") + parts[1] = self.description.rstrip() + return "\n\n".join(parts) + + parser = RawDescriptionArgumentParser(prog="fsspec.fuse", description=main.__doc__) + parser.add_argument("--version", action="version", version=__version__) + parser.add_argument("url", type=str, help="fs url") + parser.add_argument("source_path", type=str, help="source directory in fs") + parser.add_argument("mount_point", type=str, help="local directory") + parser.add_argument( + "-o", + "--option", + action="append", + help="Any options of protocol included in the chained URL", + ) + parser.add_argument( + "-l", "--log-file", type=str, help="Logging FUSE debug info (Default: '')" + ) + parser.add_argument( + "-f", + "--foreground", + action="store_false", + help="Running in foreground or not (Default: False)", + ) + parser.add_argument( + "-t", + "--threads", + action="store_false", + help="Running with threads support (Default: False)", + ) + parser.add_argument( + "-r", + "--ready-file", + action="store_false", + help="The `.fuse_ready` file will exist after FUSE is ready. " + "(Debugging purpose, Default: False)", + ) + args = parser.parse_args(args) + + kwargs = {} + for item in args.option or []: + key, sep, value = item.partition("=") + if not sep: + parser.error(message=f"Wrong option: {item!r}") + val = value.lower() + if val.endswith("[int]"): + value = int(value[: -len("[int]")]) + elif val.endswith("[bool]"): + value = val[: -len("[bool]")] in ["1", "yes", "true"] + + if "-" in key: + fs_name, setting_name = key.split("-", 1) + if fs_name in kwargs: + kwargs[fs_name][setting_name] = value + else: + kwargs[fs_name] = {setting_name: value} + else: + kwargs[key] = value + + if args.log_file: + logging.basicConfig( + level=logging.DEBUG, + filename=args.log_file, + format="%(asctime)s %(message)s", + ) + + class LoggingFUSEr(FUSEr, LoggingMixIn): + pass + + fuser = LoggingFUSEr + else: + fuser = FUSEr + + fs, url_path = url_to_fs(args.url, **kwargs) + logger.debug("Mounting %s to %s", url_path, str(args.mount_point)) + run( + fs, + args.source_path, + args.mount_point, + foreground=args.foreground, + threads=args.threads, + ready_file=args.ready_file, + ops_class=fuser, + ) + + +if __name__ == "__main__": + import sys + + main(sys.argv[1:]) |