diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/dirfs.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/implementations/dirfs.py | 384 |
1 files changed, 384 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/dirfs.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/dirfs.py new file mode 100644 index 00000000..0bee8646 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/dirfs.py @@ -0,0 +1,384 @@ +from .. import filesystem +from ..asyn import AsyncFileSystem + + +class DirFileSystem(AsyncFileSystem): + """Directory prefix filesystem + + The DirFileSystem is a filesystem-wrapper. It assumes every path it is dealing with + is relative to the `path`. After performing the necessary paths operation it + delegates everything to the wrapped filesystem. + """ + + protocol = "dir" + + def __init__( + self, + path=None, + fs=None, + fo=None, + target_protocol=None, + target_options=None, + **storage_options, + ): + """ + Parameters + ---------- + path: str + Path to the directory. + fs: AbstractFileSystem + An instantiated filesystem to wrap. + target_protocol, target_options: + if fs is none, construct it from these + fo: str + Alternate for path; do not provide both + """ + super().__init__(**storage_options) + if fs is None: + fs = filesystem(protocol=target_protocol, **(target_options or {})) + if (path is not None) ^ (fo is not None) is False: + raise ValueError("Provide path or fo, not both") + path = path or fo + + if self.asynchronous and not fs.async_impl: + raise ValueError("can't use asynchronous with non-async fs") + + if fs.async_impl and self.asynchronous != fs.asynchronous: + raise ValueError("both dirfs and fs should be in the same sync/async mode") + + self.path = fs._strip_protocol(path) + self.fs = fs + + def _join(self, path): + if isinstance(path, str): + if not self.path: + return path + if not path: + return self.path + return self.fs.sep.join((self.path, self._strip_protocol(path))) + if isinstance(path, dict): + return {self._join(_path): value for _path, value in path.items()} + return [self._join(_path) for _path in path] + + def _relpath(self, path): + if isinstance(path, str): + if not self.path: + return path + # We need to account for S3FileSystem returning paths that do not + # start with a '/' + if path == self.path or ( + self.path.startswith(self.fs.sep) and path == self.path[1:] + ): + return "" + prefix = self.path + self.fs.sep + if self.path.startswith(self.fs.sep) and not path.startswith(self.fs.sep): + prefix = prefix[1:] + assert path.startswith(prefix) + return path[len(prefix) :] + return [self._relpath(_path) for _path in path] + + # Wrappers below + + @property + def sep(self): + return self.fs.sep + + async def set_session(self, *args, **kwargs): + return await self.fs.set_session(*args, **kwargs) + + async def _rm_file(self, path, **kwargs): + return await self.fs._rm_file(self._join(path), **kwargs) + + def rm_file(self, path, **kwargs): + return self.fs.rm_file(self._join(path), **kwargs) + + async def _rm(self, path, *args, **kwargs): + return await self.fs._rm(self._join(path), *args, **kwargs) + + def rm(self, path, *args, **kwargs): + return self.fs.rm(self._join(path), *args, **kwargs) + + async def _cp_file(self, path1, path2, **kwargs): + return await self.fs._cp_file(self._join(path1), self._join(path2), **kwargs) + + def cp_file(self, path1, path2, **kwargs): + return self.fs.cp_file(self._join(path1), self._join(path2), **kwargs) + + async def _copy( + self, + path1, + path2, + *args, + **kwargs, + ): + return await self.fs._copy( + self._join(path1), + self._join(path2), + *args, + **kwargs, + ) + + def copy(self, path1, path2, *args, **kwargs): + return self.fs.copy( + self._join(path1), + self._join(path2), + *args, + **kwargs, + ) + + async def _pipe(self, path, *args, **kwargs): + return await self.fs._pipe(self._join(path), *args, **kwargs) + + def pipe(self, path, *args, **kwargs): + return self.fs.pipe(self._join(path), *args, **kwargs) + + async def _pipe_file(self, path, *args, **kwargs): + return await self.fs._pipe_file(self._join(path), *args, **kwargs) + + def pipe_file(self, path, *args, **kwargs): + return self.fs.pipe_file(self._join(path), *args, **kwargs) + + async def _cat_file(self, path, *args, **kwargs): + return await self.fs._cat_file(self._join(path), *args, **kwargs) + + def cat_file(self, path, *args, **kwargs): + return self.fs.cat_file(self._join(path), *args, **kwargs) + + async def _cat(self, path, *args, **kwargs): + ret = await self.fs._cat( + self._join(path), + *args, + **kwargs, + ) + + if isinstance(ret, dict): + return {self._relpath(key): value for key, value in ret.items()} + + return ret + + def cat(self, path, *args, **kwargs): + ret = self.fs.cat( + self._join(path), + *args, + **kwargs, + ) + + if isinstance(ret, dict): + return {self._relpath(key): value for key, value in ret.items()} + + return ret + + async def _put_file(self, lpath, rpath, **kwargs): + return await self.fs._put_file(lpath, self._join(rpath), **kwargs) + + def put_file(self, lpath, rpath, **kwargs): + return self.fs.put_file(lpath, self._join(rpath), **kwargs) + + async def _put( + self, + lpath, + rpath, + *args, + **kwargs, + ): + return await self.fs._put( + lpath, + self._join(rpath), + *args, + **kwargs, + ) + + def put(self, lpath, rpath, *args, **kwargs): + return self.fs.put( + lpath, + self._join(rpath), + *args, + **kwargs, + ) + + async def _get_file(self, rpath, lpath, **kwargs): + return await self.fs._get_file(self._join(rpath), lpath, **kwargs) + + def get_file(self, rpath, lpath, **kwargs): + return self.fs.get_file(self._join(rpath), lpath, **kwargs) + + async def _get(self, rpath, *args, **kwargs): + return await self.fs._get(self._join(rpath), *args, **kwargs) + + def get(self, rpath, *args, **kwargs): + return self.fs.get(self._join(rpath), *args, **kwargs) + + async def _isfile(self, path): + return await self.fs._isfile(self._join(path)) + + def isfile(self, path): + return self.fs.isfile(self._join(path)) + + async def _isdir(self, path): + return await self.fs._isdir(self._join(path)) + + def isdir(self, path): + return self.fs.isdir(self._join(path)) + + async def _size(self, path): + return await self.fs._size(self._join(path)) + + def size(self, path): + return self.fs.size(self._join(path)) + + async def _exists(self, path): + return await self.fs._exists(self._join(path)) + + def exists(self, path): + return self.fs.exists(self._join(path)) + + async def _info(self, path, **kwargs): + return await self.fs._info(self._join(path), **kwargs) + + def info(self, path, **kwargs): + return self.fs.info(self._join(path), **kwargs) + + async def _ls(self, path, detail=True, **kwargs): + ret = (await self.fs._ls(self._join(path), detail=detail, **kwargs)).copy() + if detail: + out = [] + for entry in ret: + entry = entry.copy() + entry["name"] = self._relpath(entry["name"]) + out.append(entry) + return out + + return self._relpath(ret) + + def ls(self, path, detail=True, **kwargs): + ret = self.fs.ls(self._join(path), detail=detail, **kwargs).copy() + if detail: + out = [] + for entry in ret: + entry = entry.copy() + entry["name"] = self._relpath(entry["name"]) + out.append(entry) + return out + + return self._relpath(ret) + + async def _walk(self, path, *args, **kwargs): + async for root, dirs, files in self.fs._walk(self._join(path), *args, **kwargs): + yield self._relpath(root), dirs, files + + def walk(self, path, *args, **kwargs): + for root, dirs, files in self.fs.walk(self._join(path), *args, **kwargs): + yield self._relpath(root), dirs, files + + async def _glob(self, path, **kwargs): + detail = kwargs.get("detail", False) + ret = await self.fs._glob(self._join(path), **kwargs) + if detail: + return {self._relpath(path): info for path, info in ret.items()} + return self._relpath(ret) + + def glob(self, path, **kwargs): + detail = kwargs.get("detail", False) + ret = self.fs.glob(self._join(path), **kwargs) + if detail: + return {self._relpath(path): info for path, info in ret.items()} + return self._relpath(ret) + + async def _du(self, path, *args, **kwargs): + total = kwargs.get("total", True) + ret = await self.fs._du(self._join(path), *args, **kwargs) + if total: + return ret + + return {self._relpath(path): size for path, size in ret.items()} + + def du(self, path, *args, **kwargs): + total = kwargs.get("total", True) + ret = self.fs.du(self._join(path), *args, **kwargs) + if total: + return ret + + return {self._relpath(path): size for path, size in ret.items()} + + async def _find(self, path, *args, **kwargs): + detail = kwargs.get("detail", False) + ret = await self.fs._find(self._join(path), *args, **kwargs) + if detail: + return {self._relpath(path): info for path, info in ret.items()} + return self._relpath(ret) + + def find(self, path, *args, **kwargs): + detail = kwargs.get("detail", False) + ret = self.fs.find(self._join(path), *args, **kwargs) + if detail: + return {self._relpath(path): info for path, info in ret.items()} + return self._relpath(ret) + + async def _expand_path(self, path, *args, **kwargs): + return self._relpath( + await self.fs._expand_path(self._join(path), *args, **kwargs) + ) + + def expand_path(self, path, *args, **kwargs): + return self._relpath(self.fs.expand_path(self._join(path), *args, **kwargs)) + + async def _mkdir(self, path, *args, **kwargs): + return await self.fs._mkdir(self._join(path), *args, **kwargs) + + def mkdir(self, path, *args, **kwargs): + return self.fs.mkdir(self._join(path), *args, **kwargs) + + async def _makedirs(self, path, *args, **kwargs): + return await self.fs._makedirs(self._join(path), *args, **kwargs) + + def makedirs(self, path, *args, **kwargs): + return self.fs.makedirs(self._join(path), *args, **kwargs) + + def rmdir(self, path): + return self.fs.rmdir(self._join(path)) + + def mv(self, path1, path2, **kwargs): + return self.fs.mv( + self._join(path1), + self._join(path2), + **kwargs, + ) + + def touch(self, path, **kwargs): + return self.fs.touch(self._join(path), **kwargs) + + def created(self, path): + return self.fs.created(self._join(path)) + + def modified(self, path): + return self.fs.modified(self._join(path)) + + def sign(self, path, *args, **kwargs): + return self.fs.sign(self._join(path), *args, **kwargs) + + def __repr__(self): + return f"{self.__class__.__qualname__}(path='{self.path}', fs={self.fs})" + + def open( + self, + path, + *args, + **kwargs, + ): + return self.fs.open( + self._join(path), + *args, + **kwargs, + ) + + async def open_async( + self, + path, + *args, + **kwargs, + ): + return await self.fs.open_async( + self._join(path), + *args, + **kwargs, + ) |