about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py304
1 files changed, 304 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py
new file mode 100644
index 00000000..530df901
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/arrow.py
@@ -0,0 +1,304 @@
+import errno
+import io
+import os
+import secrets
+import shutil
+from contextlib import suppress
+from functools import cached_property, wraps
+from urllib.parse import parse_qs
+
+from fsspec.spec import AbstractFileSystem
+from fsspec.utils import (
+    get_package_version_without_import,
+    infer_storage_options,
+    mirror_from,
+    tokenize,
+)
+
+
+def wrap_exceptions(func):
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        try:
+            return func(*args, **kwargs)
+        except OSError as exception:
+            if not exception.args:
+                raise
+
+            message, *args = exception.args
+            if isinstance(message, str) and "does not exist" in message:
+                raise FileNotFoundError(errno.ENOENT, message) from exception
+            else:
+                raise
+
+    return wrapper
+
+
+PYARROW_VERSION = None
+
+
+class ArrowFSWrapper(AbstractFileSystem):
+    """FSSpec-compatible wrapper of pyarrow.fs.FileSystem.
+
+    Parameters
+    ----------
+    fs : pyarrow.fs.FileSystem
+
+    """
+
+    root_marker = "/"
+
+    def __init__(self, fs, **kwargs):
+        global PYARROW_VERSION
+        PYARROW_VERSION = get_package_version_without_import("pyarrow")
+        self.fs = fs
+        super().__init__(**kwargs)
+
+    @property
+    def protocol(self):
+        return self.fs.type_name
+
+    @cached_property
+    def fsid(self):
+        return "hdfs_" + tokenize(self.fs.host, self.fs.port)
+
+    @classmethod
+    def _strip_protocol(cls, path):
+        ops = infer_storage_options(path)
+        path = ops["path"]
+        if path.startswith("//"):
+            # special case for "hdfs://path" (without the triple slash)
+            path = path[1:]
+        return path
+
+    def ls(self, path, detail=False, **kwargs):
+        path = self._strip_protocol(path)
+        from pyarrow.fs import FileSelector
+
+        entries = [
+            self._make_entry(entry)
+            for entry in self.fs.get_file_info(FileSelector(path))
+        ]
+        if detail:
+            return entries
+        else:
+            return [entry["name"] for entry in entries]
+
+    def info(self, path, **kwargs):
+        path = self._strip_protocol(path)
+        [info] = self.fs.get_file_info([path])
+        return self._make_entry(info)
+
+    def exists(self, path):
+        path = self._strip_protocol(path)
+        try:
+            self.info(path)
+        except FileNotFoundError:
+            return False
+        else:
+            return True
+
+    def _make_entry(self, info):
+        from pyarrow.fs import FileType
+
+        if info.type is FileType.Directory:
+            kind = "directory"
+        elif info.type is FileType.File:
+            kind = "file"
+        elif info.type is FileType.NotFound:
+            raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), info.path)
+        else:
+            kind = "other"
+
+        return {
+            "name": info.path,
+            "size": info.size,
+            "type": kind,
+            "mtime": info.mtime,
+        }
+
+    @wrap_exceptions
+    def cp_file(self, path1, path2, **kwargs):
+        path1 = self._strip_protocol(path1).rstrip("/")
+        path2 = self._strip_protocol(path2).rstrip("/")
+
+        with self._open(path1, "rb") as lstream:
+            tmp_fname = f"{path2}.tmp.{secrets.token_hex(6)}"
+            try:
+                with self.open(tmp_fname, "wb") as rstream:
+                    shutil.copyfileobj(lstream, rstream)
+                self.fs.move(tmp_fname, path2)
+            except BaseException:
+                with suppress(FileNotFoundError):
+                    self.fs.delete_file(tmp_fname)
+                raise
+
+    @wrap_exceptions
+    def mv(self, path1, path2, **kwargs):
+        path1 = self._strip_protocol(path1).rstrip("/")
+        path2 = self._strip_protocol(path2).rstrip("/")
+        self.fs.move(path1, path2)
+
+    @wrap_exceptions
+    def rm_file(self, path):
+        path = self._strip_protocol(path)
+        self.fs.delete_file(path)
+
+    @wrap_exceptions
+    def rm(self, path, recursive=False, maxdepth=None):
+        path = self._strip_protocol(path).rstrip("/")
+        if self.isdir(path):
+            if recursive:
+                self.fs.delete_dir(path)
+            else:
+                raise ValueError("Can't delete directories without recursive=False")
+        else:
+            self.fs.delete_file(path)
+
+    @wrap_exceptions
+    def _open(self, path, mode="rb", block_size=None, seekable=True, **kwargs):
+        if mode == "rb":
+            if seekable:
+                method = self.fs.open_input_file
+            else:
+                method = self.fs.open_input_stream
+        elif mode == "wb":
+            method = self.fs.open_output_stream
+        elif mode == "ab":
+            method = self.fs.open_append_stream
+        else:
+            raise ValueError(f"unsupported mode for Arrow filesystem: {mode!r}")
+
+        _kwargs = {}
+        if mode != "rb" or not seekable:
+            if int(PYARROW_VERSION.split(".")[0]) >= 4:
+                # disable compression auto-detection
+                _kwargs["compression"] = None
+        stream = method(path, **_kwargs)
+
+        return ArrowFile(self, stream, path, mode, block_size, **kwargs)
+
+    @wrap_exceptions
+    def mkdir(self, path, create_parents=True, **kwargs):
+        path = self._strip_protocol(path)
+        if create_parents:
+            self.makedirs(path, exist_ok=True)
+        else:
+            self.fs.create_dir(path, recursive=False)
+
+    @wrap_exceptions
+    def makedirs(self, path, exist_ok=False):
+        path = self._strip_protocol(path)
+        self.fs.create_dir(path, recursive=True)
+
+    @wrap_exceptions
+    def rmdir(self, path):
+        path = self._strip_protocol(path)
+        self.fs.delete_dir(path)
+
+    @wrap_exceptions
+    def modified(self, path):
+        path = self._strip_protocol(path)
+        return self.fs.get_file_info(path).mtime
+
+    def cat_file(self, path, start=None, end=None, **kwargs):
+        kwargs["seekable"] = start not in [None, 0]
+        return super().cat_file(path, start=None, end=None, **kwargs)
+
+    def get_file(self, rpath, lpath, **kwargs):
+        kwargs["seekable"] = False
+        super().get_file(rpath, lpath, **kwargs)
+
+
+@mirror_from(
+    "stream",
+    [
+        "read",
+        "seek",
+        "tell",
+        "write",
+        "readable",
+        "writable",
+        "close",
+        "size",
+        "seekable",
+    ],
+)
+class ArrowFile(io.IOBase):
+    def __init__(self, fs, stream, path, mode, block_size=None, **kwargs):
+        self.path = path
+        self.mode = mode
+
+        self.fs = fs
+        self.stream = stream
+
+        self.blocksize = self.block_size = block_size
+        self.kwargs = kwargs
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        return self.close()
+
+
+class HadoopFileSystem(ArrowFSWrapper):
+    """A wrapper on top of the pyarrow.fs.HadoopFileSystem
+    to connect it's interface with fsspec"""
+
+    protocol = "hdfs"
+
+    def __init__(
+        self,
+        host="default",
+        port=0,
+        user=None,
+        kerb_ticket=None,
+        replication=3,
+        extra_conf=None,
+        **kwargs,
+    ):
+        """
+
+        Parameters
+        ----------
+        host: str
+            Hostname, IP or "default" to try to read from Hadoop config
+        port: int
+            Port to connect on, or default from Hadoop config if 0
+        user: str or None
+            If given, connect as this username
+        kerb_ticket: str or None
+            If given, use this ticket for authentication
+        replication: int
+            set replication factor of file for write operations. default value is 3.
+        extra_conf: None or dict
+            Passed on to HadoopFileSystem
+        """
+        from pyarrow.fs import HadoopFileSystem
+
+        fs = HadoopFileSystem(
+            host=host,
+            port=port,
+            user=user,
+            kerb_ticket=kerb_ticket,
+            replication=replication,
+            extra_conf=extra_conf,
+        )
+        super().__init__(fs=fs, **kwargs)
+
+    @staticmethod
+    def _get_kwargs_from_urls(path):
+        ops = infer_storage_options(path)
+        out = {}
+        if ops.get("host", None):
+            out["host"] = ops["host"]
+        if ops.get("username", None):
+            out["user"] = ops["username"]
+        if ops.get("port", None):
+            out["port"] = ops["port"]
+        if ops.get("url_query", None):
+            queries = parse_qs(ops["url_query"])
+            if queries.get("replication", None):
+                out["replication"] = int(queries["replication"][0])
+        return out