aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py395
1 files changed, 395 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py
new file mode 100644
index 00000000..4186b6ce
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/ftp.py
@@ -0,0 +1,395 @@
+import os
+import sys
+import uuid
+import warnings
+from ftplib import FTP, FTP_TLS, Error, error_perm
+from typing import Any
+
+from ..spec import AbstractBufferedFile, AbstractFileSystem
+from ..utils import infer_storage_options, isfilelike
+
+
+class FTPFileSystem(AbstractFileSystem):
+ """A filesystem over classic FTP"""
+
+ root_marker = "/"
+ cachable = False
+ protocol = "ftp"
+
+ def __init__(
+ self,
+ host,
+ port=21,
+ username=None,
+ password=None,
+ acct=None,
+ block_size=None,
+ tempdir=None,
+ timeout=30,
+ encoding="utf-8",
+ tls=False,
+ **kwargs,
+ ):
+ """
+ You can use _get_kwargs_from_urls to get some kwargs from
+ a reasonable FTP url.
+
+ Authentication will be anonymous if username/password are not
+ given.
+
+ Parameters
+ ----------
+ host: str
+ The remote server name/ip to connect to
+ port: int
+ Port to connect with
+ username: str or None
+ If authenticating, the user's identifier
+ password: str of None
+ User's password on the server, if using
+ acct: str or None
+ Some servers also need an "account" string for auth
+ block_size: int or None
+ If given, the read-ahead or write buffer size.
+ tempdir: str
+ Directory on remote to put temporary files when in a transaction
+ timeout: int
+ Timeout of the ftp connection in seconds
+ encoding: str
+ Encoding to use for directories and filenames in FTP connection
+ tls: bool
+ Use FTP-TLS, by default False
+ """
+ super().__init__(**kwargs)
+ self.host = host
+ self.port = port
+ self.tempdir = tempdir or "/tmp"
+ self.cred = username or "", password or "", acct or ""
+ self.timeout = timeout
+ self.encoding = encoding
+ if block_size is not None:
+ self.blocksize = block_size
+ else:
+ self.blocksize = 2**16
+ self.tls = tls
+ self._connect()
+ if self.tls:
+ self.ftp.prot_p()
+
+ def _connect(self):
+ if self.tls:
+ ftp_cls = FTP_TLS
+ else:
+ ftp_cls = FTP
+ if sys.version_info >= (3, 9):
+ self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
+ elif self.encoding:
+ warnings.warn("`encoding` not supported for python<3.9, ignoring")
+ self.ftp = ftp_cls(timeout=self.timeout)
+ else:
+ self.ftp = ftp_cls(timeout=self.timeout)
+ self.ftp.connect(self.host, self.port)
+ self.ftp.login(*self.cred)
+
+ @classmethod
+ def _strip_protocol(cls, path):
+ return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/")
+
+ @staticmethod
+ def _get_kwargs_from_urls(urlpath):
+ out = infer_storage_options(urlpath)
+ out.pop("path", None)
+ out.pop("protocol", None)
+ return out
+
+ def ls(self, path, detail=True, **kwargs):
+ path = self._strip_protocol(path)
+ out = []
+ if path not in self.dircache:
+ try:
+ try:
+ out = [
+ (fn, details)
+ for (fn, details) in self.ftp.mlsd(path)
+ if fn not in [".", ".."]
+ and details["type"] not in ["pdir", "cdir"]
+ ]
+ except error_perm:
+ out = _mlsd2(self.ftp, path) # Not platform independent
+ for fn, details in out:
+ details["name"] = "/".join(
+ ["" if path == "/" else path, fn.lstrip("/")]
+ )
+ if details["type"] == "file":
+ details["size"] = int(details["size"])
+ else:
+ details["size"] = 0
+ if details["type"] == "dir":
+ details["type"] = "directory"
+ self.dircache[path] = out
+ except Error:
+ try:
+ info = self.info(path)
+ if info["type"] == "file":
+ out = [(path, info)]
+ except (Error, IndexError) as exc:
+ raise FileNotFoundError(path) from exc
+ files = self.dircache.get(path, out)
+ if not detail:
+ return sorted([fn for fn, details in files])
+ return [details for fn, details in files]
+
+ def info(self, path, **kwargs):
+ # implement with direct method
+ path = self._strip_protocol(path)
+ if path == "/":
+ # special case, since this dir has no real entry
+ return {"name": "/", "size": 0, "type": "directory"}
+ files = self.ls(self._parent(path).lstrip("/"), True)
+ try:
+ out = next(f for f in files if f["name"] == path)
+ except StopIteration as exc:
+ raise FileNotFoundError(path) from exc
+ return out
+
+ def get_file(self, rpath, lpath, **kwargs):
+ if self.isdir(rpath):
+ if not os.path.exists(lpath):
+ os.mkdir(lpath)
+ return
+ if isfilelike(lpath):
+ outfile = lpath
+ else:
+ outfile = open(lpath, "wb")
+
+ def cb(x):
+ outfile.write(x)
+
+ self.ftp.retrbinary(
+ f"RETR {rpath}",
+ blocksize=self.blocksize,
+ callback=cb,
+ )
+ if not isfilelike(lpath):
+ outfile.close()
+
+ def cat_file(self, path, start=None, end=None, **kwargs):
+ if end is not None:
+ return super().cat_file(path, start, end, **kwargs)
+ out = []
+
+ def cb(x):
+ out.append(x)
+
+ try:
+ self.ftp.retrbinary(
+ f"RETR {path}",
+ blocksize=self.blocksize,
+ rest=start,
+ callback=cb,
+ )
+ except (Error, error_perm) as orig_exc:
+ raise FileNotFoundError(path) from orig_exc
+ return b"".join(out)
+
+ def _open(
+ self,
+ path,
+ mode="rb",
+ block_size=None,
+ cache_options=None,
+ autocommit=True,
+ **kwargs,
+ ):
+ path = self._strip_protocol(path)
+ block_size = block_size or self.blocksize
+ return FTPFile(
+ self,
+ path,
+ mode=mode,
+ block_size=block_size,
+ tempdir=self.tempdir,
+ autocommit=autocommit,
+ cache_options=cache_options,
+ )
+
+ def _rm(self, path):
+ path = self._strip_protocol(path)
+ self.ftp.delete(path)
+ self.invalidate_cache(self._parent(path))
+
+ def rm(self, path, recursive=False, maxdepth=None):
+ paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
+ for p in reversed(paths):
+ if self.isfile(p):
+ self.rm_file(p)
+ else:
+ self.rmdir(p)
+
+ def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
+ path = self._strip_protocol(path)
+ parent = self._parent(path)
+ if parent != self.root_marker and not self.exists(parent) and create_parents:
+ self.mkdir(parent, create_parents=create_parents)
+
+ self.ftp.mkd(path)
+ self.invalidate_cache(self._parent(path))
+
+ def makedirs(self, path: str, exist_ok: bool = False) -> None:
+ path = self._strip_protocol(path)
+ if self.exists(path):
+ # NB: "/" does not "exist" as it has no directory entry
+ if not exist_ok:
+ raise FileExistsError(f"{path} exists without `exist_ok`")
+ # exists_ok=True -> no-op
+ else:
+ self.mkdir(path, create_parents=True)
+
+ def rmdir(self, path):
+ path = self._strip_protocol(path)
+ self.ftp.rmd(path)
+ self.invalidate_cache(self._parent(path))
+
+ def mv(self, path1, path2, **kwargs):
+ path1 = self._strip_protocol(path1)
+ path2 = self._strip_protocol(path2)
+ self.ftp.rename(path1, path2)
+ self.invalidate_cache(self._parent(path1))
+ self.invalidate_cache(self._parent(path2))
+
+ def __del__(self):
+ self.ftp.close()
+
+ def invalidate_cache(self, path=None):
+ if path is None:
+ self.dircache.clear()
+ else:
+ self.dircache.pop(path, None)
+ super().invalidate_cache(path)
+
+
+class TransferDone(Exception):
+ """Internal exception to break out of transfer"""
+
+ pass
+
+
+class FTPFile(AbstractBufferedFile):
+ """Interact with a remote FTP file with read/write buffering"""
+
+ def __init__(
+ self,
+ fs,
+ path,
+ mode="rb",
+ block_size="default",
+ autocommit=True,
+ cache_type="readahead",
+ cache_options=None,
+ **kwargs,
+ ):
+ super().__init__(
+ fs,
+ path,
+ mode=mode,
+ block_size=block_size,
+ autocommit=autocommit,
+ cache_type=cache_type,
+ cache_options=cache_options,
+ **kwargs,
+ )
+ if not autocommit:
+ self.target = self.path
+ self.path = "/".join([kwargs["tempdir"], str(uuid.uuid4())])
+
+ def commit(self):
+ self.fs.mv(self.path, self.target)
+
+ def discard(self):
+ self.fs.rm(self.path)
+
+ def _fetch_range(self, start, end):
+ """Get bytes between given byte limits
+
+ Implemented by raising an exception in the fetch callback when the
+ number of bytes received reaches the requested amount.
+
+ Will fail if the server does not respect the REST command on
+ retrieve requests.
+ """
+ out = []
+ total = [0]
+
+ def callback(x):
+ total[0] += len(x)
+ if total[0] > end - start:
+ out.append(x[: (end - start) - total[0]])
+ if end < self.size:
+ raise TransferDone
+ else:
+ out.append(x)
+
+ if total[0] == end - start and end < self.size:
+ raise TransferDone
+
+ try:
+ self.fs.ftp.retrbinary(
+ f"RETR {self.path}",
+ blocksize=self.blocksize,
+ rest=start,
+ callback=callback,
+ )
+ except TransferDone:
+ try:
+ # stop transfer, we got enough bytes for this block
+ self.fs.ftp.abort()
+ self.fs.ftp.getmultiline()
+ except Error:
+ self.fs._connect()
+
+ return b"".join(out)
+
+ def _upload_chunk(self, final=False):
+ self.buffer.seek(0)
+ self.fs.ftp.storbinary(
+ f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset
+ )
+ return True
+
+
+def _mlsd2(ftp, path="."):
+ """
+ Fall back to using `dir` instead of `mlsd` if not supported.
+
+ This parses a Linux style `ls -l` response to `dir`, but the response may
+ be platform dependent.
+
+ Parameters
+ ----------
+ ftp: ftplib.FTP
+ path: str
+ Expects to be given path, but defaults to ".".
+ """
+ lines = []
+ minfo = []
+ ftp.dir(path, lines.append)
+ for line in lines:
+ split_line = line.split()
+ if len(split_line) < 9:
+ continue
+ this = (
+ split_line[-1],
+ {
+ "modify": " ".join(split_line[5:8]),
+ "unix.owner": split_line[2],
+ "unix.group": split_line[3],
+ "unix.mode": split_line[0],
+ "size": split_line[4],
+ },
+ )
+ if this[1]["unix.mode"][0] == "d":
+ this[1]["type"] = "dir"
+ else:
+ this[1]["type"] = "file"
+ minfo.append(this)
+ return minfo