diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/generic.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/generic.py | 411 |
1 files changed, 411 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/generic.py b/.venv/lib/python3.12/site-packages/fsspec/generic.py new file mode 100644 index 00000000..9bad0f04 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/generic.py @@ -0,0 +1,411 @@ +from __future__ import annotations + +import inspect +import logging +import os +import shutil +import uuid +from typing import Optional + +from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper +from .callbacks import DEFAULT_CALLBACK +from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs + +_generic_fs = {} +logger = logging.getLogger("fsspec.generic") + + +def set_generic_fs(protocol, **storage_options): + _generic_fs[protocol] = filesystem(protocol, **storage_options) + + +default_method = "default" + + +def _resolve_fs(url, method=None, protocol=None, storage_options=None): + """Pick instance of backend FS""" + method = method or default_method + protocol = protocol or split_protocol(url)[0] + storage_options = storage_options or {} + if method == "default": + return filesystem(protocol) + if method == "generic": + return _generic_fs[protocol] + if method == "current": + cls = get_filesystem_class(protocol) + return cls.current() + if method == "options": + fs, _ = url_to_fs(url, **storage_options.get(protocol, {})) + return fs + raise ValueError(f"Unknown FS resolution method: {method}") + + +def rsync( + source, + destination, + delete_missing=False, + source_field="size", + dest_field="size", + update_cond="different", + inst_kwargs=None, + fs=None, + **kwargs, +): + """Sync files between two directory trees + + (experimental) + + Parameters + ---------- + source: str + Root of the directory tree to take files from. This must be a directory, but + do not include any terminating "/" character + destination: str + Root path to copy into. The contents of this location should be + identical to the contents of ``source`` when done. This will be made a + directory, and the terminal "/" should not be included. + delete_missing: bool + If there are paths in the destination that don't exist in the + source and this is True, delete them. Otherwise, leave them alone. + source_field: str | callable + If ``update_field`` is "different", this is the key in the info + of source files to consider for difference. Maybe a function of the + info dict. + dest_field: str | callable + If ``update_field`` is "different", this is the key in the info + of destination files to consider for difference. May be a function of + the info dict. + update_cond: "different"|"always"|"never" + If "always", every file is copied, regardless of whether it exists in + the destination. If "never", files that exist in the destination are + not copied again. If "different" (default), only copy if the info + fields given by ``source_field`` and ``dest_field`` (usually "size") + are different. Other comparisons may be added in the future. + inst_kwargs: dict|None + If ``fs`` is None, use this set of keyword arguments to make a + GenericFileSystem instance + fs: GenericFileSystem|None + Instance to use if explicitly given. The instance defines how to + to make downstream file system instances from paths. + + Returns + ------- + dict of the copy operations that were performed, {source: destination} + """ + fs = fs or GenericFileSystem(**(inst_kwargs or {})) + source = fs._strip_protocol(source) + destination = fs._strip_protocol(destination) + allfiles = fs.find(source, withdirs=True, detail=True) + if not fs.isdir(source): + raise ValueError("Can only rsync on a directory") + otherfiles = fs.find(destination, withdirs=True, detail=True) + dirs = [ + a + for a, v in allfiles.items() + if v["type"] == "directory" and a.replace(source, destination) not in otherfiles + ] + logger.debug(f"{len(dirs)} directories to create") + if dirs: + fs.make_many_dirs( + [dirn.replace(source, destination) for dirn in dirs], exist_ok=True + ) + allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"} + logger.debug(f"{len(allfiles)} files to consider for copy") + to_delete = [ + o + for o, v in otherfiles.items() + if o.replace(destination, source) not in allfiles and v["type"] == "file" + ] + for k, v in allfiles.copy().items(): + otherfile = k.replace(source, destination) + if otherfile in otherfiles: + if update_cond == "always": + allfiles[k] = otherfile + elif update_cond == "different": + inf1 = source_field(v) if callable(source_field) else v[source_field] + v2 = otherfiles[otherfile] + inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field] + if inf1 != inf2: + # details mismatch, make copy + allfiles[k] = otherfile + else: + # details match, don't copy + allfiles.pop(k) + else: + # file not in target yet + allfiles[k] = otherfile + logger.debug(f"{len(allfiles)} files to copy") + if allfiles: + source_files, target_files = zip(*allfiles.items()) + fs.cp(source_files, target_files, **kwargs) + logger.debug(f"{len(to_delete)} files to delete") + if delete_missing and to_delete: + fs.rm(to_delete) + return allfiles + + +class GenericFileSystem(AsyncFileSystem): + """Wrapper over all other FS types + + <experimental!> + + This implementation is a single unified interface to be able to run FS operations + over generic URLs, and dispatch to the specific implementations using the URL + protocol prefix. + + Note: instances of this FS are always async, even if you never use it with any async + backend. + """ + + protocol = "generic" # there is no real reason to ever use a protocol with this FS + + def __init__(self, default_method="default", **kwargs): + """ + + Parameters + ---------- + default_method: str (optional) + Defines how to configure backend FS instances. Options are: + - "default": instantiate like FSClass(), with no + extra arguments; this is the default instance of that FS, and can be + configured via the config system + - "generic": takes instances from the `_generic_fs` dict in this module, + which you must populate before use. Keys are by protocol + - "current": takes the most recently instantiated version of each FS + """ + self.method = default_method + super().__init__(**kwargs) + + def _parent(self, path): + fs = _resolve_fs(path, self.method) + return fs.unstrip_protocol(fs._parent(path)) + + def _strip_protocol(self, path): + # normalization only + fs = _resolve_fs(path, self.method) + return fs.unstrip_protocol(fs._strip_protocol(path)) + + async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs): + fs = _resolve_fs(path, self.method) + if fs.async_impl: + out = await fs._find( + path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs + ) + else: + out = fs.find( + path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs + ) + result = {} + for k, v in out.items(): + v = v.copy() # don't corrupt target FS dircache + name = fs.unstrip_protocol(k) + v["name"] = name + result[name] = v + if detail: + return result + return list(result) + + async def _info(self, url, **kwargs): + fs = _resolve_fs(url, self.method) + if fs.async_impl: + out = await fs._info(url, **kwargs) + else: + out = fs.info(url, **kwargs) + out = out.copy() # don't edit originals + out["name"] = fs.unstrip_protocol(out["name"]) + return out + + async def _ls( + self, + url, + detail=True, + **kwargs, + ): + fs = _resolve_fs(url, self.method) + if fs.async_impl: + out = await fs._ls(url, detail=True, **kwargs) + else: + out = fs.ls(url, detail=True, **kwargs) + out = [o.copy() for o in out] # don't edit originals + for o in out: + o["name"] = fs.unstrip_protocol(o["name"]) + if detail: + return out + else: + return [o["name"] for o in out] + + async def _cat_file( + self, + url, + **kwargs, + ): + fs = _resolve_fs(url, self.method) + if fs.async_impl: + return await fs._cat_file(url, **kwargs) + else: + return fs.cat_file(url, **kwargs) + + async def _pipe_file( + self, + path, + value, + **kwargs, + ): + fs = _resolve_fs(path, self.method) + if fs.async_impl: + return await fs._pipe_file(path, value, **kwargs) + else: + return fs.pipe_file(path, value, **kwargs) + + async def _rm(self, url, **kwargs): + urls = url + if isinstance(urls, str): + urls = [urls] + fs = _resolve_fs(urls[0], self.method) + if fs.async_impl: + await fs._rm(urls, **kwargs) + else: + fs.rm(url, **kwargs) + + async def _makedirs(self, path, exist_ok=False): + logger.debug("Make dir %s", path) + fs = _resolve_fs(path, self.method) + if fs.async_impl: + await fs._makedirs(path, exist_ok=exist_ok) + else: + fs.makedirs(path, exist_ok=exist_ok) + + def rsync(self, source, destination, **kwargs): + """Sync files between two directory trees + + See `func:rsync` for more details. + """ + rsync(source, destination, fs=self, **kwargs) + + async def _cp_file( + self, + url, + url2, + blocksize=2**20, + callback=DEFAULT_CALLBACK, + **kwargs, + ): + fs = _resolve_fs(url, self.method) + fs2 = _resolve_fs(url2, self.method) + if fs is fs2: + # pure remote + if fs.async_impl: + return await fs._cp_file(url, url2, **kwargs) + else: + return fs.cp_file(url, url2, **kwargs) + kw = {"blocksize": 0, "cache_type": "none"} + try: + f1 = ( + await fs.open_async(url, "rb") + if hasattr(fs, "open_async") + else fs.open(url, "rb", **kw) + ) + callback.set_size(await maybe_await(f1.size)) + f2 = ( + await fs2.open_async(url2, "wb") + if hasattr(fs2, "open_async") + else fs2.open(url2, "wb", **kw) + ) + while f1.size is None or f2.tell() < f1.size: + data = await maybe_await(f1.read(blocksize)) + if f1.size is None and not data: + break + await maybe_await(f2.write(data)) + callback.absolute_update(f2.tell()) + finally: + try: + await maybe_await(f2.close()) + await maybe_await(f1.close()) + except NameError: + # fail while opening f1 or f2 + pass + + async def _make_many_dirs(self, urls, exist_ok=True): + fs = _resolve_fs(urls[0], self.method) + if fs.async_impl: + coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls] + await _run_coros_in_chunks(coros) + else: + for u in urls: + fs.makedirs(u, exist_ok=exist_ok) + + make_many_dirs = sync_wrapper(_make_many_dirs) + + async def _copy( + self, + path1: list[str], + path2: list[str], + recursive: bool = False, + on_error: str = "ignore", + maxdepth: Optional[int] = None, + batch_size: Optional[int] = None, + tempdir: Optional[str] = None, + **kwargs, + ): + if recursive: + raise NotImplementedError + fs = _resolve_fs(path1[0], self.method) + fs2 = _resolve_fs(path2[0], self.method) + # not expanding paths atm., assume call is from rsync() + if fs is fs2: + # pure remote + if fs.async_impl: + return await fs._copy(path1, path2, **kwargs) + else: + return fs.copy(path1, path2, **kwargs) + await copy_file_op( + fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error + ) + + +async def copy_file_op( + fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore" +): + import tempfile + + tempdir = tempdir or tempfile.mkdtemp() + try: + coros = [ + _copy_file_op( + fs1, + u1, + fs2, + u2, + os.path.join(tempdir, uuid.uuid4().hex), + on_error=on_error, + ) + for u1, u2 in zip(url1, url2) + ] + await _run_coros_in_chunks(coros, batch_size=batch_size) + finally: + shutil.rmtree(tempdir) + + +async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"): + ex = () if on_error == "raise" else Exception + logger.debug("Copy %s -> %s", url1, url2) + try: + if fs1.async_impl: + await fs1._get_file(url1, local) + else: + fs1.get_file(url1, local) + if fs2.async_impl: + await fs2._put_file(local, url2) + else: + fs2.put_file(local, url2) + os.unlink(local) + logger.debug("Copy %s -> %s; done", url1, url2) + except ex as e: + logger.debug("ignoring cp exception for %s: %s", url1, e) + + +async def maybe_await(cor): + if inspect.iscoroutine(cor): + return await cor + else: + return cor |