about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py177
1 files changed, 177 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py
new file mode 100644
index 00000000..6db3ae27
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/zip.py
@@ -0,0 +1,177 @@
+import os
+import zipfile
+
+import fsspec
+from fsspec.archive import AbstractArchiveFileSystem
+
+
+class ZipFileSystem(AbstractArchiveFileSystem):
+    """Read/Write contents of ZIP archive as a file-system
+
+    Keeps file object open while instance lives.
+
+    This class is pickleable, but not necessarily thread-safe
+    """
+
+    root_marker = ""
+    protocol = "zip"
+    cachable = False
+
+    def __init__(
+        self,
+        fo="",
+        mode="r",
+        target_protocol=None,
+        target_options=None,
+        compression=zipfile.ZIP_STORED,
+        allowZip64=True,
+        compresslevel=None,
+        **kwargs,
+    ):
+        """
+        Parameters
+        ----------
+        fo: str or file-like
+            Contains ZIP, and must exist. If a str, will fetch file using
+            :meth:`~fsspec.open_files`, which must return one file exactly.
+        mode: str
+            Accept: "r", "w", "a"
+        target_protocol: str (optional)
+            If ``fo`` is a string, this value can be used to override the
+            FS protocol inferred from a URL
+        target_options: dict (optional)
+            Kwargs passed when instantiating the target FS, if ``fo`` is
+            a string.
+        compression, allowZip64, compresslevel: passed to ZipFile
+            Only relevant when creating a ZIP
+        """
+        super().__init__(self, **kwargs)
+        if mode not in set("rwa"):
+            raise ValueError(f"mode '{mode}' no understood")
+        self.mode = mode
+        if isinstance(fo, (str, os.PathLike)):
+            if mode == "a":
+                m = "r+b"
+            else:
+                m = mode + "b"
+            fo = fsspec.open(
+                fo, mode=m, protocol=target_protocol, **(target_options or {})
+            )
+        self.force_zip_64 = allowZip64
+        self.of = fo
+        self.fo = fo.__enter__()  # the whole instance is a context
+        self.zip = zipfile.ZipFile(
+            self.fo,
+            mode=mode,
+            compression=compression,
+            allowZip64=allowZip64,
+            compresslevel=compresslevel,
+        )
+        self.dir_cache = None
+
+    @classmethod
+    def _strip_protocol(cls, path):
+        # zip file paths are always relative to the archive root
+        return super()._strip_protocol(path).lstrip("/")
+
+    def __del__(self):
+        if hasattr(self, "zip"):
+            self.close()
+            del self.zip
+
+    def close(self):
+        """Commits any write changes to the file. Done on ``del`` too."""
+        self.zip.close()
+
+    def _get_dirs(self):
+        if self.dir_cache is None or self.mode in set("wa"):
+            # when writing, dir_cache is always in the ZipFile's attributes,
+            # not read from the file.
+            files = self.zip.infolist()
+            self.dir_cache = {
+                dirname.rstrip("/"): {
+                    "name": dirname.rstrip("/"),
+                    "size": 0,
+                    "type": "directory",
+                }
+                for dirname in self._all_dirnames(self.zip.namelist())
+            }
+            for z in files:
+                f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
+                f.update(
+                    {
+                        "name": z.filename.rstrip("/"),
+                        "size": z.file_size,
+                        "type": ("directory" if z.is_dir() else "file"),
+                    }
+                )
+                self.dir_cache[f["name"]] = f
+
+    def pipe_file(self, path, value, **kwargs):
+        # override upstream, because we know the exact file size in this case
+        self.zip.writestr(path, value, **kwargs)
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
+    ):
+        path = self._strip_protocol(path)
+        if "r" in mode and self.mode in set("wa"):
+            if self.exists(path):
+                raise OSError("ZipFS can only be open for reading or writing, not both")
+            raise FileNotFoundError(path)
+        if "r" in self.mode and "w" in mode:
+            raise OSError("ZipFS can only be open for reading or writing, not both")
+        out = self.zip.open(path, mode.strip("b"), force_zip64=self.force_zip_64)
+        if "r" in mode:
+            info = self.info(path)
+            out.size = info["size"]
+            out.name = info["name"]
+        return out
+
+    def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
+        if maxdepth is not None and maxdepth < 1:
+            raise ValueError("maxdepth must be at least 1")
+
+        # Remove the leading slash, as the zip file paths are always
+        # given without a leading slash
+        path = path.lstrip("/")
+        path_parts = list(filter(lambda s: bool(s), path.split("/")))
+
+        def _matching_starts(file_path):
+            file_parts = filter(lambda s: bool(s), file_path.split("/"))
+            return all(a == b for a, b in zip(path_parts, file_parts))
+
+        self._get_dirs()
+
+        result = {}
+        # To match posix find, if an exact file name is given, we should
+        # return only that file
+        if path in self.dir_cache and self.dir_cache[path]["type"] == "file":
+            result[path] = self.dir_cache[path]
+            return result if detail else [path]
+
+        for file_path, file_info in self.dir_cache.items():
+            if not (path == "" or _matching_starts(file_path)):
+                continue
+
+            if file_info["type"] == "directory":
+                if withdirs:
+                    if file_path not in result:
+                        result[file_path.strip("/")] = file_info
+                continue
+
+            if file_path not in result:
+                result[file_path] = file_info if detail else None
+
+        if maxdepth:
+            path_depth = path.count("/")
+            result = {
+                k: v for k, v in result.items() if k.count("/") - path_depth < maxdepth
+            }
+        return result if detail else sorted(result)