1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
from contextlib import contextmanager
from ctypes import (
CFUNCTYPE,
POINTER,
c_int,
c_longlong,
c_void_p,
cast,
create_string_buffer,
)
import libarchive
import libarchive.ffi as ffi
from fsspec import open_files
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.implementations.memory import MemoryFile
from fsspec.utils import DEFAULT_BLOCK_SIZE
# libarchive itself only requires a seekable input for certain archive types.
# This filesystem, however, reads the archive once to cache its directory
# listing and then re-reads it from the start for every member it opens, so
# the file-like object must be seekable no matter what.

# Seek call-back type (not provided by the libarchive python wrapper). It
# mirrors libarchive's ``archive_seek_callback``:
#     la_int64_t (*)(struct archive *, void *client_data,
#                    la_int64_t offset, int whence)
# The first argument is a pointer, so it is declared as ``c_void_p``; declaring
# it as ``c_int`` would truncate it on 64-bit platforms.
SEEK_CALLBACK = CFUNCTYPE(c_longlong, c_void_p, c_void_p, c_longlong, c_int)

# ctypes binding for ``archive_read_set_seek_callback``. ``ffi.check_int`` is
# the wrapper's return-code checker (presumably raising on error codes —
# confirm against the installed libarchive-c version).
read_set_seek_callback = ffi.ffi(
    "read_set_seek_callback", [ffi.c_archive_p, SEEK_CALLBACK], c_int, ffi.check_int
)

# Newer libarchive-c releases expose ``NO_OPEN_CB``/``NO_CLOSE_CB``; with older
# ones ``custom_reader`` builds open/close call-backs from ``ffi.VOID_CB``.
new_api = hasattr(ffi, "NO_OPEN_CB")
@contextmanager
def custom_reader(file, format_name="all", filter_name="all", block_size=ffi.page_size):
    """Open an archive for reading from a seekable file-like object.

    Parameters
    ----------
    file: file-like
        Source of the archive bytes; must implement ``readinto``, ``seek``
        and ``tell``.
    format_name, filter_name: str
        Passed unchanged to ``libarchive.read.new_archive_read``.
    block_size: int
        Size of the buffer that each read call-back fills from ``file``.

    Yields
    ------
    libarchive.read.ArchiveRead
        Reader over the archive entries; valid only inside the ``with`` block.
    """
    # A single buffer is shared by every read call-back: it is refilled from
    # ``file`` and its address is handed back to libarchive each time.
    buffer = create_string_buffer(block_size)
    buffer_addr = cast(buffer, c_void_p)

    def _on_read(archive_p, context, ptrptr):
        n_read = file.readinto(buffer)
        # Report where the data lives by writing the buffer address into the
        # ``const void **`` out-parameter, then report how many bytes arrived.
        cast(ptrptr, POINTER(c_void_p))[0] = buffer_addr
        return n_read

    def _on_seek(archive_p, context, offset, whence):
        file.seek(offset, whence)
        # libarchive expects the resulting absolute position.
        return file.tell()

    # The ctypes call-back objects must stay referenced for as long as
    # libarchive may invoke them; as locals of this generator they live until
    # the context manager exits.
    read_cb = ffi.READ_CALLBACK(_on_read)
    seek_cb = SEEK_CALLBACK(_on_seek)

    if not new_api:
        open_cb = libarchive.read.OPEN_CALLBACK(ffi.VOID_CB)
        close_cb = libarchive.read.CLOSE_CALLBACK(ffi.VOID_CB)
    else:
        open_cb, close_cb = ffi.NO_OPEN_CB, ffi.NO_CLOSE_CB

    with libarchive.read.new_archive_read(format_name, filter_name) as archive_p:
        # The seek call-back is registered before the archive is opened.
        read_set_seek_callback(archive_p, seek_cb)
        ffi.read_open(archive_p, None, open_cb, read_cb, close_cb)
        yield libarchive.read.ArchiveRead(archive_p)
