diff options
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py | 485 |
1 files changed, 485 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py new file mode 100644 index 00000000..c6e0d844 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/webhdfs.py @@ -0,0 +1,485 @@ +# https://hadoop.apache.org/docs/r1.0.4/webhdfs.html + +import logging +import os +import secrets +import shutil +import tempfile +import uuid +from contextlib import suppress +from urllib.parse import quote + +import requests + +from ..spec import AbstractBufferedFile, AbstractFileSystem +from ..utils import infer_storage_options, tokenize + +logger = logging.getLogger("webhdfs") + + +class WebHDFS(AbstractFileSystem): + """ + Interface to HDFS over HTTP using the WebHDFS API. Supports also HttpFS gateways. + + Four auth mechanisms are supported: + + insecure: no auth is done, and the user is assumed to be whoever they + say they are (parameter ``user``), or a predefined value such as + "dr.who" if not given + spnego: when kerberos authentication is enabled, auth is negotiated by + requests_kerberos https://github.com/requests/requests-kerberos . + This establishes a session based on existing kinit login and/or + specified principal/password; parameters are passed with ``kerb_kwargs`` + token: uses an existing Hadoop delegation token from another secured + service. Indeed, this client can also generate such tokens when + not insecure. Note that tokens expire, but can be renewed (by a + previously specified user) and may allow for proxying. + basic-auth: used when both parameter ``user`` and parameter ``password`` + are provided. + + """ + + tempdir = str(tempfile.gettempdir()) + protocol = "webhdfs", "webHDFS" + + def __init__( + self, + host, + port=50070, + kerberos=False, + token=None, + user=None, + password=None, + proxy_to=None, + kerb_kwargs=None, + data_proxy=None, + use_https=False, + session_cert=None, + session_verify=True, + **kwargs, + ): + """ + Parameters + ---------- + host: str + Name-node address + port: int + Port for webHDFS + kerberos: bool + Whether to authenticate with kerberos for this connection + token: str or None + If given, use this token on every call to authenticate. A user + and user-proxy may be encoded in the token and should not be also + given + user: str or None + If given, assert the user name to connect with + password: str or None + If given, assert the password to use for basic auth. If password + is provided, user must be provided also + proxy_to: str or None + If given, the user has the authority to proxy, and this value is + the user in who's name actions are taken + kerb_kwargs: dict + Any extra arguments for HTTPKerberosAuth, see + `<https://github.com/requests/requests-kerberos/blob/master/requests_kerberos/kerberos_.py>`_ + data_proxy: dict, callable or None + If given, map data-node addresses. This can be necessary if the + HDFS cluster is behind a proxy, running on Docker or otherwise has + a mismatch between the host-names given by the name-node and the + address by which to refer to them from the client. If a dict, + maps host names ``host->data_proxy[host]``; if a callable, full + URLs are passed, and function must conform to + ``url->data_proxy(url)``. + use_https: bool + Whether to connect to the Name-node using HTTPS instead of HTTP + session_cert: str or Tuple[str, str] or None + Path to a certificate file, or tuple of (cert, key) files to use + for the requests.Session + session_verify: str, bool or None + Path to a certificate file to use for verifying the requests.Session. + kwargs + """ + if self._cached: + return + super().__init__(**kwargs) + self.url = f"{'https' if use_https else 'http'}://{host}:{port}/webhdfs/v1" + self.kerb = kerberos + self.kerb_kwargs = kerb_kwargs or {} + self.pars = {} + self.proxy = data_proxy or {} + if token is not None: + if user is not None or proxy_to is not None: + raise ValueError( + "If passing a delegation token, must not set " + "user or proxy_to, as these are encoded in the" + " token" + ) + self.pars["delegation"] = token + self.user = user + self.password = password + + if password is not None: + if user is None: + raise ValueError( + "If passing a password, the user must also be" + "set in order to set up the basic-auth" + ) + else: + if user is not None: + self.pars["user.name"] = user + + if proxy_to is not None: + self.pars["doas"] = proxy_to + if kerberos and user is not None: + raise ValueError( + "If using Kerberos auth, do not specify the " + "user, this is handled by kinit." + ) + + self.session_cert = session_cert + self.session_verify = session_verify + + self._connect() + + self._fsid = f"webhdfs_{tokenize(host, port)}" + + @property + def fsid(self): + return self._fsid + + def _connect(self): + self.session = requests.Session() + + if self.session_cert: + self.session.cert = self.session_cert + + self.session.verify = self.session_verify + + if self.kerb: + from requests_kerberos import HTTPKerberosAuth + + self.session.auth = HTTPKerberosAuth(**self.kerb_kwargs) + + if self.user is not None and self.password is not None: + from requests.auth import HTTPBasicAuth + + self.session.auth = HTTPBasicAuth(self.user, self.password) + + def _call(self, op, method="get", path=None, data=None, redirect=True, **kwargs): + path = self._strip_protocol(path) if path is not None else "" + url = self._apply_proxy(self.url + quote(path, safe="/=")) + args = kwargs.copy() + args.update(self.pars) + args["op"] = op.upper() + logger.debug("sending %s with %s", url, method) + out = self.session.request( + method=method.upper(), + url=url, + params=args, + data=data, + allow_redirects=redirect, + ) + if out.status_code in [400, 401, 403, 404, 500]: + try: + err = out.json() + msg = err["RemoteException"]["message"] + exp = err["RemoteException"]["exception"] + except (ValueError, KeyError): + pass + else: + if exp in ["IllegalArgumentException", "UnsupportedOperationException"]: + raise ValueError(msg) + elif exp in ["SecurityException", "AccessControlException"]: + raise PermissionError(msg) + elif exp in ["FileNotFoundException"]: + raise FileNotFoundError(msg) + else: + raise RuntimeError(msg) + out.raise_for_status() + return out + + def _open( + self, + path, + mode="rb", + block_size=None, + autocommit=True, + replication=None, + permissions=None, + **kwargs, + ): + """ + + Parameters + ---------- + path: str + File location + mode: str + 'rb', 'wb', etc. + block_size: int + Client buffer size for read-ahead or write buffer + autocommit: bool + If False, writes to temporary file that only gets put in final + location upon commit + replication: int + Number of copies of file on the cluster, write mode only + permissions: str or int + posix permissions, write mode only + kwargs + + Returns + ------- + WebHDFile instance + """ + block_size = block_size or self.blocksize + return WebHDFile( + self, + path, + mode=mode, + block_size=block_size, + tempdir=self.tempdir, + autocommit=autocommit, + replication=replication, + permissions=permissions, + ) + + @staticmethod + def _process_info(info): + info["type"] = info["type"].lower() + info["size"] = info["length"] + return info + + @classmethod + def _strip_protocol(cls, path): + return infer_storage_options(path)["path"] + + @staticmethod + def _get_kwargs_from_urls(urlpath): + out = infer_storage_options(urlpath) + out.pop("path", None) + out.pop("protocol", None) + if "username" in out: + out["user"] = out.pop("username") + return out + + def info(self, path): + out = self._call("GETFILESTATUS", path=path) + info = out.json()["FileStatus"] + info["name"] = path + return self._process_info(info) + + def ls(self, path, detail=False): + out = self._call("LISTSTATUS", path=path) + infos = out.json()["FileStatuses"]["FileStatus"] + for info in infos: + self._process_info(info) + info["name"] = path.rstrip("/") + "/" + info["pathSuffix"] + if detail: + return sorted(infos, key=lambda i: i["name"]) + else: + return sorted(info["name"] for info in infos) + + def content_summary(self, path): + """Total numbers of files, directories and bytes under path""" + out = self._call("GETCONTENTSUMMARY", path=path) + return out.json()["ContentSummary"] + + def ukey(self, path): + """Checksum info of file, giving method and result""" + out = self._call("GETFILECHECKSUM", path=path, redirect=False) + if "Location" in out.headers: + location = self._apply_proxy(out.headers["Location"]) + out2 = self.session.get(location) + out2.raise_for_status() + return out2.json()["FileChecksum"] + else: + out.raise_for_status() + return out.json()["FileChecksum"] + + def home_directory(self): + """Get user's home directory""" + out = self._call("GETHOMEDIRECTORY") + return out.json()["Path"] + + def get_delegation_token(self, renewer=None): + """Retrieve token which can give the same authority to other uses + + Parameters + ---------- + renewer: str or None + User who may use this token; if None, will be current user + """ + if renewer: + out = self._call("GETDELEGATIONTOKEN", renewer=renewer) + else: + out = self._call("GETDELEGATIONTOKEN") + t = out.json()["Token"] + if t is None: + raise ValueError("No token available for this user/security context") + return t["urlString"] + + def renew_delegation_token(self, token): + """Make token live longer. Returns new expiry time""" + out = self._call("RENEWDELEGATIONTOKEN", method="put", token=token) + return out.json()["long"] + + def cancel_delegation_token(self, token): + """Stop the token from being useful""" + self._call("CANCELDELEGATIONTOKEN", method="put", token=token) + + def chmod(self, path, mod): + """Set the permission at path + + Parameters + ---------- + path: str + location to set (file or directory) + mod: str or int + posix epresentation or permission, give as oct string, e.g, '777' + or 0o777 + """ + self._call("SETPERMISSION", method="put", path=path, permission=mod) + + def chown(self, path, owner=None, group=None): + """Change owning user and/or group""" + kwargs = {} + if owner is not None: + kwargs["owner"] = owner + if group is not None: + kwargs["group"] = group + self._call("SETOWNER", method="put", path=path, **kwargs) + + def set_replication(self, path, replication): + """ + Set file replication factor + + Parameters + ---------- + path: str + File location (not for directories) + replication: int + Number of copies of file on the cluster. Should be smaller than + number of data nodes; normally 3 on most systems. + """ + self._call("SETREPLICATION", path=path, method="put", replication=replication) + + def mkdir(self, path, **kwargs): + self._call("MKDIRS", method="put", path=path) + + def makedirs(self, path, exist_ok=False): + if exist_ok is False and self.exists(path): + raise FileExistsError(path) + self.mkdir(path) + + def mv(self, path1, path2, **kwargs): + self._call("RENAME", method="put", path=path1, destination=path2) + + def rm(self, path, recursive=False, **kwargs): + self._call( + "DELETE", + method="delete", + path=path, + recursive="true" if recursive else "false", + ) + + def rm_file(self, path, **kwargs): + self.rm(path) + + def cp_file(self, lpath, rpath, **kwargs): + with self.open(lpath) as lstream: + tmp_fname = "/".join([self._parent(rpath), f".tmp.{secrets.token_hex(16)}"]) + # Perform an atomic copy (stream to a temporary file and + # move it to the actual destination). + try: + with self.open(tmp_fname, "wb") as rstream: + shutil.copyfileobj(lstream, rstream) + self.mv(tmp_fname, rpath) + except BaseException: + with suppress(FileNotFoundError): + self.rm(tmp_fname) + raise + + def _apply_proxy(self, location): + if self.proxy and callable(self.proxy): + location = self.proxy(location) + elif self.proxy: + # as a dict + for k, v in self.proxy.items(): + location = location.replace(k, v, 1) + return location + + +class WebHDFile(AbstractBufferedFile): + """A file living in HDFS over webHDFS""" + + def __init__(self, fs, path, **kwargs): + super().__init__(fs, path, **kwargs) + kwargs = kwargs.copy() + if kwargs.get("permissions", None) is None: + kwargs.pop("permissions", None) + if kwargs.get("replication", None) is None: + kwargs.pop("replication", None) + self.permissions = kwargs.pop("permissions", 511) + tempdir = kwargs.pop("tempdir") + if kwargs.pop("autocommit", False) is False: + self.target = self.path + self.path = os.path.join(tempdir, str(uuid.uuid4())) + + def _upload_chunk(self, final=False): + """Write one part of a multi-block file upload + + Parameters + ========== + final: bool + This is the last block, so should complete file, if + self.autocommit is True. + """ + out = self.fs.session.post( + self.location, + data=self.buffer.getvalue(), + headers={"content-type": "application/octet-stream"}, + ) + out.raise_for_status() + return True + + def _initiate_upload(self): + """Create remote file/upload""" + kwargs = self.kwargs.copy() + if "a" in self.mode: + op, method = "APPEND", "POST" + else: + op, method = "CREATE", "PUT" + kwargs["overwrite"] = "true" + out = self.fs._call(op, method, self.path, redirect=False, **kwargs) + location = self.fs._apply_proxy(out.headers["Location"]) + if "w" in self.mode: + # create empty file to append to + out2 = self.fs.session.put( + location, headers={"content-type": "application/octet-stream"} + ) + out2.raise_for_status() + # after creating empty file, change location to append to + out2 = self.fs._call("APPEND", "POST", self.path, redirect=False, **kwargs) + self.location = self.fs._apply_proxy(out2.headers["Location"]) + + def _fetch_range(self, start, end): + start = max(start, 0) + end = min(self.size, end) + if start >= end or start >= self.size: + return b"" + out = self.fs._call( + "OPEN", path=self.path, offset=start, length=end - start, redirect=False + ) + out.raise_for_status() + if "Location" in out.headers: + location = out.headers["Location"] + out2 = self.fs.session.get(self.fs._apply_proxy(location)) + return out2.content + else: + return out.content + + def commit(self): + self.fs.mv(self.path, self.target) + + def discard(self): + self.fs.rm(self.path) |