Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/asyn.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/fsspec/asyn.py  1098
1 file changed, 1098 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/fsspec/asyn.py b/.venv/lib/python3.12/site-packages/fsspec/asyn.py
new file mode 100644
index 00000000..adeb88fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/asyn.py
@@ -0,0 +1,1098 @@
+import asyncio
+import asyncio.events
+import functools
+import inspect
+import io
+import numbers
+import os
+import re
+import threading
+from contextlib import contextmanager
+from glob import has_magic
+from typing import TYPE_CHECKING, Iterable
+
+from .callbacks import DEFAULT_CALLBACK
+from .exceptions import FSTimeoutError
+from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
+from .spec import AbstractBufferedFile, AbstractFileSystem
+from .utils import glob_translate, is_exception, other_paths
+
+private = re.compile("_[^_]")
+iothread = [None] # dedicated fsspec IO thread
+loop = [None] # global event loop for any non-async instance
+_lock = None # global lock placeholder
+get_running_loop = asyncio.get_running_loop
+
+
+def get_lock():
+ """Allocate or return a threading lock.
+
+ The lock is allocated on first use to allow setting one lock per forked process.
+ """
+ global _lock
+ if not _lock:
+ _lock = threading.Lock()
+ return _lock
+
+
+def reset_lock():
+ """Reset the global lock.
+
+ This should be called only on the init of a forked process to reset the lock to
+ None, enabling the new forked process to get a new lock.
+ """
+ global _lock
+
+ iothread[0] = None
+ loop[0] = None
+ _lock = None
+
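+# A sketch (not part of this module) of how reset_lock is typically wired
+# into a forked child, assuming a POSIX platform where os.register_at_fork
+# is available:
+#
+#     if hasattr(os, "register_at_fork"):
+#         os.register_at_fork(after_in_child=reset_lock)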
+
+async def _runner(event, coro, result, timeout=None):
+ timeout = timeout if timeout else None # convert 0 or 0.0 to None
+ if timeout is not None:
+ coro = asyncio.wait_for(coro, timeout=timeout)
+ try:
+ result[0] = await coro
+ except Exception as ex:
+ result[0] = ex
+ finally:
+ event.set()
+
+
+def sync(loop, func, *args, timeout=None, **kwargs):
+ """
+    Make the loop run the given coroutine until it returns. Runs in another thread.
+
+ Examples
+ --------
+    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
+    ...                  timeout=timeout, **kwargs)
+ """
+ timeout = timeout if timeout else None # convert 0 or 0.0 to None
+ # NB: if the loop is not running *yet*, it is OK to submit work
+ # and we will wait for it
+ if loop is None or loop.is_closed():
+ raise RuntimeError("Loop is not running")
+ try:
+ loop0 = asyncio.events.get_running_loop()
+ if loop0 is loop:
+ raise NotImplementedError("Calling sync() from within a running loop")
+ except NotImplementedError:
+ raise
+ except RuntimeError:
+ pass
+ coro = func(*args, **kwargs)
+ result = [None]
+ event = threading.Event()
+ asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
+ while True:
+        # this loop allows the thread to be interrupted
+ if event.wait(1):
+ break
+ if timeout is not None:
+ timeout -= 1
+ if timeout < 0:
+ raise FSTimeoutError
+
+ return_result = result[0]
+ if isinstance(return_result, asyncio.TimeoutError):
+ # suppress asyncio.TimeoutError, raise FSTimeoutError
+ raise FSTimeoutError from return_result
+ elif isinstance(return_result, BaseException):
+ raise return_result
+ else:
+ return return_result
+
+
+def sync_wrapper(func, obj=None):
+ """Given a function, make so can be called in blocking contexts
+
+ Leave obj=None if defining within a class. Pass the instance if attaching
+ as an attribute of the instance.
+ """
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ self = obj or args[0]
+ return sync(self.loop, func, *args, **kwargs)
+
+ return wrapper
+
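+# Example (sketch): deriving a blocking method from a coroutine method; the
+# MyFS class is hypothetical, any object exposing a ``loop`` attribute works:
+#
+#     class MyFS(AsyncFileSystem):
+#         async def _cat_file(self, path, **kwargs):
+#             ...
+#
+#     MyFS.cat_file = sync_wrapper(MyFS._cat_file)
+#     data = MyFS().cat_file("path")  # runs _cat_file on the instance's loop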
+
+@contextmanager
+def _selector_policy():
+ original_policy = asyncio.get_event_loop_policy()
+ try:
+ if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+ asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
+ yield
+ finally:
+ asyncio.set_event_loop_policy(original_policy)
+
+
+def get_loop():
+ """Create or return the default fsspec IO loop
+
+ The loop will be running on a separate thread.
+ """
+ if loop[0] is None:
+ with get_lock():
+            # repeat the check in case another thread set the loop between
+            # the first check and acquiring the lock
+ if loop[0] is None:
+ with _selector_policy():
+ loop[0] = asyncio.new_event_loop()
+ th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
+ th.daemon = True
+ th.start()
+ iothread[0] = th
+ return loop[0]
+
+
+if TYPE_CHECKING:
+ import resource
+
+ ResourceError = resource.error
+else:
+ try:
+ import resource
+ except ImportError:
+ resource = None
+ ResourceError = OSError
+ else:
+ ResourceError = getattr(resource, "error", OSError)
+
+_DEFAULT_BATCH_SIZE = 128
+_NOFILES_DEFAULT_BATCH_SIZE = 1280
+
+
+def _get_batch_size(nofiles=False):
+ from fsspec.config import conf
+
+ if nofiles:
+ if "nofiles_gather_batch_size" in conf:
+ return conf["nofiles_gather_batch_size"]
+ else:
+ if "gather_batch_size" in conf:
+ return conf["gather_batch_size"]
+ if nofiles:
+ return _NOFILES_DEFAULT_BATCH_SIZE
+ if resource is None:
+ return _DEFAULT_BATCH_SIZE
+
+ try:
+ soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
+ except (ImportError, ValueError, ResourceError):
+ return _DEFAULT_BATCH_SIZE
+
+ if soft_limit == resource.RLIM_INFINITY:
+ return -1
+ else:
+ return soft_limit // 8
+
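+# Example: overriding the inferred batch sizes globally through fsspec's
+# config mapping; these are the two keys read by _get_batch_size above:
+#
+#     from fsspec.config import conf
+#     conf["gather_batch_size"] = 32           # used when local files are involved
+#     conf["nofiles_gather_batch_size"] = 512  # used when they are not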
+
+def running_async() -> bool:
+ """Being executed by an event loop?"""
+ try:
+ asyncio.get_running_loop()
+ return True
+ except RuntimeError:
+ return False
+
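+# running_async() is True when called from a coroutine that an event loop is
+# driving, e.g. anywhere inside asyncio.run(main()), and False from plain
+# synchronous code.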
+
+async def _run_coros_in_chunks(
+ coros,
+ batch_size=None,
+ callback=DEFAULT_CALLBACK,
+ timeout=None,
+ return_exceptions=False,
+ nofiles=False,
+):
+ """Run the given coroutines in chunks.
+
+ Parameters
+ ----------
+ coros: list of coroutines to run
+ batch_size: int or None
+ Number of coroutines to submit/wait on simultaneously.
+        If -1, no throttling is applied. If None, it
+        is inferred from _get_batch_size()
+ callback: fsspec.callbacks.Callback instance
+ Gets a relative_update when each coroutine completes
+ timeout: number or None
+ If given, each coroutine times out after this time. Note that, since
+ there are multiple batches, the total run time of this function will in
+ general be longer
+ return_exceptions: bool
+ Same meaning as in asyncio.gather
+ nofiles: bool
+ If inferring the batch_size, does this operation involve local files?
+ If yes, you normally expect smaller batches.
+ """
+
+ if batch_size is None:
+ batch_size = _get_batch_size(nofiles=nofiles)
+
+ if batch_size == -1:
+ batch_size = len(coros)
+
+ assert batch_size > 0
+
+ async def _run_coro(coro, i):
+ try:
+ return await asyncio.wait_for(coro, timeout=timeout), i
+ except Exception as e:
+ if not return_exceptions:
+ raise
+ return e, i
+ finally:
+ callback.relative_update(1)
+
+ i = 0
+ n = len(coros)
+ results = [None] * n
+ pending = set()
+
+ while pending or i < n:
+ while len(pending) < batch_size and i < n:
+ pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
+ i += 1
+
+ if not pending:
+ break
+
+ done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
+ while done:
+ result, k = await done.pop()
+ results[k] = result
+
+ return results
+
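+# Example (sketch): bounding concurrency over many downloads, with failures
+# returned in place instead of raised; ``fs`` is an AsyncFileSystem instance:
+#
+#     coros = [fs._cat_file(p) for p in paths]
+#     results = await _run_coros_in_chunks(coros, batch_size=16,
+#                                          return_exceptions=True)
+#     # results[i] is the bytes for paths[i], or the exception it raised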
+
+# these methods should be implemented as async by any async-able backend
+async_methods = [
+ "_ls",
+ "_cat_file",
+ "_get_file",
+ "_put_file",
+ "_rm_file",
+ "_cp_file",
+ "_pipe_file",
+ "_expand_path",
+ "_info",
+ "_isfile",
+ "_isdir",
+ "_exists",
+ "_walk",
+ "_glob",
+ "_find",
+ "_du",
+ "_size",
+ "_mkdir",
+ "_makedirs",
+]
+
+
+class AsyncFileSystem(AbstractFileSystem):
+ """Async file operations, default implementations
+
+ Passes bulk operations to asyncio.gather for concurrent operation.
+
+ Implementations that have concurrent batch operations and/or async methods
+ should inherit from this class instead of AbstractFileSystem. Docstrings are
+ copied from the un-underscored method in AbstractFileSystem, if not given.
+ """
+
+ # note that methods do not have docstring here; they will be copied
+ # for _* methods and inferred for overridden methods.
+
+ async_impl = True
+ mirror_sync_methods = True
+ disable_throttling = False
+
+ def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
+ self.asynchronous = asynchronous
+ self._pid = os.getpid()
+ if not asynchronous:
+ self._loop = loop or get_loop()
+ else:
+ self._loop = None
+ self.batch_size = batch_size
+ super().__init__(*args, **kwargs)
+
+ @property
+ def loop(self):
+ if self._pid != os.getpid():
+ raise RuntimeError("This class is not fork-safe")
+ return self._loop
+
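+    # Example (sketch): the two usage modes, shown with a hypothetical
+    # subclass SomeAsyncFS:
+    #
+    #     fs = SomeAsyncFS(asynchronous=True)  # from inside a coroutine:
+    #     data = await fs._cat_file("path")    # await the _-prefixed methods
+    #
+    #     fs = SomeAsyncFS()                   # from blocking code:
+    #     data = fs.cat_file("path")           # mirrored methods use the IO loop
+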
+ async def _rm_file(self, path, **kwargs):
+ raise NotImplementedError
+
+ async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
+ # TODO: implement on_error
+ batch_size = batch_size or self.batch_size
+ path = await self._expand_path(path, recursive=recursive)
+ return await _run_coros_in_chunks(
+ [self._rm_file(p, **kwargs) for p in reversed(path)],
+ batch_size=batch_size,
+ nofiles=True,
+ )
+
+ async def _cp_file(self, path1, path2, **kwargs):
+ raise NotImplementedError
+
+ async def _mv_file(self, path1, path2):
+ await self._cp_file(path1, path2)
+ await self._rm_file(path1)
+
+ async def _copy(
+ self,
+ path1,
+ path2,
+ recursive=False,
+ on_error=None,
+ maxdepth=None,
+ batch_size=None,
+ **kwargs,
+ ):
+ if on_error is None and recursive:
+ on_error = "ignore"
+ elif on_error is None:
+ on_error = "raise"
+
+ if isinstance(path1, list) and isinstance(path2, list):
+ # No need to expand paths when both source and destination
+ # are provided as lists
+ paths1 = path1
+ paths2 = path2
+ else:
+ source_is_str = isinstance(path1, str)
+ paths1 = await self._expand_path(
+ path1, maxdepth=maxdepth, recursive=recursive
+ )
+ if source_is_str and (not recursive or maxdepth is not None):
+ # Non-recursive glob does not copy directories
+ paths1 = [
+ p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
+ ]
+ if not paths1:
+ return
+
+ source_is_file = len(paths1) == 1
+ dest_is_dir = isinstance(path2, str) and (
+ trailing_sep(path2) or await self._isdir(path2)
+ )
+
+ exists = source_is_str and (
+ (has_magic(path1) and source_is_file)
+ or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
+ )
+ paths2 = other_paths(
+ paths1,
+ path2,
+ exists=exists,
+ flatten=not source_is_str,
+ )
+
+ batch_size = batch_size or self.batch_size
+ coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
+ result = await _run_coros_in_chunks(
+ coros, batch_size=batch_size, return_exceptions=True, nofiles=True
+ )
+
+ for ex in filter(is_exception, result):
+ if on_error == "ignore" and isinstance(ex, FileNotFoundError):
+ continue
+ raise ex
+
+ async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
+ raise NotImplementedError
+
+ async def _pipe(self, path, value=None, batch_size=None, **kwargs):
+ if isinstance(path, str):
+ path = {path: value}
+ batch_size = batch_size or self.batch_size
+ return await _run_coros_in_chunks(
+ [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
+ batch_size=batch_size,
+ nofiles=True,
+ )
+
+ async def _process_limits(self, url, start, end):
+ """Helper for "Range"-based _cat_file"""
+ size = None
+ suff = False
+ if start is not None and start < 0:
+ # if start is negative and end None, end is the "suffix length"
+ if end is None:
+ end = -start
+ start = ""
+ suff = True
+ else:
+ size = size or (await self._info(url))["size"]
+ start = size + start
+ elif start is None:
+ start = 0
+ if not suff:
+ if end is not None and end < 0:
+ if start is not None:
+ size = size or (await self._info(url))["size"]
+ end = size + end
+ elif end is None:
+ end = ""
+ if isinstance(end, numbers.Integral):
+ end -= 1 # bytes range is inclusive
+ return f"bytes={start}-{end}"
+
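+    # Examples of the header built by _process_limits (the upper bound is
+    # inclusive, hence the -1 above):
+    #   start=0,    end=100  -> "bytes=0-99"
+    #   start=None, end=None -> "bytes=0-"
+    #   start=-100, end=None -> "bytes=-100"  (suffix: the last 100 bytes)
+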
+ async def _cat_file(self, path, start=None, end=None, **kwargs):
+ raise NotImplementedError
+
+ async def _cat(
+ self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
+ ):
+ paths = await self._expand_path(path, recursive=recursive)
+ coros = [self._cat_file(path, **kwargs) for path in paths]
+ batch_size = batch_size or self.batch_size
+ out = await _run_coros_in_chunks(
+ coros, batch_size=batch_size, nofiles=True, return_exceptions=True
+ )
+ if on_error == "raise":
+ ex = next(filter(is_exception, out), False)
+ if ex:
+ raise ex
+ if (
+ len(paths) > 1
+ or isinstance(path, list)
+ or paths[0] != self._strip_protocol(path)
+ ):
+ return {
+ k: v
+ for k, v in zip(paths, out)
+ if on_error != "omit" or not is_exception(v)
+ }
+ else:
+ return out[0]
+
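+    # Note on _cat's return shape: plain bytes when a single concrete path
+    # was requested, a {path: bytes} dict when the input was a list, a glob
+    # pattern, or otherwise expanded to several paths; with on_error="omit",
+    # failing paths are dropped from that dict.
+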
+ async def _cat_ranges(
+ self,
+ paths,
+ starts,
+ ends,
+ max_gap=None,
+ batch_size=None,
+ on_error="return",
+ **kwargs,
+ ):
+ """Get the contents of byte ranges from one or more files
+
+ Parameters
+ ----------
+ paths: list
+            A list of filepaths on this filesystem
+ starts, ends: int or list
+ Bytes limits of the read. If using a single int, the same value will be
+ used to read all the specified files.
+ """
+ # TODO: on_error
+ if max_gap is not None:
+ # use utils.merge_offset_ranges
+ raise NotImplementedError
+ if not isinstance(paths, list):
+ raise TypeError
+ if not isinstance(starts, Iterable):
+ starts = [starts] * len(paths)
+ if not isinstance(ends, Iterable):
+ ends = [ends] * len(paths)
+ if len(starts) != len(paths) or len(ends) != len(paths):
+ raise ValueError
+ coros = [
+ self._cat_file(p, start=s, end=e, **kwargs)
+ for p, s, e in zip(paths, starts, ends)
+ ]
+ batch_size = batch_size or self.batch_size
+ return await _run_coros_in_chunks(
+ coros, batch_size=batch_size, nofiles=True, return_exceptions=True
+ )
+
+ async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
+ raise NotImplementedError
+
+ async def _put(
+ self,
+ lpath,
+ rpath,
+ recursive=False,
+ callback=DEFAULT_CALLBACK,
+ batch_size=None,
+ maxdepth=None,
+ **kwargs,
+ ):
+ """Copy file(s) from local.
+
+ Copies a specific file or tree of files (if recursive=True). If rpath
+ ends with a "/", it will be assumed to be a directory, and target files
+ will go within.
+
+        The put_file method will be called concurrently on a batch of files. The
+        batch_size option configures the number of futures that can be executed
+        at the same time. If it is -1, all files are uploaded concurrently.
+        The default can be set for this instance by passing "batch_size" in the
+        constructor, or for all instances by setting the "gather_batch_size" key
+        in ``fsspec.config.conf``, falling back to 1/8th of the system's
+        open-file limit.
+ """
+ if isinstance(lpath, list) and isinstance(rpath, list):
+ # No need to expand paths when both source and destination
+ # are provided as lists
+ rpaths = rpath
+ lpaths = lpath
+ else:
+ source_is_str = isinstance(lpath, str)
+ if source_is_str:
+ lpath = make_path_posix(lpath)
+ fs = LocalFileSystem()
+ lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
+ if source_is_str and (not recursive or maxdepth is not None):
+ # Non-recursive glob does not copy directories
+ lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
+ if not lpaths:
+ return
+
+ source_is_file = len(lpaths) == 1
+ dest_is_dir = isinstance(rpath, str) and (
+ trailing_sep(rpath) or await self._isdir(rpath)
+ )
+
+ rpath = self._strip_protocol(rpath)
+ exists = source_is_str and (
+ (has_magic(lpath) and source_is_file)
+ or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
+ )
+ rpaths = other_paths(
+ lpaths,
+ rpath,
+ exists=exists,
+ flatten=not source_is_str,
+ )
+
+ is_dir = {l: os.path.isdir(l) for l in lpaths}
+ rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
+ file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]
+
+ await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
+ batch_size = batch_size or self.batch_size
+
+ coros = []
+ callback.set_size(len(file_pairs))
+ for lfile, rfile in file_pairs:
+ put_file = callback.branch_coro(self._put_file)
+ coros.append(put_file(lfile, rfile, **kwargs))
+
+ return await _run_coros_in_chunks(
+ coros, batch_size=batch_size, callback=callback
+ )
+
+ async def _get_file(self, rpath, lpath, **kwargs):
+ raise NotImplementedError
+
+ async def _get(
+ self,
+ rpath,
+ lpath,
+ recursive=False,
+ callback=DEFAULT_CALLBACK,
+ maxdepth=None,
+ **kwargs,
+ ):
+ """Copy file(s) to local.
+
+ Copies a specific file or tree of files (if recursive=True). If lpath
+ ends with a "/", it will be assumed to be a directory, and target files
+ will go within. Can submit a list of paths, which may be glob-patterns
+ and will be expanded.
+
+        The get_file method will be called concurrently on a batch of files. The
+        batch_size option configures the number of futures that can be executed
+        at the same time. If it is -1, all files are downloaded concurrently.
+        The default can be set for this instance by passing "batch_size" in the
+        constructor, or for all instances by setting the "gather_batch_size" key
+        in ``fsspec.config.conf``, falling back to 1/8th of the system's
+        open-file limit.
+ """
+ if isinstance(lpath, list) and isinstance(rpath, list):
+ # No need to expand paths when both source and destination
+ # are provided as lists
+ rpaths = rpath
+ lpaths = lpath
+ else:
+ source_is_str = isinstance(rpath, str)
+ # First check for rpath trailing slash as _strip_protocol removes it.
+ source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
+ rpath = self._strip_protocol(rpath)
+ rpaths = await self._expand_path(
+ rpath, recursive=recursive, maxdepth=maxdepth
+ )
+ if source_is_str and (not recursive or maxdepth is not None):
+ # Non-recursive glob does not copy directories
+ rpaths = [
+ p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
+ ]
+ if not rpaths:
+ return
+
+ lpath = make_path_posix(lpath)
+ source_is_file = len(rpaths) == 1
+ dest_is_dir = isinstance(lpath, str) and (
+ trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
+ )
+
+ exists = source_is_str and (
+ (has_magic(rpath) and source_is_file)
+ or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
+ )
+ lpaths = other_paths(
+ rpaths,
+ lpath,
+ exists=exists,
+ flatten=not source_is_str,
+ )
+
+ [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
+ batch_size = kwargs.pop("batch_size", self.batch_size)
+
+ coros = []
+ callback.set_size(len(lpaths))
+ for lpath, rpath in zip(lpaths, rpaths):
+ get_file = callback.branch_coro(self._get_file)
+ coros.append(get_file(rpath, lpath, **kwargs))
+ return await _run_coros_in_chunks(
+ coros, batch_size=batch_size, callback=callback
+ )
+
+ async def _isfile(self, path):
+ try:
+ return (await self._info(path))["type"] == "file"
+ except: # noqa: E722
+ return False
+
+ async def _isdir(self, path):
+ try:
+ return (await self._info(path))["type"] == "directory"
+ except OSError:
+ return False
+
+ async def _size(self, path):
+ return (await self._info(path)).get("size", None)
+
+ async def _sizes(self, paths, batch_size=None):
+ batch_size = batch_size or self.batch_size
+ return await _run_coros_in_chunks(
+ [self._size(p) for p in paths], batch_size=batch_size
+ )
+
+ async def _exists(self, path, **kwargs):
+ try:
+ await self._info(path, **kwargs)
+ return True
+ except FileNotFoundError:
+ return False
+
+ async def _info(self, path, **kwargs):
+ raise NotImplementedError
+
+ async def _ls(self, path, detail=True, **kwargs):
+ raise NotImplementedError
+
+ async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
+ if maxdepth is not None and maxdepth < 1:
+ raise ValueError("maxdepth must be at least 1")
+
+ path = self._strip_protocol(path)
+ full_dirs = {}
+ dirs = {}
+ files = {}
+
+ detail = kwargs.pop("detail", False)
+ try:
+ listing = await self._ls(path, detail=True, **kwargs)
+ except (FileNotFoundError, OSError) as e:
+ if on_error == "raise":
+ raise
+ elif callable(on_error):
+ on_error(e)
+ if detail:
+ yield path, {}, {}
+ else:
+ yield path, [], []
+ return
+
+ for info in listing:
+            # each info name must be at least [path]/part, but here
+ # we check also for names like [path]/part/
+ pathname = info["name"].rstrip("/")
+ name = pathname.rsplit("/", 1)[-1]
+ if info["type"] == "directory" and pathname != path:
+ # do not include "self" path
+ full_dirs[name] = pathname
+ dirs[name] = info
+ elif pathname == path:
+                # file-like with same name as given path
+ files[""] = info
+ else:
+ files[name] = info
+
+ if detail:
+ yield path, dirs, files
+ else:
+ yield path, list(dirs), list(files)
+
+ if maxdepth is not None:
+ maxdepth -= 1
+ if maxdepth < 1:
+ return
+
+ for d in dirs:
+ async for _ in self._walk(
+ full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
+ ):
+ yield _
+
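+    # Example (sketch): consuming the walk generator, analogous to os.walk:
+    #
+    #     async for root, dirs, files in fs._walk("bucket/prefix"):
+    #         ...  # dirs and files are lists of names (dicts if detail=True)
+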
+ async def _glob(self, path, maxdepth=None, **kwargs):
+ if maxdepth is not None and maxdepth < 1:
+ raise ValueError("maxdepth must be at least 1")
+
+ import re
+
+ seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
+ ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
+ path = self._strip_protocol(path)
+ append_slash_to_dirname = ends_with_sep or path.endswith(
+ tuple(sep + "**" for sep in seps)
+ )
+ idx_star = path.find("*") if path.find("*") >= 0 else len(path)
+ idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
+ idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
+
+ min_idx = min(idx_star, idx_qmark, idx_brace)
+
+ detail = kwargs.pop("detail", False)
+
+ if not has_magic(path):
+ if await self._exists(path, **kwargs):
+ if not detail:
+ return [path]
+ else:
+ return {path: await self._info(path, **kwargs)}
+ else:
+ if not detail:
+ return [] # glob of non-existent returns empty
+ else:
+ return {}
+ elif "/" in path[:min_idx]:
+ min_idx = path[:min_idx].rindex("/")
+ root = path[: min_idx + 1]
+ depth = path[min_idx + 1 :].count("/") + 1
+ else:
+ root = ""
+ depth = path[min_idx + 1 :].count("/") + 1
+
+ if "**" in path:
+ if maxdepth is not None:
+ idx_double_stars = path.find("**")
+ depth_double_stars = path[idx_double_stars:].count("/") + 1
+ depth = depth - depth_double_stars + maxdepth
+ else:
+ depth = None
+
+ allpaths = await self._find(
+ root, maxdepth=depth, withdirs=True, detail=True, **kwargs
+ )
+
+ pattern = glob_translate(path + ("/" if ends_with_sep else ""))
+ pattern = re.compile(pattern)
+
+ out = {
+ p: info
+ for p, info in sorted(allpaths.items())
+ if pattern.match(
+ p + "/"
+ if append_slash_to_dirname and info["type"] == "directory"
+ else p
+ )
+ }
+
+ if detail:
+ return out
+ else:
+ return list(out)
+
+ async def _du(self, path, total=True, maxdepth=None, **kwargs):
+ sizes = {}
+ # async for?
+ for f in await self._find(path, maxdepth=maxdepth, **kwargs):
+ info = await self._info(f)
+ sizes[info["name"]] = info["size"]
+ if total:
+ return sum(sizes.values())
+ else:
+ return sizes
+
+ async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
+ path = self._strip_protocol(path)
+ out = {}
+ detail = kwargs.pop("detail", False)
+
+ # Add the root directory if withdirs is requested
+ # This is needed for posix glob compliance
+ if withdirs and path != "" and await self._isdir(path):
+ out[path] = await self._info(path)
+
+ # async for?
+ async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
+ if withdirs:
+ files.update(dirs)
+ out.update({info["name"]: info for name, info in files.items()})
+ if not out and (await self._isfile(path)):
+ # walk works on directories, but find should also return [path]
+ # when path happens to be a file
+ out[path] = {}
+ names = sorted(out)
+ if not detail:
+ return names
+ else:
+ return {name: out[name] for name in names}
+
+ async def _expand_path(self, path, recursive=False, maxdepth=None):
+ if maxdepth is not None and maxdepth < 1:
+ raise ValueError("maxdepth must be at least 1")
+
+ if isinstance(path, str):
+ out = await self._expand_path([path], recursive, maxdepth)
+ else:
+ out = set()
+ path = [self._strip_protocol(p) for p in path]
+ for p in path: # can gather here
+ if has_magic(p):
+ bit = set(await self._glob(p, maxdepth=maxdepth))
+ out |= bit
+ if recursive:
+ # glob call above expanded one depth so if maxdepth is defined
+ # then decrement it in expand_path call below. If it is zero
+ # after decrementing then avoid expand_path call.
+ if maxdepth is not None and maxdepth <= 1:
+ continue
+ out |= set(
+ await self._expand_path(
+ list(bit),
+ recursive=recursive,
+ maxdepth=maxdepth - 1 if maxdepth is not None else None,
+ )
+ )
+ continue
+ elif recursive:
+ rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
+ out |= rec
+ if p not in out and (recursive is False or (await self._exists(p))):
+ # should only check once, for the root
+ out.add(p)
+ if not out:
+ raise FileNotFoundError(path)
+ return sorted(out)
+
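+    # Examples: _expand_path("data/*.csv") resolves the glob to the matching
+    # paths; _expand_path("data", recursive=True) returns "data" plus
+    # everything found beneath it; a path or pattern matching nothing raises
+    # FileNotFoundError.
+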
+ async def _mkdir(self, path, create_parents=True, **kwargs):
+ pass # not necessary to implement, may not have directories
+
+ async def _makedirs(self, path, exist_ok=False):
+ pass # not necessary to implement, may not have directories
+
+ async def open_async(self, path, mode="rb", **kwargs):
+ if "b" not in mode or kwargs.get("compression"):
+ raise ValueError
+ raise NotImplementedError
+
+
+def mirror_sync_methods(obj):
+ """Populate sync and async methods for obj
+
+    For each method, creates a sync version if the name refers to an async method
+    (coroutine) and there is no override in the child class; creates an async
+    method for the corresponding sync method if there is no implementation.
+
+ Uses the methods specified in
+ - async_methods: the set that an implementation is expected to provide
+ - default_async_methods: that can be derived from their sync version in
+ AbstractFileSystem
+ - AsyncFileSystem: async-specific default coroutines
+ """
+ from fsspec import AbstractFileSystem
+
+ for method in async_methods + dir(AsyncFileSystem):
+ if not method.startswith("_"):
+ continue
+ smethod = method[1:]
+ if private.match(method):
+ isco = inspect.iscoroutinefunction(getattr(obj, method, None))
+ unsync = getattr(getattr(obj, smethod, False), "__func__", None)
+ is_default = unsync is getattr(AbstractFileSystem, smethod, "")
+ if isco and is_default:
+ mth = sync_wrapper(getattr(obj, method), obj=obj)
+ setattr(obj, smethod, mth)
+ if not mth.__doc__:
+ mth.__doc__ = getattr(
+ getattr(AbstractFileSystem, smethod, None), "__doc__", ""
+ )
+
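+# Example (sketch): if a subclass defines only ``async def _cat_file``, then
+# after mirror_sync_methods(instance) the instance also exposes a blocking
+# cat_file(), generated by sync_wrapper above.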
+
+class FSSpecCoroutineCancel(Exception):
+ pass
+
+
+def _dump_running_tasks(
+ printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
+):
+ import traceback
+
+ tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
+ if printout:
+ [task.print_stack() for task in tasks]
+ out = [
+ {
+ "locals": task._coro.cr_frame.f_locals,
+ "file": task._coro.cr_frame.f_code.co_filename,
+ "firstline": task._coro.cr_frame.f_code.co_firstlineno,
+ "linelo": task._coro.cr_frame.f_lineno,
+ "stack": traceback.format_stack(task._coro.cr_frame),
+ "task": task if with_task else None,
+ }
+ for task in tasks
+ ]
+ if cancel:
+ for t in tasks:
+ cbs = t._callbacks
+ t.cancel()
+ asyncio.futures.Future.set_exception(t, exc)
+ asyncio.futures.Future.cancel(t)
+ [cb[0](t) for cb in cbs] # cancels any dependent concurrent.futures
+ try:
+ t._coro.throw(exc) # exits coro, unless explicitly handled
+ except exc:
+ pass
+ return out
+
+
+class AbstractAsyncStreamedFile(AbstractBufferedFile):
+ # no read buffering, and always auto-commit
+ # TODO: readahead might still be useful here, but needs async version
+
+ async def read(self, length=-1):
+ """
+ Return data from cache, or fetch pieces as necessary
+
+ Parameters
+ ----------
+ length: int (-1)
+ Number of bytes to read; if <0, all remaining bytes.
+ """
+ length = -1 if length is None else int(length)
+ if self.mode != "rb":
+ raise ValueError("File not in read mode")
+ if length < 0:
+ length = self.size - self.loc
+ if self.closed:
+ raise ValueError("I/O operation on closed file.")
+ if length == 0:
+ # don't even bother calling fetch
+ return b""
+ out = await self._fetch_range(self.loc, self.loc + length)
+ self.loc += len(out)
+ return out
+
+ async def write(self, data):
+ """
+ Write data to buffer.
+
+ Buffer only sent on flush() or if buffer is greater than
+ or equal to blocksize.
+
+ Parameters
+ ----------
+ data: bytes
+ Set of bytes to be written.
+ """
+ if self.mode not in {"wb", "ab"}:
+ raise ValueError("File not in write mode")
+ if self.closed:
+ raise ValueError("I/O operation on closed file.")
+ if self.forced:
+ raise ValueError("This file has been force-flushed, can only close")
+ out = self.buffer.write(data)
+ self.loc += out
+ if self.buffer.tell() >= self.blocksize:
+ await self.flush()
+ return out
+
+ async def close(self):
+ """Close file
+
+ Finalizes writes, discards cache
+ """
+ if getattr(self, "_unclosable", False):
+ return
+ if self.closed:
+ return
+ if self.mode == "rb":
+ self.cache = None
+ else:
+ if not self.forced:
+ await self.flush(force=True)
+
+ if self.fs is not None:
+ self.fs.invalidate_cache(self.path)
+ self.fs.invalidate_cache(self.fs._parent(self.path))
+
+ self.closed = True
+
+ async def flush(self, force=False):
+ if self.closed:
+ raise ValueError("Flush on closed file")
+ if force and self.forced:
+ raise ValueError("Force flush cannot be called more than once")
+ if force:
+ self.forced = True
+
+ if self.mode not in {"wb", "ab"}:
+ # no-op to flush on read-mode
+ return
+
+ if not force and self.buffer.tell() < self.blocksize:
+ # Defer write on small block
+ return
+
+ if self.offset is None:
+ # Initialize a multipart upload
+ self.offset = 0
+ try:
+ await self._initiate_upload()
+            except:  # noqa: E722
+ self.closed = True
+ raise
+
+ if await self._upload_chunk(final=force) is not False:
+ self.offset += self.buffer.seek(0, 2)
+ self.buffer = io.BytesIO()
+
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ await self.close()
+
+ async def _fetch_range(self, start, end):
+ raise NotImplementedError
+
+ async def _initiate_upload(self):
+ pass
+
+ async def _upload_chunk(self, final=False):
+ raise NotImplementedError
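+
+
+# Example (sketch): typical use of a concrete AbstractAsyncStreamedFile,
+# assuming the backend implements open_async():
+#
+#     f = await fs.open_async("bucket/key", "wb")
+#     async with f:
+#         await f.write(b"payload")  # buffered; uploaded in blocksize chunks
+#
+#     f = await fs.open_async("bucket/key", "rb")
+#     data = await f.read()  # reads all remaining bytes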