diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/compression.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/compression.py | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/compression.py b/.venv/lib/python3.12/site-packages/fsspec/compression.py new file mode 100644 index 00000000..fc519c24 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/compression.py @@ -0,0 +1,175 @@ +"""Helper functions for a standard streaming compression API""" + +from zipfile import ZipFile + +import fsspec.utils +from fsspec.spec import AbstractBufferedFile + + +def noop_file(file, mode, **kwargs): + return file + + +# TODO: files should also be available as contexts +# should be functions of the form func(infile, mode=, **kwargs) -> file-like +compr = {None: noop_file} + + +def register_compression(name, callback, extensions, force=False): + """Register an "inferable" file compression type. + + Registers transparent file compression type for use with fsspec.open. + Compression can be specified by name in open, or "infer"-ed for any files + ending with the given extensions. + + Args: + name: (str) The compression type name. Eg. "gzip". + callback: A callable of form (infile, mode, **kwargs) -> file-like. + Accepts an input file-like object, the target mode and kwargs. + Returns a wrapped file-like object. + extensions: (str, Iterable[str]) A file extension, or list of file + extensions for which to infer this compression scheme. Eg. "gz". + force: (bool) Force re-registration of compression type or extensions. + + Raises: + ValueError: If name or extensions already registered, and not force. + + """ + if isinstance(extensions, str): + extensions = [extensions] + + # Validate registration + if name in compr and not force: + raise ValueError(f"Duplicate compression registration: {name}") + + for ext in extensions: + if ext in fsspec.utils.compressions and not force: + raise ValueError(f"Duplicate compression file extension: {ext} ({name})") + + compr[name] = callback + + for ext in extensions: + fsspec.utils.compressions[ext] = name + + +def unzip(infile, mode="rb", filename=None, **kwargs): + if "r" not in mode: + filename = filename or "file" + z = ZipFile(infile, mode="w", **kwargs) + fo = z.open(filename, mode="w") + fo.close = lambda closer=fo.close: closer() or z.close() + return fo + z = ZipFile(infile) + if filename is None: + filename = z.namelist()[0] + return z.open(filename, mode="r", **kwargs) + + +register_compression("zip", unzip, "zip") + +try: + from bz2 import BZ2File +except ImportError: + pass +else: + register_compression("bz2", BZ2File, "bz2") + +try: # pragma: no cover + from isal import igzip + + def isal(infile, mode="rb", **kwargs): + return igzip.IGzipFile(fileobj=infile, mode=mode, **kwargs) + + register_compression("gzip", isal, "gz") +except ImportError: + from gzip import GzipFile + + register_compression( + "gzip", lambda f, **kwargs: GzipFile(fileobj=f, **kwargs), "gz" + ) + +try: + from lzma import LZMAFile + + register_compression("lzma", LZMAFile, "lzma") + register_compression("xz", LZMAFile, "xz") +except ImportError: + pass + +try: + import lzmaffi + + register_compression("lzma", lzmaffi.LZMAFile, "lzma", force=True) + register_compression("xz", lzmaffi.LZMAFile, "xz", force=True) +except ImportError: + pass + + +class SnappyFile(AbstractBufferedFile): + def __init__(self, infile, mode, **kwargs): + import snappy + + super().__init__( + fs=None, path="snappy", mode=mode.strip("b") + "b", size=999999999, **kwargs + ) + self.infile = infile + if "r" in mode: + self.codec = snappy.StreamDecompressor() + else: + self.codec = snappy.StreamCompressor() + + def _upload_chunk(self, final=False): + self.buffer.seek(0) + out = self.codec.add_chunk(self.buffer.read()) + self.infile.write(out) + return True + + def seek(self, loc, whence=0): + raise NotImplementedError("SnappyFile is not seekable") + + def seekable(self): + return False + + def _fetch_range(self, start, end): + """Get the specified set of bytes from remote""" + data = self.infile.read(end - start) + return self.codec.decompress(data) + + +try: + import snappy + + snappy.compress(b"") + # Snappy may use the .sz file extension, but this is not part of the + # standard implementation. + register_compression("snappy", SnappyFile, []) + +except (ImportError, NameError, AttributeError): + pass + +try: + import lz4.frame + + register_compression("lz4", lz4.frame.open, "lz4") +except ImportError: + pass + +try: + import zstandard as zstd + + def zstandard_file(infile, mode="rb"): + if "r" in mode: + cctx = zstd.ZstdDecompressor() + return cctx.stream_reader(infile) + else: + cctx = zstd.ZstdCompressor(level=10) + return cctx.stream_writer(infile) + + register_compression("zstd", zstandard_file, "zst") +except ImportError: + pass + + +def available_compressions(): + """Return a list of the implemented compressions.""" + return list(compr) |