class LibArchiveFileSystem(AbstractArchiveFileSystem):
    """Compressed archives as a file-system (read-only)

    Supports the following formats:
    tar, pax, cpio, ISO9660, zip, mtree, shar, ar, raw, xar, lha/lzh, rar,
    Microsoft CAB, 7-Zip, WARC

    See the libarchive documentation for further restrictions.
    https://www.libarchive.org/

    Keeps the file object open while the instance lives. It only works with
    seekable file-like objects. If the underlying filesystem does not provide
    such file objects, it is recommended to cache the archive locally.

    This class is pickleable, but not necessarily thread-safe (depends on the
    platform). See libarchive documentation for details.
    """

    root_marker = ""
    protocol = "libarchive"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        block_size=DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            The archive; it must exist. If a str, will fetch file using
            :meth:`~fsspec.open_files`, which must return one file exactly.
        mode: str
            Currently, only 'r' accepted
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        block_size: int
            Size of the buffer used when feeding bytes from ``fo`` to
            libarchive.

        Raises
        ------
        ValueError
            If ``mode`` is not ``"r"``, or if ``fo`` is a string that does not
            resolve to exactly one file.
        """
        # ``super().__init__`` is already bound to ``self``; passing ``self``
        # again would forward it as a spurious positional argument.
        super().__init__(**kwargs)
        if mode != "r":
            raise ValueError("Only read from archive files accepted")
        if isinstance(fo, str):
            files = open_files(fo, protocol=target_protocol, **(target_options or {}))
            if len(files) != 1:
                raise ValueError(
                    f'Path "{fo}" did not resolve to exactly one file: "{files}"'
                )
            fo = files[0]
        self.of = fo
        self.fo = fo.__enter__()  # the whole instance is a context
        self.block_size = block_size
        self.dir_cache = None

    @contextmanager
    def _open_archive(self):
        """Yield a libarchive reader over the archive, starting from byte 0."""
        # Every traversal re-reads the archive from the beginning.
        self.fo.seek(0)
        with custom_reader(self.fo, block_size=self.block_size) as arc:
            yield arc

    @classmethod
    def _strip_protocol(cls, path):
        # file paths are always relative to the archive root
        return super()._strip_protocol(path).lstrip("/")

    def _get_dirs(self):
        """Populate ``self.dir_cache`` from the archive entries (only once).

        Only regular files and directories are recorded; other entry types
        (symbolic links, FIFOs, ...) are skipped. When a pathname occurs more
        than once, the last occurrence wins. Parent directories without an
        entry of their own are added as placeholders.
        """
        if self.dir_cache is not None:
            return

        # info-dict key -> libarchive entry attribute
        fields = {
            "name": "pathname",
            "size": "size",
            "created": "ctime",
            "mode": "mode",
            "uid": "uid",
            "gid": "gid",
            "mtime": "mtime",
        }

        # Build into a local dict and publish it only once reading succeeded;
        # otherwise an error mid-read would leave a partial cache that later
        # calls would mistake for a complete one.
        dir_cache = {}
        list_names = []
        with self._open_archive() as arc:
            for entry in arc:
                if not entry.isdir and not entry.isfile:
                    # Skip symbolic links, fifo entries, etc.
                    continue
                info = {key: getattr(entry, attr) for key, attr in fields.items()}
                info["type"] = "directory" if entry.isdir else "file"
                list_names.append(entry.name)
                dir_cache[info["name"]] = info

        # libarchive does not seem to return an entry for the directories (at
        # least not in all formats), so derive the directory names from the
        # entry names. ``setdefault`` keeps the real metadata of directories
        # that do have an entry instead of clobbering it with a placeholder.
        for dirname in self._all_dirnames(list_names):
            dir_cache.setdefault(
                dirname, {"name": dirname, "size": 0, "type": "directory"}
            )
        self.dir_cache = dir_cache

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Return the contents of archive member ``path`` as a ``MemoryFile``.

        The whole archive is scanned; if a pathname occurs more than once the
        last occurrence wins (consistent with ``_get_dirs``). ``block_size``,
        ``autocommit`` and ``cache_options`` are accepted for interface
        compatibility and ignored.

        Raises
        ------
        NotImplementedError
            If ``mode`` is not ``"rb"``.
        FileNotFoundError
            If no entry in the archive has the pathname ``path``.
        ValueError
            If the entry reports a non-zero size but no data could be read.
        """
        path = self._strip_protocol(path)
        if mode != "rb":
            raise NotImplementedError

        data = None
        with self._open_archive() as arc:
            for entry in arc:
                if entry.pathname != path:
                    continue
                if entry.size:
                    # Join every block rather than trusting the first one to
                    # hold the whole member.
                    data = b"".join(entry.get_blocks(entry.size))
                    if not data:
                        raise ValueError(
                            f"Could not read any data for {path!r} from the archive"
                        )
                else:
                    # Empty member, or (presumably) a size not recorded in the
                    # header: read whatever data exists using the wrapper's
                    # default block size, which yields b"" for an empty member.
                    data = b"".join(entry.get_blocks())
        if data is None:
            raise FileNotFoundError(path)
        return MemoryFile(fs=self, path=path, data=data)
|