about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py485
1 files changed, 485 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py
new file mode 100644
index 00000000..c6e0d844
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py
@@ -0,0 +1,485 @@
+# https://hadoop.apache.org/docs/r1.0.4/webhdfs.html
+
+import logging
+import os
+import secrets
+import shutil
+import tempfile
+import uuid
+from contextlib import suppress
+from urllib.parse import quote
+
+import requests
+
+from ..spec import AbstractBufferedFile, AbstractFileSystem
+from ..utils import infer_storage_options, tokenize
+
+logger = logging.getLogger("webhdfs")
+
+
+class WebHDFS(AbstractFileSystem):
+    """
+    Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways.
+
+    Four auth mechanisms are supported:
+
+    insecure: no auth is done, and the user is assumed to be whoever they
+        say they are (parameter ``user``), or a predefined value such as
+        "dr.who" if not given
+    spnego: when kerberos authentication is enabled, auth is negotiated by
+        requests_kerberos https://github.com/requests/requests-kerberos .
+        This establishes a session based on existing kinit login and/or
+        specified principal/password; parameters are passed with ``kerb_kwargs``
+    token: uses an existing Hadoop delegation token from another secured
+        service. Indeed, this client can also generate such tokens when
+        not insecure. Note that tokens expire, but can be renewed (by a
+        previously specified user) and may allow for proxying.
+    basic-auth: used when both parameter ``user`` and parameter ``password``
+        are provided.
+
+    """
+
+    tempdir = str(tempfile.gettempdir())
+    protocol = "webhdfs", "webHDFS"
+
+    def __init__(
+        self,
+        host,
+        port=50070,
+        kerberos=False,
+        token=None,
+        user=None,
+        password=None,
+        proxy_to=None,
+        kerb_kwargs=None,
+        data_proxy=None,
+        use_https=False,
+        session_cert=None,
+        session_verify=True,
+        **kwargs,
+    ):
+        """
+        Parameters
+        ----------
+        host: str
+            Name-node address
+        port: int
+            Port for webHDFS
+        kerberos: bool
+            Whether to authenticate with kerberos for this connection
+        token: str or None
+            If given, use this token on every call to authenticate. A user
+            and user-proxy may be encoded in the token and should not be also
+            given
+        user: str or None
+            If given, assert the user name to connect with
+        password: str or None
+            If given, assert the password to use for basic auth. If password
+            is provided, user must be provided also
+        proxy_to: str or None
+            If given, the user has the authority to proxy, and this value is
+            the user in who's name actions are taken
+        kerb_kwargs: dict
+            Any extra arguments for HTTPKerberosAuth, see
+            `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_
+        data_proxy: dict, callable or None
+            If given, map data-node addresses. This can be necessary if the
+            HDFS cluster is behind a proxy, running on Docker or otherwise has
+            a mismatch between the host-names given by the name-node and the
+            address by which to refer to them from the client. If a dict,
+            maps host names ``host->data_proxy[host]``; if a callable, full
+            URLs are passed, and function must conform to
+            ``url->data_proxy(url)``.
+        use_https: bool
+            Whether to connect to the Name-node using HTTPS instead of HTTP
+        session_cert: str or Tuple[str, str] or None
+            Path to a certificate file, or tuple of (cert, key) files to use
+            for the requests.Session
+        session_verify: str, bool or None
+            Path to a certificate file to use for verifying the requests.Session.
+        kwargs
+        """
+        if self._cached:
+            return
+        super().__init__(**kwargs)
+        self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1"
+        self.kerb = kerberos
+        self.kerb_kwargs = kerb_kwargs or {}
+        self.pars = {}
+        self.proxy = data_proxy or {}
+        if token is not None:
+            if user is not None or proxy_to is not None:
+                raise ValueError(
+                    "If passing a delegation token, must not set "
+                    "user or proxy_to, as these are encoded in the"
+                    " token"
+                )
+            self.pars["delegation"] = token
+        self.user = user
+        self.password = password
+
+        if password is not None:
+            if user is None:
+                raise ValueError(
+                    "If passing a password, the user must also be"
+                    "set in order to set up the basic-auth"
+                )
+        else:
+            if user is not None:
+                self.pars["user.name"] = user
+
+        if proxy_to is not None:
+            self.pars["doas"] = proxy_to
+        if kerberos and user is not None:
+            raise ValueError(
+                "If using Kerberos auth, do not specify the "
+                "user, this is handled by kinit."
+            )
+
+        self.session_cert = session_cert
+        self.session_verify = session_verify
+
+        self._connect()
+
+        self._fsid = f"webhdfs_{tokenize(host, port)}"
+
+    @property
+    def fsid(self):
+        return self._fsid
+
+    def _connect(self):
+        self.session = requests.Session()
+
+        if self.session_cert:
+            self.session.cert = self.session_cert
+
+        self.session.verify = self.session_verify
+
+        if self.kerb:
+            from requests_kerberos import HTTPKerberosAuth
+
+            self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs)
+
+        if self.user is not None and self.password is not None:
+            from requests.auth import HTTPBasicAuth
+
+            self.session.auth = HTTPBasicAuth(self.user, self.password)
+
+    def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs):
+        path = self._strip_protocol(path) if path is not None else ""
+        url = self._apply_proxy(self.url + quote(path, safe="/="))
+        args = kwargs.copy()
+        args.update(self.pars)
+        args["op"] = op.upper()
+        logger.debug("sending %s with %s", url, method)
+        out = self.session.request(
+            method=method.upper(),
+            url=url,
+            params=args,
+            data=data,
+            allow_redirects=redirect,
+        )
+        if out.status_code in [400, 401, 403, 404, 500]:
+            try:
+                err = out.json()
+                msg = err["RemoteException"]["message"]
+                exp = err["RemoteException"]["exception"]
+            except (ValueError, KeyError):
+                pass
+            else:
+                if exp in ["IllegalArgumentException", "UnsupportedOperationException"]:
+                    raise ValueError(msg)
+                elif exp in ["SecurityException", "AccessControlException"]:
+                    raise PermissionError(msg)
+                elif exp in ["FileNotFoundException"]:
+                    raise FileNotFoundError(msg)
+                else:
+                    raise RuntimeError(msg)
+        out.raise_for_status()
+        return out
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        replication=None,
+        permissions=None,
+        **kwargs,
+    ):
+        """
+
+        Parameters
+        ----------
+        path: str
+            File location
+        mode: str
+            'rb', 'wb', etc.
+        block_size: int
+            Client buffer size for read-ahead or write buffer
+        autocommit: bool
+            If False, writes to temporary file that only gets put in final
+            location upon commit
+        replication: int
+            Number of copies of file on the cluster, write mode only
+        permissions: str or int
+            posix permissions, write mode only
+        kwargs
+
+        Returns
+        -------
+        WebHDFile instance
+        """
+        block_size = block_size or self.blocksize
+        return WebHDFile(
+            self,
+            path,
+            mode=mode,
+            block_size=block_size,
+            tempdir=self.tempdir,
+            autocommit=autocommit,
+            replication=replication,
+            permissions=permissions,
+        )
+
+    @staticmethod
+    def _process_info(info):
+        info["type"] = info["type"].lower()
+        info["size"] = info["length"]
+        return info
+
+    @classmethod
+    def _strip_protocol(cls, path):
+        return infer_storage_options(path)["path"]
+
+    @staticmethod
+    def _get_kwargs_from_urls(urlpath):
+        out = infer_storage_options(urlpath)
+        out.pop("path", None)
+        out.pop("protocol", None)
+        if "username" in out:
+            out["user"] = out.pop("username")
+        return out
+
+    def info(self, path):
+        out = self._call("GETFILESTATUS", path=path)
+        info = out.json()["FileStatus"]
+        info["name"] = path
+        return self._process_info(info)
+
+    def ls(self, path, detail=False):
+        out = self._call("LISTSTATUS", path=path)
+        infos = out.json()["FileStatuses"]["FileStatus"]
+        for info in infos:
+            self._process_info(info)
+            info["name"] = path.rstrip("/") + "/" + info["pathSuffix"]
+        if detail:
+            return sorted(infos, key=lambda i: i["name"])
+        else:
+            return sorted(info["name"] for info in infos)
+
+    def content_summary(self, path):
+        """Total numbers of files, directories and bytes under path"""
+        out = self._call("GETCONTENTSUMMARY", path=path)
+        return out.json()["ContentSummary"]
+
+    def ukey(self, path):
+        """Checksum info of file, giving method and result"""
+        out = self._call("GETFILECHECKSUM", path=path, redirect=False)
+        if "Location" in out.headers:
+            location = self._apply_proxy(out.headers["Location"])
+            out2 = self.session.get(location)
+            out2.raise_for_status()
+            return out2.json()["FileChecksum"]
+        else:
+            out.raise_for_status()
+            return out.json()["FileChecksum"]
+
+    def home_directory(self):
+        """Get user's home directory"""
+        out = self._call("GETHOMEDIRECTORY")
+        return out.json()["Path"]
+
+    def get_delegation_token(self, renewer=None):
+        """Retrieve token which can give the same authority to other uses
+
+        Parameters
+        ----------
+        renewer: str or None
+            User who may use this token; if None, will be current user
+        """
+        if renewer:
+            out = self._call("GETDELEGATIONTOKEN", renewer=renewer)
+        else:
+            out = self._call("GETDELEGATIONTOKEN")
+        t = out.json()["Token"]
+        if t is None:
+            raise ValueError("No token available for this user/security context")
+        return t["urlString"]
+
+    def renew_delegation_token(self, token):
+        """Make token live longer. Returns new expiry time"""
+        out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token)
+        return out.json()["long"]
+
+    def cancel_delegation_token(self, token):
+        """Stop the token from being useful"""
+        self._call("CANCELDELEGATIONTOKEN", method="put", token=token)
+
+    def chmod(self, path, mod):
+        """Set the permission at path
+
+        Parameters
+        ----------
+        path: str
+            location to set (file or directory)
+        mod: str or int
+            posix epresentation or permission, give as oct string, e.g, '777'
+            or 0o777
+        """
+        self._call("SETPERMISSION", method="put", path=path, permission=mod)
+
+    def chown(self, path, owner=None, group=None):
+        """Change owning user and/or group"""
+        kwargs = {}
+        if owner is not None:
+            kwargs["owner"] = owner
+        if group is not None:
+            kwargs["group"] = group
+        self._call("SETOWNER", method="put", path=path, **kwargs)
+
+    def set_replication(self, path, replication):
+        """
+        Set file replication factor
+
+        Parameters
+        ----------
+        path: str
+            File location (not for directories)
+        replication: int
+            Number of copies of file on the cluster. Should be smaller than
+            number of data nodes; normally 3 on most systems.
+        """
+        self._call("SETREPLICATION", path=path, method="put", replication=replication)
+
+    def mkdir(self, path, **kwargs):
+        self._call("MKDIRS", method="put", path=path)
+
+    def makedirs(self, path, exist_ok=False):
+        if exist_ok is False and self.exists(path):
+            raise FileExistsError(path)
+        self.mkdir(path)
+
+    def mv(self, path1, path2, **kwargs):
+        self._call("RENAME", method="put", path=path1, destination=path2)
+
+    def rm(self, path, recursive=False, **kwargs):
+        self._call(
+            "DELETE",
+            method="delete",
+            path=path,
+            recursive="true" if recursive else "false",
+        )
+
+    def rm_file(self, path, **kwargs):
+        self.rm(path)
+
+    def cp_file(self, lpath, rpath, **kwargs):
+        with self.open(lpath) as lstream:
+            tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"])
+            # Perform an atomic copy (stream to a temporary file and
+            # move it to the actual destination).
+            try:
+                with self.open(tmp_fname, "wb") as rstream:
+                    shutil.copyfileobj(lstream, rstream)
+                self.mv(tmp_fname, rpath)
+            except BaseException:
+                with suppress(FileNotFoundError):
+                    self.rm(tmp_fname)
+                raise
+
+    def _apply_proxy(self, location):
+        if self.proxy and callable(self.proxy):
+            location = self.proxy(location)
+        elif self.proxy:
+            # as a dict
+            for k, v in self.proxy.items():
+                location = location.replace(k, v, 1)
+        return location
+
+
+class WebHDFile(AbstractBufferedFile):
+    """A file living in HDFS over webHDFS"""
+
+    def __init__(self, fs, path, **kwargs):
+        super().__init__(fs, path, **kwargs)
+        kwargs = kwargs.copy()
+        if kwargs.get("permissions", None) is None:
+            kwargs.pop("permissions", None)
+        if kwargs.get("replication", None) is None:
+            kwargs.pop("replication", None)
+        self.permissions = kwargs.pop("permissions", 511)
+        tempdir = kwargs.pop("tempdir")
+        if kwargs.pop("autocommit", False) is False:
+            self.target = self.path
+            self.path = os.path.join(tempdir, str(uuid.uuid4()))
+
+    def _upload_chunk(self, final=False):
+        """Write one part of a multi-block file upload
+
+        Parameters
+        ==========
+        final: bool
+            This is the last block, so should complete file, if
+            self.autocommit is True.
+        """
+        out = self.fs.session.post(
+            self.location,
+            data=self.buffer.getvalue(),
+            headers={"content-type": "application/octet-stream"},
+        )
+        out.raise_for_status()
+        return True
+
+    def _initiate_upload(self):
+        """Create remote file/upload"""
+        kwargs = self.kwargs.copy()
+        if "a" in self.mode:
+            op, method = "APPEND", "POST"
+        else:
+            op, method = "CREATE", "PUT"
+            kwargs["overwrite"] = "true"
+        out = self.fs._call(op, method, self.path, redirect=False, **kwargs)
+        location = self.fs._apply_proxy(out.headers["Location"])
+        if "w" in self.mode:
+            # create empty file to append to
+            out2 = self.fs.session.put(
+                location, headers={"content-type": "application/octet-stream"}
+            )
+            out2.raise_for_status()
+            # after creating empty file, change location to append to
+            out2 = self.fs._call("APPEND", "POST", self.path, redirect=False, **kwargs)
+            self.location = self.fs._apply_proxy(out2.headers["Location"])
+
+    def _fetch_range(self, start, end):
+        start = max(start, 0)
+        end = min(self.size, end)
+        if start >= end or start >= self.size:
+            return b""
+        out = self.fs._call(
+            "OPEN", path=self.path, offset=start, length=end - start, redirect=False
+        )
+        out.raise_for_status()
+        if "Location" in out.headers:
+            location = out.headers["Location"]
+            out2 = self.fs.session.get(self.fs._apply_proxy(location))
+            return out2.content
+        else:
+            return out.content
+
+    def commit(self):
+        self.fs.mv(self.path, self.target)
+
+    def discard(self):
+        self.fs.rm(self.path)