aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/fsspec/compression.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/compression.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/compression.py175
1 files changed, 175 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/compression.py b/.venv/lib/python3.12/site-packages/fsspec/compression.py
new file mode 100644
index 00000000..fc519c24
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/compression.py
@@ -0,0 +1,175 @@
+"""Helper functions for a standard streaming compression API"""
+
+from zipfile import ZipFile
+
+import fsspec.utils
+from fsspec.spec import AbstractBufferedFile
+
+
+def noop_file(file, mode, **kwargs):
+ return file
+
+
+# TODO: files should also be available as contexts
+# should be functions of the form func(infile, mode=, **kwargs) -> file-like
+compr = {None: noop_file}
+
+
+def register_compression(name, callback, extensions, force=False):
+ """Register an "inferable" file compression type.
+
+ Registers transparent file compression type for use with fsspec.open.
+ Compression can be specified by name in open, or "infer"-ed for any files
+ ending with the given extensions.
+
+ Args:
+ name: (str) The compression type name. Eg. "gzip".
+ callback: A callable of form (infile, mode, **kwargs) -> file-like.
+ Accepts an input file-like object, the target mode and kwargs.
+ Returns a wrapped file-like object.
+ extensions: (str, Iterable[str]) A file extension, or list of file
+ extensions for which to infer this compression scheme. Eg. "gz".
+ force: (bool) Force re-registration of compression type or extensions.
+
+ Raises:
+ ValueError: If name or extensions already registered, and not force.
+
+ """
+ if isinstance(extensions, str):
+ extensions = [extensions]
+
+ # Validate registration
+ if name in compr and not force:
+ raise ValueError(f"Duplicate compression registration: {name}")
+
+ for ext in extensions:
+ if ext in fsspec.utils.compressions and not force:
+ raise ValueError(f"Duplicate compression file extension: {ext} ({name})")
+
+ compr[name] = callback
+
+ for ext in extensions:
+ fsspec.utils.compressions[ext] = name
+
+
+def unzip(infile, mode="rb", filename=None, **kwargs):
+ if "r" not in mode:
+ filename = filename or "file"
+ z = ZipFile(infile, mode="w", **kwargs)
+ fo = z.open(filename, mode="w")
+ fo.close = lambda closer=fo.close: closer() or z.close()
+ return fo
+ z = ZipFile(infile)
+ if filename is None:
+ filename = z.namelist()[0]
+ return z.open(filename, mode="r", **kwargs)
+
+
+register_compression("zip", unzip, "zip")
+
+try:
+ from bz2 import BZ2File
+except ImportError:
+ pass
+else:
+ register_compression("bz2", BZ2File, "bz2")
+
+try: # pragma: no cover
+ from isal import igzip
+
+ def isal(infile, mode="rb", **kwargs):
+ return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs)
+
+ register_compression("gzip", isal, "gz")
+except ImportError:
+ from gzip import GzipFile
+
+ register_compression(
+ "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz"
+ )
+
+try:
+ from lzma import LZMAFile
+
+ register_compression("lzma", LZMAFile, "lzma")
+ register_compression("xz", LZMAFile, "xz")
+except ImportError:
+ pass
+
+try:
+ import lzmaffi
+
+ register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True)
+ register_compression("xz", lzmaffi.LZMAFile, "xz", force=True)
+except ImportError:
+ pass
+
+
+class SnappyFile(AbstractBufferedFile):
+ def __init__(self, infile, mode, **kwargs):
+ import snappy
+
+ super().__init__(
+ fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs
+ )
+ self.infile = infile
+ if "r" in mode:
+ self.codec = snappy.StreamDecompressor()
+ else:
+ self.codec = snappy.StreamCompressor()
+
+ def _upload_chunk(self, final=False):
+ self.buffer.seek(0)
+ out = self.codec.add_chunk(self.buffer.read())
+ self.infile.write(out)
+ return True
+
+ def seek(self, loc, whence=0):
+ raise NotImplementedError("SnappyFile is not seekable")
+
+ def seekable(self):
+ return False
+
+ def _fetch_range(self, start, end):
+ """Get the specified set of bytes from remote"""
+ data = self.infile.read(end - start)
+ return self.codec.decompress(data)
+
+
+try:
+ import snappy
+
+ snappy.compress(b"")
+ # Snappy may use the .sz file extension, but this is not part of the
+ # standard implementation.
+ register_compression("snappy", SnappyFile, [])
+
+except (ImportError, NameError, AttributeError):
+ pass
+
+try:
+ import lz4.frame
+
+ register_compression("lz4", lz4.frame.open, "lz4")
+except ImportError:
+ pass
+
+try:
+ import zstandard as zstd
+
+ def zstandard_file(infile, mode="rb"):
+ if "r" in mode:
+ cctx = zstd.ZstdDecompressor()
+ return cctx.stream_reader(infile)
+ else:
+ cctx = zstd.ZstdCompressor(level=10)
+ return cctx.stream_writer(infile)
+
+ register_compression("zstd", zstandard_file, "zst")
+except ImportError:
+ pass
+
+
+def available_compressions():
+ """Return a list of the implemented compressions."""
+ return list(compr)