about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/asyn.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/asyn.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/asyn.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/asyn.py1098
1 files changed, 1098 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/asyn.py b/.venv/lib/python3.12/site-packages/fsspec/asyn.py
new file mode 100644
index 00000000..adeb88fb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/asyn.py
@@ -0,0 +1,1098 @@
+import asyncio
+import asyncio.events
+import functools
+import inspect
+import io
+import numbers
+import os
+import re
+import threading
+from contextlib import contextmanager
+from glob import has_magic
+from typing import TYPE_CHECKING, Iterable
+
+from .callbacks import DEFAULT_CALLBACK
+from .exceptions import FSTimeoutError
+from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep
+from .spec import AbstractBufferedFile, AbstractFileSystem
+from .utils import glob_translate, is_exception, other_paths
+
+private = re.compile("_[^_]")
+iothread = [None]  # dedicated fsspec IO thread
+loop = [None]  # global event loop for any non-async instance
+_lock = None  # global lock placeholder
+get_running_loop = asyncio.get_running_loop
+
+
+def get_lock():
+    """Allocate or return a threading lock.
+
+    The lock is allocated on first use to allow setting one lock per forked process.
+    """
+    global _lock
+    if not _lock:
+        _lock = threading.Lock()
+    return _lock
+
+
+def reset_lock():
+    """Reset the global lock.
+
+    This should be called only on the init of a forked process to reset the lock to
+    None, enabling the new forked process to get a new lock.
+    """
+    global _lock
+
+    iothread[0] = None
+    loop[0] = None
+    _lock = None
+
+
+async def _runner(event, coro, result, timeout=None):
+    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
+    if timeout is not None:
+        coro = asyncio.wait_for(coro, timeout=timeout)
+    try:
+        result[0] = await coro
+    except Exception as ex:
+        result[0] = ex
+    finally:
+        event.set()
+
+
+def sync(loop, func, *args, timeout=None, **kwargs):
+    """
+    Make loop run coroutine until it returns. Runs in other thread
+
+    Examples
+    --------
+    >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args,
+                         timeout=timeout, **kwargs)
+    """
+    timeout = timeout if timeout else None  # convert 0 or 0.0 to None
+    # NB: if the loop is not running *yet*, it is OK to submit work
+    # and we will wait for it
+    if loop is None or loop.is_closed():
+        raise RuntimeError("Loop is not running")
+    try:
+        loop0 = asyncio.events.get_running_loop()
+        if loop0 is loop:
+            raise NotImplementedError("Calling sync() from within a running loop")
+    except NotImplementedError:
+        raise
+    except RuntimeError:
+        pass
+    coro = func(*args, **kwargs)
+    result = [None]
+    event = threading.Event()
+    asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop)
+    while True:
+        # this loops allows thread to get interrupted
+        if event.wait(1):
+            break
+        if timeout is not None:
+            timeout -= 1
+            if timeout < 0:
+                raise FSTimeoutError
+
+    return_result = result[0]
+    if isinstance(return_result, asyncio.TimeoutError):
+        # suppress asyncio.TimeoutError, raise FSTimeoutError
+        raise FSTimeoutError from return_result
+    elif isinstance(return_result, BaseException):
+        raise return_result
+    else:
+        return return_result
+
+
+def sync_wrapper(func, obj=None):
+    """Given a function, make so can be called in blocking contexts
+
+    Leave obj=None if defining within a class. Pass the instance if attaching
+    as an attribute of the instance.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        self = obj or args[0]
+        return sync(self.loop, func, *args, **kwargs)
+
+    return wrapper
+
+
+@contextmanager
+def _selector_policy():
+    original_policy = asyncio.get_event_loop_policy()
+    try:
+        if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
+            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
+
+        yield
+    finally:
+        asyncio.set_event_loop_policy(original_policy)
+
+
+def get_loop():
+    """Create or return the default fsspec IO loop
+
+    The loop will be running on a separate thread.
+    """
+    if loop[0] is None:
+        with get_lock():
+            # repeat the check just in case the loop got filled between the
+            # previous two calls from another thread
+            if loop[0] is None:
+                with _selector_policy():
+                    loop[0] = asyncio.new_event_loop()
+                th = threading.Thread(target=loop[0].run_forever, name="fsspecIO")
+                th.daemon = True
+                th.start()
+                iothread[0] = th
+    return loop[0]
+
+
+if TYPE_CHECKING:
+    import resource
+
+    ResourceError = resource.error
+else:
+    try:
+        import resource
+    except ImportError:
+        resource = None
+        ResourceError = OSError
+    else:
+        ResourceError = getattr(resource, "error", OSError)
+
+_DEFAULT_BATCH_SIZE = 128
+_NOFILES_DEFAULT_BATCH_SIZE = 1280
+
+
+def _get_batch_size(nofiles=False):
+    from fsspec.config import conf
+
+    if nofiles:
+        if "nofiles_gather_batch_size" in conf:
+            return conf["nofiles_gather_batch_size"]
+    else:
+        if "gather_batch_size" in conf:
+            return conf["gather_batch_size"]
+    if nofiles:
+        return _NOFILES_DEFAULT_BATCH_SIZE
+    if resource is None:
+        return _DEFAULT_BATCH_SIZE
+
+    try:
+        soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE)
+    except (ImportError, ValueError, ResourceError):
+        return _DEFAULT_BATCH_SIZE
+
+    if soft_limit == resource.RLIM_INFINITY:
+        return -1
+    else:
+        return soft_limit // 8
+
+
+def running_async() -> bool:
+    """Being executed by an event loop?"""
+    try:
+        asyncio.get_running_loop()
+        return True
+    except RuntimeError:
+        return False
+
+
+async def _run_coros_in_chunks(
+    coros,
+    batch_size=None,
+    callback=DEFAULT_CALLBACK,
+    timeout=None,
+    return_exceptions=False,
+    nofiles=False,
+):
+    """Run the given coroutines in  chunks.
+
+    Parameters
+    ----------
+    coros: list of coroutines to run
+    batch_size: int or None
+        Number of coroutines to submit/wait on simultaneously.
+        If -1, then it will not be any throttling. If
+        None, it will be inferred from _get_batch_size()
+    callback: fsspec.callbacks.Callback instance
+        Gets a relative_update when each coroutine completes
+    timeout: number or None
+        If given, each coroutine times out after this time. Note that, since
+        there are multiple batches, the total run time of this function will in
+        general be longer
+    return_exceptions: bool
+        Same meaning as in asyncio.gather
+    nofiles: bool
+        If inferring the batch_size, does this operation involve local files?
+        If yes, you normally expect smaller batches.
+    """
+
+    if batch_size is None:
+        batch_size = _get_batch_size(nofiles=nofiles)
+
+    if batch_size == -1:
+        batch_size = len(coros)
+
+    assert batch_size > 0
+
+    async def _run_coro(coro, i):
+        try:
+            return await asyncio.wait_for(coro, timeout=timeout), i
+        except Exception as e:
+            if not return_exceptions:
+                raise
+            return e, i
+        finally:
+            callback.relative_update(1)
+
+    i = 0
+    n = len(coros)
+    results = [None] * n
+    pending = set()
+
+    while pending or i < n:
+        while len(pending) < batch_size and i < n:
+            pending.add(asyncio.ensure_future(_run_coro(coros[i], i)))
+            i += 1
+
+        if not pending:
+            break
+
+        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
+        while done:
+            result, k = await done.pop()
+            results[k] = result
+
+    return results
+
+
+# these methods should be implemented as async by any async-able backend
+async_methods = [
+    "_ls",
+    "_cat_file",
+    "_get_file",
+    "_put_file",
+    "_rm_file",
+    "_cp_file",
+    "_pipe_file",
+    "_expand_path",
+    "_info",
+    "_isfile",
+    "_isdir",
+    "_exists",
+    "_walk",
+    "_glob",
+    "_find",
+    "_du",
+    "_size",
+    "_mkdir",
+    "_makedirs",
+]
+
+
+class AsyncFileSystem(AbstractFileSystem):
+    """Async file operations, default implementations
+
+    Passes bulk operations to asyncio.gather for concurrent operation.
+
+    Implementations that have concurrent batch operations and/or async methods
+    should inherit from this class instead of AbstractFileSystem. Docstrings are
+    copied from the un-underscored method in AbstractFileSystem, if not given.
+    """
+
+    # note that methods do not have docstring here; they will be copied
+    # for _* methods and inferred for overridden methods.
+
+    async_impl = True
+    mirror_sync_methods = True
+    disable_throttling = False
+
+    def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs):
+        self.asynchronous = asynchronous
+        self._pid = os.getpid()
+        if not asynchronous:
+            self._loop = loop or get_loop()
+        else:
+            self._loop = None
+        self.batch_size = batch_size
+        super().__init__(*args, **kwargs)
+
+    @property
+    def loop(self):
+        if self._pid != os.getpid():
+            raise RuntimeError("This class is not fork-safe")
+        return self._loop
+
+    async def _rm_file(self, path, **kwargs):
+        raise NotImplementedError
+
+    async def _rm(self, path, recursive=False, batch_size=None, **kwargs):
+        # TODO: implement on_error
+        batch_size = batch_size or self.batch_size
+        path = await self._expand_path(path, recursive=recursive)
+        return await _run_coros_in_chunks(
+            [self._rm_file(p, **kwargs) for p in reversed(path)],
+            batch_size=batch_size,
+            nofiles=True,
+        )
+
+    async def _cp_file(self, path1, path2, **kwargs):
+        raise NotImplementedError
+
+    async def _mv_file(self, path1, path2):
+        await self._cp_file(path1, path2)
+        await self._rm_file(path1)
+
+    async def _copy(
+        self,
+        path1,
+        path2,
+        recursive=False,
+        on_error=None,
+        maxdepth=None,
+        batch_size=None,
+        **kwargs,
+    ):
+        if on_error is None and recursive:
+            on_error = "ignore"
+        elif on_error is None:
+            on_error = "raise"
+
+        if isinstance(path1, list) and isinstance(path2, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            paths1 = path1
+            paths2 = path2
+        else:
+            source_is_str = isinstance(path1, str)
+            paths1 = await self._expand_path(
+                path1, maxdepth=maxdepth, recursive=recursive
+            )
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                paths1 = [
+                    p for p in paths1 if not (trailing_sep(p) or await self._isdir(p))
+                ]
+                if not paths1:
+                    return
+
+            source_is_file = len(paths1) == 1
+            dest_is_dir = isinstance(path2, str) and (
+                trailing_sep(path2) or await self._isdir(path2)
+            )
+
+            exists = source_is_str and (
+                (has_magic(path1) and source_is_file)
+                or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
+            )
+            paths2 = other_paths(
+                paths1,
+                path2,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        batch_size = batch_size or self.batch_size
+        coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)]
+        result = await _run_coros_in_chunks(
+            coros, batch_size=batch_size, return_exceptions=True, nofiles=True
+        )
+
+        for ex in filter(is_exception, result):
+            if on_error == "ignore" and isinstance(ex, FileNotFoundError):
+                continue
+            raise ex
+
+    async def _pipe_file(self, path, value, mode="overwrite", **kwargs):
+        raise NotImplementedError
+
+    async def _pipe(self, path, value=None, batch_size=None, **kwargs):
+        if isinstance(path, str):
+            path = {path: value}
+        batch_size = batch_size or self.batch_size
+        return await _run_coros_in_chunks(
+            [self._pipe_file(k, v, **kwargs) for k, v in path.items()],
+            batch_size=batch_size,
+            nofiles=True,
+        )
+
+    async def _process_limits(self, url, start, end):
+        """Helper for "Range"-based _cat_file"""
+        size = None
+        suff = False
+        if start is not None and start < 0:
+            # if start is negative and end None, end is the "suffix length"
+            if end is None:
+                end = -start
+                start = ""
+                suff = True
+            else:
+                size = size or (await self._info(url))["size"]
+                start = size + start
+        elif start is None:
+            start = 0
+        if not suff:
+            if end is not None and end < 0:
+                if start is not None:
+                    size = size or (await self._info(url))["size"]
+                    end = size + end
+            elif end is None:
+                end = ""
+            if isinstance(end, numbers.Integral):
+                end -= 1  # bytes range is inclusive
+        return f"bytes={start}-{end}"
+
+    async def _cat_file(self, path, start=None, end=None, **kwargs):
+        raise NotImplementedError
+
+    async def _cat(
+        self, path, recursive=False, on_error="raise", batch_size=None, **kwargs
+    ):
+        paths = await self._expand_path(path, recursive=recursive)
+        coros = [self._cat_file(path, **kwargs) for path in paths]
+        batch_size = batch_size or self.batch_size
+        out = await _run_coros_in_chunks(
+            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
+        )
+        if on_error == "raise":
+            ex = next(filter(is_exception, out), False)
+            if ex:
+                raise ex
+        if (
+            len(paths) > 1
+            or isinstance(path, list)
+            or paths[0] != self._strip_protocol(path)
+        ):
+            return {
+                k: v
+                for k, v in zip(paths, out)
+                if on_error != "omit" or not is_exception(v)
+            }
+        else:
+            return out[0]
+
+    async def _cat_ranges(
+        self,
+        paths,
+        starts,
+        ends,
+        max_gap=None,
+        batch_size=None,
+        on_error="return",
+        **kwargs,
+    ):
+        """Get the contents of byte ranges from one or more files
+
+        Parameters
+        ----------
+        paths: list
+            A list of of filepaths on this filesystems
+        starts, ends: int or list
+            Bytes limits of the read. If using a single int, the same value will be
+            used to read all the specified files.
+        """
+        # TODO: on_error
+        if max_gap is not None:
+            # use utils.merge_offset_ranges
+            raise NotImplementedError
+        if not isinstance(paths, list):
+            raise TypeError
+        if not isinstance(starts, Iterable):
+            starts = [starts] * len(paths)
+        if not isinstance(ends, Iterable):
+            ends = [ends] * len(paths)
+        if len(starts) != len(paths) or len(ends) != len(paths):
+            raise ValueError
+        coros = [
+            self._cat_file(p, start=s, end=e, **kwargs)
+            for p, s, e in zip(paths, starts, ends)
+        ]
+        batch_size = batch_size or self.batch_size
+        return await _run_coros_in_chunks(
+            coros, batch_size=batch_size, nofiles=True, return_exceptions=True
+        )
+
+    async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs):
+        raise NotImplementedError
+
+    async def _put(
+        self,
+        lpath,
+        rpath,
+        recursive=False,
+        callback=DEFAULT_CALLBACK,
+        batch_size=None,
+        maxdepth=None,
+        **kwargs,
+    ):
+        """Copy file(s) from local.
+
+        Copies a specific file or tree of files (if recursive=True). If rpath
+        ends with a "/", it will be assumed to be a directory, and target files
+        will go within.
+
+        The put_file method will be called concurrently on a batch of files. The
+        batch_size option can configure the amount of futures that can be executed
+        at the same time. If it is -1, then all the files will be uploaded concurrently.
+        The default can be set for this instance by passing "batch_size" in the
+        constructor, or for all instances by setting the "gather_batch_size" key
+        in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
+        """
+        if isinstance(lpath, list) and isinstance(rpath, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            rpaths = rpath
+            lpaths = lpath
+        else:
+            source_is_str = isinstance(lpath, str)
+            if source_is_str:
+                lpath = make_path_posix(lpath)
+            fs = LocalFileSystem()
+            lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
+                if not lpaths:
+                    return
+
+            source_is_file = len(lpaths) == 1
+            dest_is_dir = isinstance(rpath, str) and (
+                trailing_sep(rpath) or await self._isdir(rpath)
+            )
+
+            rpath = self._strip_protocol(rpath)
+            exists = source_is_str and (
+                (has_magic(lpath) and source_is_file)
+                or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
+            )
+            rpaths = other_paths(
+                lpaths,
+                rpath,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        is_dir = {l: os.path.isdir(l) for l in lpaths}
+        rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]]
+        file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]]
+
+        await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs])
+        batch_size = batch_size or self.batch_size
+
+        coros = []
+        callback.set_size(len(file_pairs))
+        for lfile, rfile in file_pairs:
+            put_file = callback.branch_coro(self._put_file)
+            coros.append(put_file(lfile, rfile, **kwargs))
+
+        return await _run_coros_in_chunks(
+            coros, batch_size=batch_size, callback=callback
+        )
+
+    async def _get_file(self, rpath, lpath, **kwargs):
+        raise NotImplementedError
+
+    async def _get(
+        self,
+        rpath,
+        lpath,
+        recursive=False,
+        callback=DEFAULT_CALLBACK,
+        maxdepth=None,
+        **kwargs,
+    ):
+        """Copy file(s) to local.
+
+        Copies a specific file or tree of files (if recursive=True). If lpath
+        ends with a "/", it will be assumed to be a directory, and target files
+        will go within. Can submit a list of paths, which may be glob-patterns
+        and will be expanded.
+
+        The get_file method will be called concurrently on a batch of files. The
+        batch_size option can configure the amount of futures that can be executed
+        at the same time. If it is -1, then all the files will be uploaded concurrently.
+        The default can be set for this instance by passing "batch_size" in the
+        constructor, or for all instances by setting the "gather_batch_size" key
+        in ``fsspec.config.conf``, falling back to 1/8th of the system limit .
+        """
+        if isinstance(lpath, list) and isinstance(rpath, list):
+            # No need to expand paths when both source and destination
+            # are provided as lists
+            rpaths = rpath
+            lpaths = lpath
+        else:
+            source_is_str = isinstance(rpath, str)
+            # First check for rpath trailing slash as _strip_protocol removes it.
+            source_not_trailing_sep = source_is_str and not trailing_sep(rpath)
+            rpath = self._strip_protocol(rpath)
+            rpaths = await self._expand_path(
+                rpath, recursive=recursive, maxdepth=maxdepth
+            )
+            if source_is_str and (not recursive or maxdepth is not None):
+                # Non-recursive glob does not copy directories
+                rpaths = [
+                    p for p in rpaths if not (trailing_sep(p) or await self._isdir(p))
+                ]
+                if not rpaths:
+                    return
+
+            lpath = make_path_posix(lpath)
+            source_is_file = len(rpaths) == 1
+            dest_is_dir = isinstance(lpath, str) and (
+                trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
+            )
+
+            exists = source_is_str and (
+                (has_magic(rpath) and source_is_file)
+                or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep)
+            )
+            lpaths = other_paths(
+                rpaths,
+                lpath,
+                exists=exists,
+                flatten=not source_is_str,
+            )
+
+        [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
+        batch_size = kwargs.pop("batch_size", self.batch_size)
+
+        coros = []
+        callback.set_size(len(lpaths))
+        for lpath, rpath in zip(lpaths, rpaths):
+            get_file = callback.branch_coro(self._get_file)
+            coros.append(get_file(rpath, lpath, **kwargs))
+        return await _run_coros_in_chunks(
+            coros, batch_size=batch_size, callback=callback
+        )
+
+    async def _isfile(self, path):
+        try:
+            return (await self._info(path))["type"] == "file"
+        except:  # noqa: E722
+            return False
+
+    async def _isdir(self, path):
+        try:
+            return (await self._info(path))["type"] == "directory"
+        except OSError:
+            return False
+
+    async def _size(self, path):
+        return (await self._info(path)).get("size", None)
+
+    async def _sizes(self, paths, batch_size=None):
+        batch_size = batch_size or self.batch_size
+        return await _run_coros_in_chunks(
+            [self._size(p) for p in paths], batch_size=batch_size
+        )
+
+    async def _exists(self, path, **kwargs):
+        try:
+            await self._info(path, **kwargs)
+            return True
+        except FileNotFoundError:
+            return False
+
+    async def _info(self, path, **kwargs):
+        raise NotImplementedError
+
+    async def _ls(self, path, detail=True, **kwargs):
+        raise NotImplementedError
+
+    async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs):
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        path = self._strip_protocol(path)
+        full_dirs = {}
+        dirs = {}
+        files = {}
+
+        detail = kwargs.pop("detail", False)
+        try:
+            listing = await self._ls(path, detail=True, **kwargs)
+        except (FileNotFoundError, OSError) as e:
+            if on_error == "raise":
+                raise
+            elif callable(on_error):
+                on_error(e)
+            if detail:
+                yield path, {}, {}
+            else:
+                yield path, [], []
+            return
+
+        for info in listing:
+            # each info name must be at least [path]/part , but here
+            # we check also for names like [path]/part/
+            pathname = info["name"].rstrip("/")
+            name = pathname.rsplit("/", 1)[-1]
+            if info["type"] == "directory" and pathname != path:
+                # do not include "self" path
+                full_dirs[name] = pathname
+                dirs[name] = info
+            elif pathname == path:
+                # file-like with same name as give path
+                files[""] = info
+            else:
+                files[name] = info
+
+        if detail:
+            yield path, dirs, files
+        else:
+            yield path, list(dirs), list(files)
+
+        if maxdepth is not None:
+            maxdepth -= 1
+            if maxdepth < 1:
+                return
+
+        for d in dirs:
+            async for _ in self._walk(
+                full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs
+            ):
+                yield _
+
+    async def _glob(self, path, maxdepth=None, **kwargs):
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        import re
+
+        seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
+        ends_with_sep = path.endswith(seps)  # _strip_protocol strips trailing slash
+        path = self._strip_protocol(path)
+        append_slash_to_dirname = ends_with_sep or path.endswith(
+            tuple(sep + "**" for sep in seps)
+        )
+        idx_star = path.find("*") if path.find("*") >= 0 else len(path)
+        idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
+        idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
+
+        min_idx = min(idx_star, idx_qmark, idx_brace)
+
+        detail = kwargs.pop("detail", False)
+
+        if not has_magic(path):
+            if await self._exists(path, **kwargs):
+                if not detail:
+                    return [path]
+                else:
+                    return {path: await self._info(path, **kwargs)}
+            else:
+                if not detail:
+                    return []  # glob of non-existent returns empty
+                else:
+                    return {}
+        elif "/" in path[:min_idx]:
+            min_idx = path[:min_idx].rindex("/")
+            root = path[: min_idx + 1]
+            depth = path[min_idx + 1 :].count("/") + 1
+        else:
+            root = ""
+            depth = path[min_idx + 1 :].count("/") + 1
+
+        if "**" in path:
+            if maxdepth is not None:
+                idx_double_stars = path.find("**")
+                depth_double_stars = path[idx_double_stars:].count("/") + 1
+                depth = depth - depth_double_stars + maxdepth
+            else:
+                depth = None
+
+        allpaths = await self._find(
+            root, maxdepth=depth, withdirs=True, detail=True, **kwargs
+        )
+
+        pattern = glob_translate(path + ("/" if ends_with_sep else ""))
+        pattern = re.compile(pattern)
+
+        out = {
+            p: info
+            for p, info in sorted(allpaths.items())
+            if pattern.match(
+                p + "/"
+                if append_slash_to_dirname and info["type"] == "directory"
+                else p
+            )
+        }
+
+        if detail:
+            return out
+        else:
+            return list(out)
+
+    async def _du(self, path, total=True, maxdepth=None, **kwargs):
+        sizes = {}
+        # async for?
+        for f in await self._find(path, maxdepth=maxdepth, **kwargs):
+            info = await self._info(f)
+            sizes[info["name"]] = info["size"]
+        if total:
+            return sum(sizes.values())
+        else:
+            return sizes
+
+    async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
+        path = self._strip_protocol(path)
+        out = {}
+        detail = kwargs.pop("detail", False)
+
+        # Add the root directory if withdirs is requested
+        # This is needed for posix glob compliance
+        if withdirs and path != "" and await self._isdir(path):
+            out[path] = await self._info(path)
+
+        # async for?
+        async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
+            if withdirs:
+                files.update(dirs)
+            out.update({info["name"]: info for name, info in files.items()})
+        if not out and (await self._isfile(path)):
+            # walk works on directories, but find should also return [path]
+            # when path happens to be a file
+            out[path] = {}
+        names = sorted(out)
+        if not detail:
+            return names
+        else:
+            return {name: out[name] for name in names}
+
+    async def _expand_path(self, path, recursive=False, maxdepth=None):
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        if isinstance(path, str):
+            out = await self._expand_path([path], recursive, maxdepth)
+        else:
+            out = set()
+            path = [self._strip_protocol(p) for p in path]
+            for p in path:  # can gather here
+                if has_magic(p):
+                    bit = set(await self._glob(p, maxdepth=maxdepth))
+                    out |= bit
+                    if recursive:
+                        # glob call above expanded one depth so if maxdepth is defined
+                        # then decrement it in expand_path call below. If it is zero
+                        # after decrementing then avoid expand_path call.
+                        if maxdepth is not None and maxdepth <= 1:
+                            continue
+                        out |= set(
+                            await self._expand_path(
+                                list(bit),
+                                recursive=recursive,
+                                maxdepth=maxdepth - 1 if maxdepth is not None else None,
+                            )
+                        )
+                    continue
+                elif recursive:
+                    rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
+                    out |= rec
+                if p not in out and (recursive is False or (await self._exists(p))):
+                    # should only check once, for the root
+                    out.add(p)
+        if not out:
+            raise FileNotFoundError(path)
+        return sorted(out)
+
+    async def _mkdir(self, path, create_parents=True, **kwargs):
+        pass  # not necessary to implement, may not have directories
+
+    async def _makedirs(self, path, exist_ok=False):
+        pass  # not necessary to implement, may not have directories
+
+    async def open_async(self, path, mode="rb", **kwargs):
+        if "b" not in mode or kwargs.get("compression"):
+            raise ValueError
+        raise NotImplementedError
+
+
+def mirror_sync_methods(obj):
+    """Populate sync and async methods for obj
+
+    For each method will create a sync version if the name refers to an async method
+    (coroutine) and there is no override in the child class; will create an async
+    method for the corresponding sync method if there is no implementation.
+
+    Uses the methods specified in
+    - async_methods: the set that an implementation is expected to provide
+    - default_async_methods: that can be derived from their sync version in
+      AbstractFileSystem
+    - AsyncFileSystem: async-specific default coroutines
+    """
+    from fsspec import AbstractFileSystem
+
+    for method in async_methods + dir(AsyncFileSystem):
+        if not method.startswith("_"):
+            continue
+        smethod = method[1:]
+        if private.match(method):
+            isco = inspect.iscoroutinefunction(getattr(obj, method, None))
+            unsync = getattr(getattr(obj, smethod, False), "__func__", None)
+            is_default = unsync is getattr(AbstractFileSystem, smethod, "")
+            if isco and is_default:
+                mth = sync_wrapper(getattr(obj, method), obj=obj)
+                setattr(obj, smethod, mth)
+                if not mth.__doc__:
+                    mth.__doc__ = getattr(
+                        getattr(AbstractFileSystem, smethod, None), "__doc__", ""
+                    )
+
+
+class FSSpecCoroutineCancel(Exception):
+    pass
+
+
+def _dump_running_tasks(
+    printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False
+):
+    import traceback
+
+    tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()]
+    if printout:
+        [task.print_stack() for task in tasks]
+    out = [
+        {
+            "locals": task._coro.cr_frame.f_locals,
+            "file": task._coro.cr_frame.f_code.co_filename,
+            "firstline": task._coro.cr_frame.f_code.co_firstlineno,
+            "linelo": task._coro.cr_frame.f_lineno,
+            "stack": traceback.format_stack(task._coro.cr_frame),
+            "task": task if with_task else None,
+        }
+        for task in tasks
+    ]
+    if cancel:
+        for t in tasks:
+            cbs = t._callbacks
+            t.cancel()
+            asyncio.futures.Future.set_exception(t, exc)
+            asyncio.futures.Future.cancel(t)
+            [cb[0](t) for cb in cbs]  # cancels any dependent concurrent.futures
+            try:
+                t._coro.throw(exc)  # exits coro, unless explicitly handled
+            except exc:
+                pass
+    return out
+
+
+class AbstractAsyncStreamedFile(AbstractBufferedFile):
+    # no read buffering, and always auto-commit
+    # TODO: readahead might still be useful here, but needs async version
+
+    async def read(self, length=-1):
+        """
+        Return data from cache, or fetch pieces as necessary
+
+        Parameters
+        ----------
+        length: int (-1)
+            Number of bytes to read; if <0, all remaining bytes.
+        """
+        length = -1 if length is None else int(length)
+        if self.mode != "rb":
+            raise ValueError("File not in read mode")
+        if length < 0:
+            length = self.size - self.loc
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if length == 0:
+            # don't even bother calling fetch
+            return b""
+        out = await self._fetch_range(self.loc, self.loc + length)
+        self.loc += len(out)
+        return out
+
+    async def write(self, data):
+        """
+        Write data to buffer.
+
+        Buffer only sent on flush() or if buffer is greater than
+        or equal to blocksize.
+
+        Parameters
+        ----------
+        data: bytes
+            Set of bytes to be written.
+        """
+        if self.mode not in {"wb", "ab"}:
+            raise ValueError("File not in write mode")
+        if self.closed:
+            raise ValueError("I/O operation on closed file.")
+        if self.forced:
+            raise ValueError("This file has been force-flushed, can only close")
+        out = self.buffer.write(data)
+        self.loc += out
+        if self.buffer.tell() >= self.blocksize:
+            await self.flush()
+        return out
+
+    async def close(self):
+        """Close file
+
+        Finalizes writes, discards cache
+        """
+        if getattr(self, "_unclosable", False):
+            return
+        if self.closed:
+            return
+        if self.mode == "rb":
+            self.cache = None
+        else:
+            if not self.forced:
+                await self.flush(force=True)
+
+            if self.fs is not None:
+                self.fs.invalidate_cache(self.path)
+                self.fs.invalidate_cache(self.fs._parent(self.path))
+
+        self.closed = True
+
+    async def flush(self, force=False):
+        if self.closed:
+            raise ValueError("Flush on closed file")
+        if force and self.forced:
+            raise ValueError("Force flush cannot be called more than once")
+        if force:
+            self.forced = True
+
+        if self.mode not in {"wb", "ab"}:
+            # no-op to flush on read-mode
+            return
+
+        if not force and self.buffer.tell() < self.blocksize:
+            # Defer write on small block
+            return
+
+        if self.offset is None:
+            # Initialize a multipart upload
+            self.offset = 0
+            try:
+                await self._initiate_upload()
+            except:
+                self.closed = True
+                raise
+
+        if await self._upload_chunk(final=force) is not False:
+            self.offset += self.buffer.seek(0, 2)
+            self.buffer = io.BytesIO()
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.close()
+
+    async def _fetch_range(self, start, end):
+        raise NotImplementedError
+
+    async def _initiate_upload(self):
+        pass
+
+    async def _upload_chunk(self, final=False):
+        raise NotImplementedError