Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/generic.py')
 .venv/lib/python3.12/site-packages/fsspec/generic.py | 411 ++++++++++++++++++
 1 file changed, 411 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/fsspec/generic.py b/.venv/lib/python3.12/site-packages/fsspec/generic.py
new file mode 100644
index 00000000..9bad0f04
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/generic.py
@@ -0,0 +1,411 @@
+from __future__ import annotations
+
+import inspect
+import logging
+import os
+import shutil
+import uuid
+from typing import Optional
+
+from .asyn import AsyncFileSystem, _run_coros_in_chunks, sync_wrapper
+from .callbacks import DEFAULT_CALLBACK
+from .core import filesystem, get_filesystem_class, split_protocol, url_to_fs
+
+_generic_fs = {}
+logger = logging.getLogger("fsspec.generic")
+
+
+def set_generic_fs(protocol, **storage_options):
+ _generic_fs[protocol] = filesystem(protocol, **storage_options)
+
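+# Example (protocol and options are illustrative): register a configured
+# instance so that method="generic" resolves matching URLs to it, e.g.
+#
+#     set_generic_fs("s3", anon=True)
+#     fs = GenericFileSystem(default_method="generic")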
+
+default_method = "default"
+
+
+def _resolve_fs(url, method=None, protocol=None, storage_options=None):
+ """Pick instance of backend FS"""
+ method = method or default_method
+ protocol = protocol or split_protocol(url)[0]
+ storage_options = storage_options or {}
+ if method == "default":
+ return filesystem(protocol)
+ if method == "generic":
+ return _generic_fs[protocol]
+ if method == "current":
+ cls = get_filesystem_class(protocol)
+ return cls.current()
+ if method == "options":
+ fs, _ = url_to_fs(url, **storage_options.get(protocol, {}))
+ return fs
+ raise ValueError(f"Unknown FS resolution method: {method}")
+
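+# For example (values are illustrative), method="options" draws per-protocol
+# keyword arguments from a mapping keyed by protocol:
+#
+#     _resolve_fs("s3://bucket/key", method="options",
+#                 storage_options={"s3": {"anon": True}})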
+
+def rsync(
+ source,
+ destination,
+ delete_missing=False,
+ source_field="size",
+ dest_field="size",
+ update_cond="different",
+ inst_kwargs=None,
+ fs=None,
+ **kwargs,
+):
+ """Sync files between two directory trees
+
+ (experimental)
+
+ Parameters
+ ----------
+ source: str
+        Root of the directory tree to take files from. This must be a
+        directory; do not include a trailing "/".
+ destination: str
+        Root path to copy into. When done, the contents of this location
+        will be identical to the contents of ``source``. This will be
+        created as a directory; do not include a trailing "/".
+ delete_missing: bool
+ If there are paths in the destination that don't exist in the
+ source and this is True, delete them. Otherwise, leave them alone.
+ source_field: str | callable
+        If ``update_cond`` is "different", this is the key in the info
+        of source files to consider for difference. May be a function of
+        the info dict.
+ dest_field: str | callable
+        If ``update_cond`` is "different", this is the key in the info
+ of destination files to consider for difference. May be a function of
+ the info dict.
+ update_cond: "different"|"always"|"never"
+ If "always", every file is copied, regardless of whether it exists in
+ the destination. If "never", files that exist in the destination are
+ not copied again. If "different" (default), only copy if the info
+ fields given by ``source_field`` and ``dest_field`` (usually "size")
+ are different. Other comparisons may be added in the future.
+ inst_kwargs: dict|None
+ If ``fs`` is None, use this set of keyword arguments to make a
+ GenericFileSystem instance
+ fs: GenericFileSystem|None
+ Instance to use if explicitly given. The instance defines how to
+        make downstream file system instances from paths.
+
+ Returns
+ -------
+ dict of the copy operations that were performed, {source: destination}
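+
+    Examples
+    --------
+    A minimal sketch; the paths are illustrative, and any credentials for a
+    remote backend are assumed to be configured already:
+
+    >>> rsync("file:///tmp/src", "memory://dest")  # doctest: +SKIP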
+ """
+ fs = fs or GenericFileSystem(**(inst_kwargs or {}))
+ source = fs._strip_protocol(source)
+ destination = fs._strip_protocol(destination)
+    if not fs.isdir(source):
+        raise ValueError("Can only rsync on a directory")
+    allfiles = fs.find(source, withdirs=True, detail=True)
+ otherfiles = fs.find(destination, withdirs=True, detail=True)
+ dirs = [
+ a
+ for a, v in allfiles.items()
+ if v["type"] == "directory" and a.replace(source, destination) not in otherfiles
+ ]
+ logger.debug(f"{len(dirs)} directories to create")
+ if dirs:
+ fs.make_many_dirs(
+ [dirn.replace(source, destination) for dirn in dirs], exist_ok=True
+ )
+ allfiles = {a: v for a, v in allfiles.items() if v["type"] == "file"}
+ logger.debug(f"{len(allfiles)} files to consider for copy")
+ to_delete = [
+ o
+ for o, v in otherfiles.items()
+ if o.replace(destination, source) not in allfiles and v["type"] == "file"
+ ]
+ for k, v in allfiles.copy().items():
+ otherfile = k.replace(source, destination)
+ if otherfile in otherfiles:
+ if update_cond == "always":
+ allfiles[k] = otherfile
+ elif update_cond == "different":
+ inf1 = source_field(v) if callable(source_field) else v[source_field]
+ v2 = otherfiles[otherfile]
+ inf2 = dest_field(v2) if callable(dest_field) else v2[dest_field]
+ if inf1 != inf2:
+ # details mismatch, make copy
+ allfiles[k] = otherfile
+ else:
+ # details match, don't copy
+ allfiles.pop(k)
+ else:
+ # file not in target yet
+ allfiles[k] = otherfile
+ logger.debug(f"{len(allfiles)} files to copy")
+ if allfiles:
+ source_files, target_files = zip(*allfiles.items())
+ fs.cp(source_files, target_files, **kwargs)
+ logger.debug(f"{len(to_delete)} files to delete")
+ if delete_missing and to_delete:
+ fs.rm(to_delete)
+ return allfiles
+
+
+class GenericFileSystem(AsyncFileSystem):
+ """Wrapper over all other FS types
+
+ <experimental!>
+
+ This implementation is a single unified interface to be able to run FS operations
+ over generic URLs, and dispatch to the specific implementations using the URL
+ protocol prefix.
+
+ Note: instances of this FS are always async, even if you never use it with any async
+ backend.
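+
+    Examples
+    --------
+    A minimal sketch using the in-memory backend bundled with fsspec:
+
+    >>> fs = GenericFileSystem()
+    >>> fs.pipe_file("memory://demo/a.txt", b"hello")  # doctest: +SKIP
+    >>> fs.cat_file("memory://demo/a.txt")  # doctest: +SKIP
+    b'hello'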
+ """
+
+ protocol = "generic" # there is no real reason to ever use a protocol with this FS
+
+ def __init__(self, default_method="default", **kwargs):
+ """
+
+ Parameters
+ ----------
+ default_method: str (optional)
+ Defines how to configure backend FS instances. Options are:
+ - "default": instantiate like FSClass(), with no
+ extra arguments; this is the default instance of that FS, and can be
+ configured via the config system
+            - "generic": takes instances from the ``_generic_fs`` dict in this
+            module, which you must populate before use (see ``set_generic_fs``);
+            keys are protocol names
+ - "current": takes the most recently instantiated version of each FS
+ """
+ self.method = default_method
+ super().__init__(**kwargs)
+
+ def _parent(self, path):
+ fs = _resolve_fs(path, self.method)
+ return fs.unstrip_protocol(fs._parent(path))
+
+ def _strip_protocol(self, path):
+ # normalization only
+ fs = _resolve_fs(path, self.method)
+ return fs.unstrip_protocol(fs._strip_protocol(path))
+
+ async def _find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
+ fs = _resolve_fs(path, self.method)
+ if fs.async_impl:
+ out = await fs._find(
+ path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
+ )
+ else:
+ out = fs.find(
+ path, maxdepth=maxdepth, withdirs=withdirs, detail=True, **kwargs
+ )
+ result = {}
+ for k, v in out.items():
+ v = v.copy() # don't corrupt target FS dircache
+ name = fs.unstrip_protocol(k)
+ v["name"] = name
+ result[name] = v
+ if detail:
+ return result
+ return list(result)
+
+ async def _info(self, url, **kwargs):
+ fs = _resolve_fs(url, self.method)
+ if fs.async_impl:
+ out = await fs._info(url, **kwargs)
+ else:
+ out = fs.info(url, **kwargs)
+ out = out.copy() # don't edit originals
+ out["name"] = fs.unstrip_protocol(out["name"])
+ return out
+
+ async def _ls(
+ self,
+ url,
+ detail=True,
+ **kwargs,
+ ):
+ fs = _resolve_fs(url, self.method)
+ if fs.async_impl:
+ out = await fs._ls(url, detail=True, **kwargs)
+ else:
+ out = fs.ls(url, detail=True, **kwargs)
+ out = [o.copy() for o in out] # don't edit originals
+ for o in out:
+ o["name"] = fs.unstrip_protocol(o["name"])
+ if detail:
+ return out
+ else:
+ return [o["name"] for o in out]
+
+ async def _cat_file(
+ self,
+ url,
+ **kwargs,
+ ):
+ fs = _resolve_fs(url, self.method)
+ if fs.async_impl:
+ return await fs._cat_file(url, **kwargs)
+ else:
+ return fs.cat_file(url, **kwargs)
+
+ async def _pipe_file(
+ self,
+ path,
+ value,
+ **kwargs,
+ ):
+ fs = _resolve_fs(path, self.method)
+ if fs.async_impl:
+ return await fs._pipe_file(path, value, **kwargs)
+ else:
+ return fs.pipe_file(path, value, **kwargs)
+
+ async def _rm(self, url, **kwargs):
+ urls = url
+ if isinstance(urls, str):
+ urls = [urls]
+ fs = _resolve_fs(urls[0], self.method)
+ if fs.async_impl:
+ await fs._rm(urls, **kwargs)
+ else:
+            fs.rm(urls, **kwargs)
+
+ async def _makedirs(self, path, exist_ok=False):
+ logger.debug("Make dir %s", path)
+ fs = _resolve_fs(path, self.method)
+ if fs.async_impl:
+ await fs._makedirs(path, exist_ok=exist_ok)
+ else:
+ fs.makedirs(path, exist_ok=exist_ok)
+
+ def rsync(self, source, destination, **kwargs):
+ """Sync files between two directory trees
+
+        See :func:`rsync` for more details.
+ """
+        return rsync(source, destination, fs=self, **kwargs)
+
+ async def _cp_file(
+ self,
+ url,
+ url2,
+ blocksize=2**20,
+ callback=DEFAULT_CALLBACK,
+ **kwargs,
+ ):
+ fs = _resolve_fs(url, self.method)
+ fs2 = _resolve_fs(url2, self.method)
+ if fs is fs2:
+ # pure remote
+ if fs.async_impl:
+ return await fs._cp_file(url, url2, **kwargs)
+ else:
+ return fs.cp_file(url, url2, **kwargs)
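+        # different filesystems: stream the bytes through this process; the
+        # open() kwargs below are intended to avoid client-side buffering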
+ kw = {"blocksize": 0, "cache_type": "none"}
+ try:
+ f1 = (
+ await fs.open_async(url, "rb")
+ if hasattr(fs, "open_async")
+ else fs.open(url, "rb", **kw)
+ )
+ callback.set_size(await maybe_await(f1.size))
+ f2 = (
+ await fs2.open_async(url2, "wb")
+ if hasattr(fs2, "open_async")
+ else fs2.open(url2, "wb", **kw)
+ )
+ while f1.size is None or f2.tell() < f1.size:
+ data = await maybe_await(f1.read(blocksize))
+ if f1.size is None and not data:
+ break
+ await maybe_await(f2.write(data))
+ callback.absolute_update(f2.tell())
+ finally:
+ try:
+ await maybe_await(f2.close())
+ await maybe_await(f1.close())
+ except NameError:
+                # f1 and/or f2 were never opened, so there is nothing to close
+ pass
+
+ async def _make_many_dirs(self, urls, exist_ok=True):
+ fs = _resolve_fs(urls[0], self.method)
+ if fs.async_impl:
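+            # create the directories concurrently, in bounded chunks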
+ coros = [fs._makedirs(u, exist_ok=exist_ok) for u in urls]
+ await _run_coros_in_chunks(coros)
+ else:
+ for u in urls:
+ fs.makedirs(u, exist_ok=exist_ok)
+
+ make_many_dirs = sync_wrapper(_make_many_dirs)
+
+ async def _copy(
+ self,
+ path1: list[str],
+ path2: list[str],
+ recursive: bool = False,
+ on_error: str = "ignore",
+ maxdepth: Optional[int] = None,
+ batch_size: Optional[int] = None,
+ tempdir: Optional[str] = None,
+ **kwargs,
+ ):
+ if recursive:
+ raise NotImplementedError
+ fs = _resolve_fs(path1[0], self.method)
+ fs2 = _resolve_fs(path2[0], self.method)
+        # paths are not expanded for now; assume the call comes from rsync()
+ if fs is fs2:
+ # pure remote
+ if fs.async_impl:
+ return await fs._copy(path1, path2, **kwargs)
+ else:
+ return fs.copy(path1, path2, **kwargs)
+ await copy_file_op(
+ fs, path1, fs2, path2, tempdir, batch_size, on_error=on_error
+ )
+
+
+async def copy_file_op(
+ fs1, url1, fs2, url2, tempdir=None, batch_size=20, on_error="ignore"
+):
+ import tempfile
+
+ tempdir = tempdir or tempfile.mkdtemp()
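+    # each file is staged through a unique local temp path (download from
+    # fs1, then upload to fs2); batch_size bounds the concurrency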
+ try:
+ coros = [
+ _copy_file_op(
+ fs1,
+ u1,
+ fs2,
+ u2,
+ os.path.join(tempdir, uuid.uuid4().hex),
+ on_error=on_error,
+ )
+ for u1, u2 in zip(url1, url2)
+ ]
+ await _run_coros_in_chunks(coros, batch_size=batch_size)
+ finally:
+ shutil.rmtree(tempdir)
+
+
+async def _copy_file_op(fs1, url1, fs2, url2, local, on_error="ignore"):
+    ex = () if on_error == "raise" else Exception  # catch nothing if raising
+ logger.debug("Copy %s -> %s", url1, url2)
+ try:
+ if fs1.async_impl:
+ await fs1._get_file(url1, local)
+ else:
+ fs1.get_file(url1, local)
+ if fs2.async_impl:
+ await fs2._put_file(local, url2)
+ else:
+ fs2.put_file(local, url2)
+ os.unlink(local)
+ logger.debug("Copy %s -> %s; done", url1, url2)
+ except ex as e:
+ logger.debug("ignoring cp exception for %s: %s", url1, e)
+
+
+async def maybe_await(cor):
+    """Return ``cor``, awaiting it first if it is a coroutine."""
+    if inspect.iscoroutine(cor):
+ return await cor
+ else:
+ return cor