diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/asyn.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/asyn.py | 1098 |
1 files changed, 1098 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/asyn.py b/.venv/lib/python3.12/site-packages/fsspec/asyn.py new file mode 100644 index 00000000..adeb88fb --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/asyn.py @@ -0,0 +1,1098 @@ +import asyncio +import asyncio.events +import functools +import inspect +import io +import numbers +import os +import re +import threading +from contextlib import contextmanager +from glob import has_magic +from typing import TYPE_CHECKING, Iterable + +from .callbacks import DEFAULT_CALLBACK +from .exceptions import FSTimeoutError +from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep +from .spec import AbstractBufferedFile, AbstractFileSystem +from .utils import glob_translate, is_exception, other_paths + +private = re.compile("_[^_]") +iothread = [None] # dedicated fsspec IO thread +loop = [None] # global event loop for any non-async instance +_lock = None # global lock placeholder +get_running_loop = asyncio.get_running_loop + + +def get_lock(): + """Allocate or return a threading lock. + + The lock is allocated on first use to allow setting one lock per forked process. + """ + global _lock + if not _lock: + _lock = threading.Lock() + return _lock + + +def reset_lock(): + """Reset the global lock. + + This should be called only on the init of a forked process to reset the lock to + None, enabling the new forked process to get a new lock. + """ + global _lock + + iothread[0] = None + loop[0] = None + _lock = None + + +async def _runner(event, coro, result, timeout=None): + timeout = timeout if timeout else None # convert 0 or 0.0 to None + if timeout is not None: + coro = asyncio.wait_for(coro, timeout=timeout) + try: + result[0] = await coro + except Exception as ex: + result[0] = ex + finally: + event.set() + + +def sync(loop, func, *args, timeout=None, **kwargs): + """ + Make loop run coroutine until it returns. Runs in other thread + + Examples + -------- + >>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args, + timeout=timeout, **kwargs) + """ + timeout = timeout if timeout else None # convert 0 or 0.0 to None + # NB: if the loop is not running *yet*, it is OK to submit work + # and we will wait for it + if loop is None or loop.is_closed(): + raise RuntimeError("Loop is not running") + try: + loop0 = asyncio.events.get_running_loop() + if loop0 is loop: + raise NotImplementedError("Calling sync() from within a running loop") + except NotImplementedError: + raise + except RuntimeError: + pass + coro = func(*args, **kwargs) + result = [None] + event = threading.Event() + asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop) + while True: + # this loops allows thread to get interrupted + if event.wait(1): + break + if timeout is not None: + timeout -= 1 + if timeout < 0: + raise FSTimeoutError + + return_result = result[0] + if isinstance(return_result, asyncio.TimeoutError): + # suppress asyncio.TimeoutError, raise FSTimeoutError + raise FSTimeoutError from return_result + elif isinstance(return_result, BaseException): + raise return_result + else: + return return_result + + +def sync_wrapper(func, obj=None): + """Given a function, make so can be called in blocking contexts + + Leave obj=None if defining within a class. Pass the instance if attaching + as an attribute of the instance. + """ + + @functools.wraps(func) + def wrapper(*args, **kwargs): + self = obj or args[0] + return sync(self.loop, func, *args, **kwargs) + + return wrapper + + +@contextmanager +def _selector_policy(): + original_policy = asyncio.get_event_loop_policy() + try: + if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + + yield + finally: + asyncio.set_event_loop_policy(original_policy) + + +def get_loop(): + """Create or return the default fsspec IO loop + + The loop will be running on a separate thread. + """ + if loop[0] is None: + with get_lock(): + # repeat the check just in case the loop got filled between the + # previous two calls from another thread + if loop[0] is None: + with _selector_policy(): + loop[0] = asyncio.new_event_loop() + th = threading.Thread(target=loop[0].run_forever, name="fsspecIO") + th.daemon = True + th.start() + iothread[0] = th + return loop[0] + + +if TYPE_CHECKING: + import resource + + ResourceError = resource.error +else: + try: + import resource + except ImportError: + resource = None + ResourceError = OSError + else: + ResourceError = getattr(resource, "error", OSError) + +_DEFAULT_BATCH_SIZE = 128 +_NOFILES_DEFAULT_BATCH_SIZE = 1280 + + +def _get_batch_size(nofiles=False): + from fsspec.config import conf + + if nofiles: + if "nofiles_gather_batch_size" in conf: + return conf["nofiles_gather_batch_size"] + else: + if "gather_batch_size" in conf: + return conf["gather_batch_size"] + if nofiles: + return _NOFILES_DEFAULT_BATCH_SIZE + if resource is None: + return _DEFAULT_BATCH_SIZE + + try: + soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE) + except (ImportError, ValueError, ResourceError): + return _DEFAULT_BATCH_SIZE + + if soft_limit == resource.RLIM_INFINITY: + return -1 + else: + return soft_limit // 8 + + +def running_async() -> bool: + """Being executed by an event loop?""" + try: + asyncio.get_running_loop() + return True + except RuntimeError: + return False + + +async def _run_coros_in_chunks( + coros, + batch_size=None, + callback=DEFAULT_CALLBACK, + timeout=None, + return_exceptions=False, + nofiles=False, +): + """Run the given coroutines in chunks. + + Parameters + ---------- + coros: list of coroutines to run + batch_size: int or None + Number of coroutines to submit/wait on simultaneously. + If -1, then it will not be any throttling. If + None, it will be inferred from _get_batch_size() + callback: fsspec.callbacks.Callback instance + Gets a relative_update when each coroutine completes + timeout: number or None + If given, each coroutine times out after this time. Note that, since + there are multiple batches, the total run time of this function will in + general be longer + return_exceptions: bool + Same meaning as in asyncio.gather + nofiles: bool + If inferring the batch_size, does this operation involve local files? + If yes, you normally expect smaller batches. + """ + + if batch_size is None: + batch_size = _get_batch_size(nofiles=nofiles) + + if batch_size == -1: + batch_size = len(coros) + + assert batch_size > 0 + + async def _run_coro(coro, i): + try: + return await asyncio.wait_for(coro, timeout=timeout), i + except Exception as e: + if not return_exceptions: + raise + return e, i + finally: + callback.relative_update(1) + + i = 0 + n = len(coros) + results = [None] * n + pending = set() + + while pending or i < n: + while len(pending) < batch_size and i < n: + pending.add(asyncio.ensure_future(_run_coro(coros[i], i))) + i += 1 + + if not pending: + break + + done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) + while done: + result, k = await done.pop() + results[k] = result + + return results + + +# these methods should be implemented as async by any async-able backend +async_methods = [ + "_ls", + "_cat_file", + "_get_file", + "_put_file", + "_rm_file", + "_cp_file", + "_pipe_file", + "_expand_path", + "_info", + "_isfile", + "_isdir", + "_exists", + "_walk", + "_glob", + "_find", + "_du", + "_size", + "_mkdir", + "_makedirs", +] + + +class AsyncFileSystem(AbstractFileSystem): + """Async file operations, default implementations + + Passes bulk operations to asyncio.gather for concurrent operation. + + Implementations that have concurrent batch operations and/or async methods + should inherit from this class instead of AbstractFileSystem. Docstrings are + copied from the un-underscored method in AbstractFileSystem, if not given. + """ + + # note that methods do not have docstring here; they will be copied + # for _* methods and inferred for overridden methods. + + async_impl = True + mirror_sync_methods = True + disable_throttling = False + + def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs): + self.asynchronous = asynchronous + self._pid = os.getpid() + if not asynchronous: + self._loop = loop or get_loop() + else: + self._loop = None + self.batch_size = batch_size + super().__init__(*args, **kwargs) + + @property + def loop(self): + if self._pid != os.getpid(): + raise RuntimeError("This class is not fork-safe") + return self._loop + + async def _rm_file(self, path, **kwargs): + raise NotImplementedError + + async def _rm(self, path, recursive=False, batch_size=None, **kwargs): + # TODO: implement on_error + batch_size = batch_size or self.batch_size + path = await self._expand_path(path, recursive=recursive) + return await _run_coros_in_chunks( + [self._rm_file(p, **kwargs) for p in reversed(path)], + batch_size=batch_size, + nofiles=True, + ) + + async def _cp_file(self, path1, path2, **kwargs): + raise NotImplementedError + + async def _mv_file(self, path1, path2): + await self._cp_file(path1, path2) + await self._rm_file(path1) + + async def _copy( + self, + path1, + path2, + recursive=False, + on_error=None, + maxdepth=None, + batch_size=None, + **kwargs, + ): + if on_error is None and recursive: + on_error = "ignore" + elif on_error is None: + on_error = "raise" + + if isinstance(path1, list) and isinstance(path2, list): + # No need to expand paths when both source and destination + # are provided as lists + paths1 = path1 + paths2 = path2 + else: + source_is_str = isinstance(path1, str) + paths1 = await self._expand_path( + path1, maxdepth=maxdepth, recursive=recursive + ) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + paths1 = [ + p for p in paths1 if not (trailing_sep(p) or await self._isdir(p)) + ] + if not paths1: + return + + source_is_file = len(paths1) == 1 + dest_is_dir = isinstance(path2, str) and ( + trailing_sep(path2) or await self._isdir(path2) + ) + + exists = source_is_str and ( + (has_magic(path1) and source_is_file) + or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) + ) + paths2 = other_paths( + paths1, + path2, + exists=exists, + flatten=not source_is_str, + ) + + batch_size = batch_size or self.batch_size + coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)] + result = await _run_coros_in_chunks( + coros, batch_size=batch_size, return_exceptions=True, nofiles=True + ) + + for ex in filter(is_exception, result): + if on_error == "ignore" and isinstance(ex, FileNotFoundError): + continue + raise ex + + async def _pipe_file(self, path, value, mode="overwrite", **kwargs): + raise NotImplementedError + + async def _pipe(self, path, value=None, batch_size=None, **kwargs): + if isinstance(path, str): + path = {path: value} + batch_size = batch_size or self.batch_size + return await _run_coros_in_chunks( + [self._pipe_file(k, v, **kwargs) for k, v in path.items()], + batch_size=batch_size, + nofiles=True, + ) + + async def _process_limits(self, url, start, end): + """Helper for "Range"-based _cat_file""" + size = None + suff = False + if start is not None and start < 0: + # if start is negative and end None, end is the "suffix length" + if end is None: + end = -start + start = "" + suff = True + else: + size = size or (await self._info(url))["size"] + start = size + start + elif start is None: + start = 0 + if not suff: + if end is not None and end < 0: + if start is not None: + size = size or (await self._info(url))["size"] + end = size + end + elif end is None: + end = "" + if isinstance(end, numbers.Integral): + end -= 1 # bytes range is inclusive + return f"bytes={start}-{end}" + + async def _cat_file(self, path, start=None, end=None, **kwargs): + raise NotImplementedError + + async def _cat( + self, path, recursive=False, on_error="raise", batch_size=None, **kwargs + ): + paths = await self._expand_path(path, recursive=recursive) + coros = [self._cat_file(path, **kwargs) for path in paths] + batch_size = batch_size or self.batch_size + out = await _run_coros_in_chunks( + coros, batch_size=batch_size, nofiles=True, return_exceptions=True + ) + if on_error == "raise": + ex = next(filter(is_exception, out), False) + if ex: + raise ex + if ( + len(paths) > 1 + or isinstance(path, list) + or paths[0] != self._strip_protocol(path) + ): + return { + k: v + for k, v in zip(paths, out) + if on_error != "omit" or not is_exception(v) + } + else: + return out[0] + + async def _cat_ranges( + self, + paths, + starts, + ends, + max_gap=None, + batch_size=None, + on_error="return", + **kwargs, + ): + """Get the contents of byte ranges from one or more files + + Parameters + ---------- + paths: list + A list of of filepaths on this filesystems + starts, ends: int or list + Bytes limits of the read. If using a single int, the same value will be + used to read all the specified files. + """ + # TODO: on_error + if max_gap is not None: + # use utils.merge_offset_ranges + raise NotImplementedError + if not isinstance(paths, list): + raise TypeError + if not isinstance(starts, Iterable): + starts = [starts] * len(paths) + if not isinstance(ends, Iterable): + ends = [ends] * len(paths) + if len(starts) != len(paths) or len(ends) != len(paths): + raise ValueError + coros = [ + self._cat_file(p, start=s, end=e, **kwargs) + for p, s, e in zip(paths, starts, ends) + ] + batch_size = batch_size or self.batch_size + return await _run_coros_in_chunks( + coros, batch_size=batch_size, nofiles=True, return_exceptions=True + ) + + async def _put_file(self, lpath, rpath, mode="overwrite", **kwargs): + raise NotImplementedError + + async def _put( + self, + lpath, + rpath, + recursive=False, + callback=DEFAULT_CALLBACK, + batch_size=None, + maxdepth=None, + **kwargs, + ): + """Copy file(s) from local. + + Copies a specific file or tree of files (if recursive=True). If rpath + ends with a "/", it will be assumed to be a directory, and target files + will go within. + + The put_file method will be called concurrently on a batch of files. The + batch_size option can configure the amount of futures that can be executed + at the same time. If it is -1, then all the files will be uploaded concurrently. + The default can be set for this instance by passing "batch_size" in the + constructor, or for all instances by setting the "gather_batch_size" key + in ``fsspec.config.conf``, falling back to 1/8th of the system limit . + """ + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as lists + rpaths = rpath + lpaths = lpath + else: + source_is_str = isinstance(lpath, str) + if source_is_str: + lpath = make_path_posix(lpath) + fs = LocalFileSystem() + lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] + if not lpaths: + return + + source_is_file = len(lpaths) == 1 + dest_is_dir = isinstance(rpath, str) and ( + trailing_sep(rpath) or await self._isdir(rpath) + ) + + rpath = self._strip_protocol(rpath) + exists = source_is_str and ( + (has_magic(lpath) and source_is_file) + or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) + ) + rpaths = other_paths( + lpaths, + rpath, + exists=exists, + flatten=not source_is_str, + ) + + is_dir = {l: os.path.isdir(l) for l in lpaths} + rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]] + file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]] + + await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs]) + batch_size = batch_size or self.batch_size + + coros = [] + callback.set_size(len(file_pairs)) + for lfile, rfile in file_pairs: + put_file = callback.branch_coro(self._put_file) + coros.append(put_file(lfile, rfile, **kwargs)) + + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback + ) + + async def _get_file(self, rpath, lpath, **kwargs): + raise NotImplementedError + + async def _get( + self, + rpath, + lpath, + recursive=False, + callback=DEFAULT_CALLBACK, + maxdepth=None, + **kwargs, + ): + """Copy file(s) to local. + + Copies a specific file or tree of files (if recursive=True). If lpath + ends with a "/", it will be assumed to be a directory, and target files + will go within. Can submit a list of paths, which may be glob-patterns + and will be expanded. + + The get_file method will be called concurrently on a batch of files. The + batch_size option can configure the amount of futures that can be executed + at the same time. If it is -1, then all the files will be uploaded concurrently. + The default can be set for this instance by passing "batch_size" in the + constructor, or for all instances by setting the "gather_batch_size" key + in ``fsspec.config.conf``, falling back to 1/8th of the system limit . + """ + if isinstance(lpath, list) and isinstance(rpath, list): + # No need to expand paths when both source and destination + # are provided as lists + rpaths = rpath + lpaths = lpath + else: + source_is_str = isinstance(rpath, str) + # First check for rpath trailing slash as _strip_protocol removes it. + source_not_trailing_sep = source_is_str and not trailing_sep(rpath) + rpath = self._strip_protocol(rpath) + rpaths = await self._expand_path( + rpath, recursive=recursive, maxdepth=maxdepth + ) + if source_is_str and (not recursive or maxdepth is not None): + # Non-recursive glob does not copy directories + rpaths = [ + p for p in rpaths if not (trailing_sep(p) or await self._isdir(p)) + ] + if not rpaths: + return + + lpath = make_path_posix(lpath) + source_is_file = len(rpaths) == 1 + dest_is_dir = isinstance(lpath, str) and ( + trailing_sep(lpath) or LocalFileSystem().isdir(lpath) + ) + + exists = source_is_str and ( + (has_magic(rpath) and source_is_file) + or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep) + ) + lpaths = other_paths( + rpaths, + lpath, + exists=exists, + flatten=not source_is_str, + ) + + [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] + batch_size = kwargs.pop("batch_size", self.batch_size) + + coros = [] + callback.set_size(len(lpaths)) + for lpath, rpath in zip(lpaths, rpaths): + get_file = callback.branch_coro(self._get_file) + coros.append(get_file(rpath, lpath, **kwargs)) + return await _run_coros_in_chunks( + coros, batch_size=batch_size, callback=callback + ) + + async def _isfile(self, path): + try: + return (await self._info(path))["type"] == "file" + except: # noqa: E722 + return False + + async def _isdir(self, path): + try: + return (await self._info(path))["type"] == "directory" + except OSError: + return False + + async def _size(self, path): + return (await self._info(path)).get("size", None) + + async def _sizes(self, paths, batch_size=None): + batch_size = batch_size or self.batch_size + return await _run_coros_in_chunks( + [self._size(p) for p in paths], batch_size=batch_size + ) + + async def _exists(self, path, **kwargs): + try: + await self._info(path, **kwargs) + return True + except FileNotFoundError: + return False + + async def _info(self, path, **kwargs): + raise NotImplementedError + + async def _ls(self, path, detail=True, **kwargs): + raise NotImplementedError + + async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs): + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + path = self._strip_protocol(path) + full_dirs = {} + dirs = {} + files = {} + + detail = kwargs.pop("detail", False) + try: + listing = await self._ls(path, detail=True, **kwargs) + except (FileNotFoundError, OSError) as e: + if on_error == "raise": + raise + elif callable(on_error): + on_error(e) + if detail: + yield path, {}, {} + else: + yield path, [], [] + return + + for info in listing: + # each info name must be at least [path]/part , but here + # we check also for names like [path]/part/ + pathname = info["name"].rstrip("/") + name = pathname.rsplit("/", 1)[-1] + if info["type"] == "directory" and pathname != path: + # do not include "self" path + full_dirs[name] = pathname + dirs[name] = info + elif pathname == path: + # file-like with same name as give path + files[""] = info + else: + files[name] = info + + if detail: + yield path, dirs, files + else: + yield path, list(dirs), list(files) + + if maxdepth is not None: + maxdepth -= 1 + if maxdepth < 1: + return + + for d in dirs: + async for _ in self._walk( + full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs + ): + yield _ + + async def _glob(self, path, maxdepth=None, **kwargs): + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + import re + + seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,) + ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash + path = self._strip_protocol(path) + append_slash_to_dirname = ends_with_sep or path.endswith( + tuple(sep + "**" for sep in seps) + ) + idx_star = path.find("*") if path.find("*") >= 0 else len(path) + idx_qmark = path.find("?") if path.find("?") >= 0 else len(path) + idx_brace = path.find("[") if path.find("[") >= 0 else len(path) + + min_idx = min(idx_star, idx_qmark, idx_brace) + + detail = kwargs.pop("detail", False) + + if not has_magic(path): + if await self._exists(path, **kwargs): + if not detail: + return [path] + else: + return {path: await self._info(path, **kwargs)} + else: + if not detail: + return [] # glob of non-existent returns empty + else: + return {} + elif "/" in path[:min_idx]: + min_idx = path[:min_idx].rindex("/") + root = path[: min_idx + 1] + depth = path[min_idx + 1 :].count("/") + 1 + else: + root = "" + depth = path[min_idx + 1 :].count("/") + 1 + + if "**" in path: + if maxdepth is not None: + idx_double_stars = path.find("**") + depth_double_stars = path[idx_double_stars:].count("/") + 1 + depth = depth - depth_double_stars + maxdepth + else: + depth = None + + allpaths = await self._find( + root, maxdepth=depth, withdirs=True, detail=True, **kwargs + ) + + pattern = glob_translate(path + ("/" if ends_with_sep else "")) + pattern = re.compile(pattern) + + out = { + p: info + for p, info in sorted(allpaths.items()) + if pattern.match( + p + "/" + if append_slash_to_dirname and info["type"] == "directory" + else p + ) + } + + if detail: + return out + else: + return list(out) + + async def _du(self, path, total=True, maxdepth=None, **kwargs): + sizes = {} + # async for? + for f in await self._find(path, maxdepth=maxdepth, **kwargs): + info = await self._info(f) + sizes[info["name"]] = info["size"] + if total: + return sum(sizes.values()) + else: + return sizes + + async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): + path = self._strip_protocol(path) + out = {} + detail = kwargs.pop("detail", False) + + # Add the root directory if withdirs is requested + # This is needed for posix glob compliance + if withdirs and path != "" and await self._isdir(path): + out[path] = await self._info(path) + + # async for? + async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs): + if withdirs: + files.update(dirs) + out.update({info["name"]: info for name, info in files.items()}) + if not out and (await self._isfile(path)): + # walk works on directories, but find should also return [path] + # when path happens to be a file + out[path] = {} + names = sorted(out) + if not detail: + return names + else: + return {name: out[name] for name in names} + + async def _expand_path(self, path, recursive=False, maxdepth=None): + if maxdepth is not None and maxdepth < 1: + raise ValueError("maxdepth must be at least 1") + + if isinstance(path, str): + out = await self._expand_path([path], recursive, maxdepth) + else: + out = set() + path = [self._strip_protocol(p) for p in path] + for p in path: # can gather here + if has_magic(p): + bit = set(await self._glob(p, maxdepth=maxdepth)) + out |= bit + if recursive: + # glob call above expanded one depth so if maxdepth is defined + # then decrement it in expand_path call below. If it is zero + # after decrementing then avoid expand_path call. + if maxdepth is not None and maxdepth <= 1: + continue + out |= set( + await self._expand_path( + list(bit), + recursive=recursive, + maxdepth=maxdepth - 1 if maxdepth is not None else None, + ) + ) + continue + elif recursive: + rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True)) + out |= rec + if p not in out and (recursive is False or (await self._exists(p))): + # should only check once, for the root + out.add(p) + if not out: + raise FileNotFoundError(path) + return sorted(out) + + async def _mkdir(self, path, create_parents=True, **kwargs): + pass # not necessary to implement, may not have directories + + async def _makedirs(self, path, exist_ok=False): + pass # not necessary to implement, may not have directories + + async def open_async(self, path, mode="rb", **kwargs): + if "b" not in mode or kwargs.get("compression"): + raise ValueError + raise NotImplementedError + + +def mirror_sync_methods(obj): + """Populate sync and async methods for obj + + For each method will create a sync version if the name refers to an async method + (coroutine) and there is no override in the child class; will create an async + method for the corresponding sync method if there is no implementation. + + Uses the methods specified in + - async_methods: the set that an implementation is expected to provide + - default_async_methods: that can be derived from their sync version in + AbstractFileSystem + - AsyncFileSystem: async-specific default coroutines + """ + from fsspec import AbstractFileSystem + + for method in async_methods + dir(AsyncFileSystem): + if not method.startswith("_"): + continue + smethod = method[1:] + if private.match(method): + isco = inspect.iscoroutinefunction(getattr(obj, method, None)) + unsync = getattr(getattr(obj, smethod, False), "__func__", None) + is_default = unsync is getattr(AbstractFileSystem, smethod, "") + if isco and is_default: + mth = sync_wrapper(getattr(obj, method), obj=obj) + setattr(obj, smethod, mth) + if not mth.__doc__: + mth.__doc__ = getattr( + getattr(AbstractFileSystem, smethod, None), "__doc__", "" + ) + + +class FSSpecCoroutineCancel(Exception): + pass + + +def _dump_running_tasks( + printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False +): + import traceback + + tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()] + if printout: + [task.print_stack() for task in tasks] + out = [ + { + "locals": task._coro.cr_frame.f_locals, + "file": task._coro.cr_frame.f_code.co_filename, + "firstline": task._coro.cr_frame.f_code.co_firstlineno, + "linelo": task._coro.cr_frame.f_lineno, + "stack": traceback.format_stack(task._coro.cr_frame), + "task": task if with_task else None, + } + for task in tasks + ] + if cancel: + for t in tasks: + cbs = t._callbacks + t.cancel() + asyncio.futures.Future.set_exception(t, exc) + asyncio.futures.Future.cancel(t) + [cb[0](t) for cb in cbs] # cancels any dependent concurrent.futures + try: + t._coro.throw(exc) # exits coro, unless explicitly handled + except exc: + pass + return out + + +class AbstractAsyncStreamedFile(AbstractBufferedFile): + # no read buffering, and always auto-commit + # TODO: readahead might still be useful here, but needs async version + + async def read(self, length=-1): + """ + Return data from cache, or fetch pieces as necessary + + Parameters + ---------- + length: int (-1) + Number of bytes to read; if <0, all remaining bytes. + """ + length = -1 if length is None else int(length) + if self.mode != "rb": + raise ValueError("File not in read mode") + if length < 0: + length = self.size - self.loc + if self.closed: + raise ValueError("I/O operation on closed file.") + if length == 0: + # don't even bother calling fetch + return b"" + out = await self._fetch_range(self.loc, self.loc + length) + self.loc += len(out) + return out + + async def write(self, data): + """ + Write data to buffer. + + Buffer only sent on flush() or if buffer is greater than + or equal to blocksize. + + Parameters + ---------- + data: bytes + Set of bytes to be written. + """ + if self.mode not in {"wb", "ab"}: + raise ValueError("File not in write mode") + if self.closed: + raise ValueError("I/O operation on closed file.") + if self.forced: + raise ValueError("This file has been force-flushed, can only close") + out = self.buffer.write(data) + self.loc += out + if self.buffer.tell() >= self.blocksize: + await self.flush() + return out + + async def close(self): + """Close file + + Finalizes writes, discards cache + """ + if getattr(self, "_unclosable", False): + return + if self.closed: + return + if self.mode == "rb": + self.cache = None + else: + if not self.forced: + await self.flush(force=True) + + if self.fs is not None: + self.fs.invalidate_cache(self.path) + self.fs.invalidate_cache(self.fs._parent(self.path)) + + self.closed = True + + async def flush(self, force=False): + if self.closed: + raise ValueError("Flush on closed file") + if force and self.forced: + raise ValueError("Force flush cannot be called more than once") + if force: + self.forced = True + + if self.mode not in {"wb", "ab"}: + # no-op to flush on read-mode + return + + if not force and self.buffer.tell() < self.blocksize: + # Defer write on small block + return + + if self.offset is None: + # Initialize a multipart upload + self.offset = 0 + try: + await self._initiate_upload() + except: + self.closed = True + raise + + if await self._upload_chunk(final=force) is not False: + self.offset += self.buffer.seek(0, 2) + self.buffer = io.BytesIO() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.close() + + async def _fetch_range(self, start, end): + raise NotImplementedError + + async def _initiate_upload(self): + pass + + async def _upload_chunk(self, final=False): + raise NotImplementedError |