author    S. Solomon Darnell  2025-03-28 21:52:21 -0500
committer S. Solomon Darnell  2025-03-28 21:52:21 -0500
commit    4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
tree      ee3dc5af3b6313e921cd920906356f5d4febc4ed  /.venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py
parent    cc961e04ba734dd72309fb548a2f97d67d578813 (diff)
two versions of R2R are here (HEAD, master)
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py')
-rw-r--r--  .venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py  213
1 file changed, 213 insertions(+), 0 deletions(-)
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py
new file mode 100644
index 00000000..eb6f1453
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/libarchive.py
@@ -0,0 +1,213 @@
+from contextlib import contextmanager
+from ctypes import (
+    CFUNCTYPE,
+    POINTER,
+    c_int,
+    c_longlong,
+    c_void_p,
+    cast,
+    create_string_buffer,
+)
+
+import libarchive
+import libarchive.ffi as ffi
+
+from fsspec import open_files
+from fsspec.archive import AbstractArchiveFileSystem
+from fsspec.implementations.memory import MemoryFile
+from fsspec.utils import DEFAULT_BLOCK_SIZE
+
+# Libarchive requires seekable files or in-memory data only for certain
+# archive types. However, since we read the directory first to cache the
+# contents, and also allow random access to any file, the file-like object
+# needs to be seekable in all cases.
+
+# Seek call-backs (not provided in the libarchive python wrapper)
+SEEK_CALLBACK = CFUNCTYPE(c_longlong, c_int, c_void_p, c_longlong, c_int)
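+# This matches libarchive's archive_seek_callback signature,
+#   la_int64_t (*)(struct archive *, void *_client_data, la_int64_t offset, int whence);
+# the archive pointer argument is unused by seek_func below.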
+read_set_seek_callback = ffi.ffi(
+    "read_set_seek_callback", [ffi.c_archive_p, SEEK_CALLBACK], c_int, ffi.check_int
+)
+new_api = hasattr(ffi, "NO_OPEN_CB")
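+# Newer python-libarchive-c releases expose NO_OPEN_CB/NO_CLOSE_CB sentinels;
+# with older releases, explicit no-op open/close callbacks must be constructed
+# (see custom_reader below).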
+
+
+@contextmanager
+def custom_reader(file, format_name="all", filter_name="all", block_size=ffi.page_size):
+    """Read an archive from a seekable file-like object.
+
+    The `file` object must support the standard `readinto` and `seek` methods.
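+
+    A sketch of direct use (normally this is called internally by
+    LibArchiveFileSystem; ``archive.tar`` is a hypothetical local file)::
+
+        with open("archive.tar", "rb") as f:
+            with custom_reader(f) as arc:
+                for entry in arc:
+                    print(entry.pathname)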
+    """
+    buf = create_string_buffer(block_size)
+    buf_p = cast(buf, c_void_p)
+
+    def read_func(archive_p, context, ptrptr):
+        # readinto the buffer, returns number of bytes read
+        length = file.readinto(buf)
+        # write the address of the buffer into the pointer
+        ptrptr = cast(ptrptr, POINTER(c_void_p))
+        ptrptr[0] = buf_p
+        # tell libarchive how much data was written into the buffer
+        return length
+
+    def seek_func(archive_p, context, offset, whence):
+        file.seek(offset, whence)
+        # tell libarchive the current position
+        return file.tell()
+
+    read_cb = ffi.READ_CALLBACK(read_func)
+    seek_cb = SEEK_CALLBACK(seek_func)
+
+    if new_api:
+        open_cb = ffi.NO_OPEN_CB
+        close_cb = ffi.NO_CLOSE_CB
+    else:
+        open_cb = libarchive.read.OPEN_CALLBACK(ffi.VOID_CB)
+        close_cb = libarchive.read.CLOSE_CALLBACK(ffi.VOID_CB)
+
+    with libarchive.read.new_archive_read(format_name, filter_name) as archive_p:
+        read_set_seek_callback(archive_p, seek_cb)
+        ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
+        yield libarchive.read.ArchiveRead(archive_p)
+
+
+class LibArchiveFileSystem(AbstractArchiveFileSystem):
+    """Compressed archives as a file-system (read-only)
+
+    Supports the following formats:
+    tar, pax, cpio, ISO9660, zip, mtree, shar, ar, raw, xar, lha/lzh, rar,
+    Microsoft CAB, 7-Zip, WARC
+
+    See the libarchive documentation for further restrictions.
+    https://www.libarchive.org/
+
+    This class keeps the file object open for the lifetime of the instance.
+    It only works with seekable file-like objects; if the target filesystem
+    does not provide them, it is recommended to cache the file locally.
+
+    This class is pickleable, but not necessarily thread-safe (this depends
+    on the platform). See the libarchive documentation for details.
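+
+    Examples
+    --------
+    A minimal sketch (``archive.7z`` is a hypothetical local file)::
+
+        fs = fsspec.filesystem("libarchive", fo="archive.7z")
+        fs.ls("")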
+    """
+
+    root_marker = ""
+    protocol = "libarchive"
+    cachable = False
+
+    def __init__(
+        self,
+        fo="",
+        mode="r",
+        target_protocol=None,
+        target_options=None,
+        block_size=DEFAULT_BLOCK_SIZE,
+        **kwargs,
+    ):
+        """
+        Parameters
+        ----------
+        fo: str or file-like
+            Contains the archive, and must exist. If a str, will fetch the
+            file using :meth:`~fsspec.open_files`, which must return exactly
+            one file.
+        mode: str
+            Currently, only 'r' is accepted
+        target_protocol: str (optional)
+            If ``fo`` is a string, this value can be used to override the
+            FS protocol inferred from a URL
+        target_options: dict (optional)
+            Kwargs passed when instantiating the target FS, if ``fo`` is
+            a string.
+        """
+        super().__init__(**kwargs)
+        if mode != "r":
+            raise ValueError("Only read from archive files accepted")
+        if isinstance(fo, str):
+            files = open_files(fo, protocol=target_protocol, **(target_options or {}))
+            if len(files) != 1:
+                raise ValueError(
+                    f'Path "{fo}" did not resolve to exactly one file: "{files}"'
+                )
+            fo = files[0]
+        self.of = fo
+        self.fo = fo.__enter__()  # the whole instance is a context
+        self.block_size = block_size
+        self.dir_cache = None
+
+    @contextmanager
+    def _open_archive(self):
+        self.fo.seek(0)
+        with custom_reader(self.fo, block_size=self.block_size) as arc:
+            yield arc
+
+    @classmethod
+    def _strip_protocol(cls, path):
+        # file paths are always relative to the archive root
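+        # e.g. "libarchive://dir/afile" -> "dir/afile"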
+        return super()._strip_protocol(path).lstrip("/")
+
+    def _get_dirs(self):
+        fields = {
+            "name": "pathname",
+            "size": "size",
+            "created": "ctime",
+            "mode": "mode",
+            "uid": "uid",
+            "gid": "gid",
+            "mtime": "mtime",
+        }
+
+        if self.dir_cache is not None:
+            return
+
+        self.dir_cache = {}
+        list_names = []
+        with self._open_archive() as arc:
+            for entry in arc:
+                if not entry.isdir and not entry.isfile:
+                    # Skip symbolic links, fifo entries, etc.
+                    continue
+                self.dir_cache.update(
+                    {
+                        dirname: {"name": dirname, "size": 0, "type": "directory"}
+                        for dirname in self._all_dirnames({entry.name})
+                    }
+                )
+                f = {key: getattr(entry, fields[key]) for key in fields}
+                f["type"] = "directory" if entry.isdir else "file"
+                list_names.append(entry.name)
+
+                self.dir_cache[f["name"]] = f
+        # libarchive does not seem to return an entry for the directories (at
+        # least not in all formats), so derive the directory names from the
+        # file names
+        self.dir_cache.update(
+            {
+                dirname: {"name": dirname, "size": 0, "type": "directory"}
+                for dirname in self._all_dirnames(list_names)
+            }
+        )
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
+    ):
+        path = self._strip_protocol(path)
+        if mode != "rb":
+            raise NotImplementedError
+
+        data = bytes()
+        with self._open_archive() as arc:
+            for entry in arc:
+                if entry.pathname != path:
+                    continue
+
+                if entry.size == 0:
+                    # empty file, so there are no blocks
+                    break
+
+                # request the whole file as a single block
+                for block in entry.get_blocks(entry.size):
+                    data = block
+                    break
+                else:
+                    raise ValueError(f"No data returned for {path!r}")
+        return MemoryFile(fs=self, path=path, data=data)
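
For reference, a minimal usage sketch (the archive path and member name below
are hypothetical):

    import fsspec

    fs = fsspec.filesystem("libarchive", fo="archive.tar")
    print(fs.ls(""))                       # listing built from the cached directory scan
    with fs.open("inner/afile.txt") as f:  # returns an in-memory copy of the member
        content = f.read()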