1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
import datetime
import logging
import os
import types
import uuid
from stat import S_ISDIR, S_ISLNK
import paramiko
from .. import AbstractFileSystem
from ..utils import infer_storage_options
# Module logger; uses a fixed name rather than __name__ so its records appear
# under "fsspec.sftp" regardless of where this module lives in the package.
logger = logging.getLogger(name="fsspec.sftp")
class SFTPFileSystem(AbstractFileSystem):
    """Files over SFTP/SSH

    Peer-to-peer filesystem over SSH using paramiko.

    Note: if using this with the ``open`` or ``open_files``, with full URLs,
    there is no way to tell if a path is relative, so all paths are assumed
    to be absolute.
    """

    protocol = "sftp", "ssh"

    def __init__(self, host, **ssh_kwargs):
        """Connect to ``host`` over SSH and open an SFTP session.

        Parameters
        ----------
        host: str
            Hostname or IP as a string
        temppath: str
            Location on the server to put files, when within a transaction.
            Taken out of ``ssh_kwargs`` (default ``"/tmp"``) and not passed
            to paramiko.
        ssh_kwargs: dict
            Parameters passed on to connection. See details in
            https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
            May include port, username, password...
        """
        if self._cached:
            # ``_cached`` comes from AbstractFileSystem (not shown here);
            # presumably guards against re-initialising an instance that is
            # already set up and connected.
            return
        # The base class is called while ``temppath`` is still present, so it
        # receives every option the caller gave (presumably so they are kept
        # with the instance); only the paramiko kwargs below lose it.
        super().__init__(**ssh_kwargs)
        self.temppath = ssh_kwargs.pop("temppath", "/tmp")  # remote temp directory
        self.host = host
        self.ssh_kwargs = ssh_kwargs
        self._connect()

    def _connect(self):
        """Create the paramiko SSH client and open an SFTP channel on it.

        Sets ``self.client`` (the SSHClient) and ``self.ftp`` (the SFTPClient).
        """
        logger.debug("Connecting to SFTP server %s", self.host)
        self.client = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts any unknown host key, so the
        # first connection to a host is not protected against a
        # man-in-the-middle. Kept for backward compatibility; consider
        # loading known_hosts and using a stricter policy.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.client.connect(self.host, **self.ssh_kwargs)
        self.ftp = self.client.open_sftp()

    @classmethod
    def _strip_protocol(cls, path):
        """Return only the path component of a URL or path string."""
        return infer_storage_options(path)["path"]

    @staticmethod
    def _get_kwargs_from_urls(urlpath):
        """Return the URL's connection options (everything except path/protocol)."""
        out = infer_storage_options(urlpath)
        out.pop("path", None)
        out.pop("protocol", None)
        return out

    def mkdir(self, path, create_parents=True, mode=511):
        """Create directory ``path``.

        Parameters
        ----------
        path: str
            Remote directory to create.
        create_parents: bool
            If True, also create any missing intermediate directories.
        mode: int
            Permission bits for each directory created (511 == 0o777).

        Raises
        ------
        FileExistsError
            If ``path`` already exists.
        """
        logger.debug("Creating folder %s", path)
        if self.exists(path):
            raise FileExistsError(f"File exists: {path}")
        if create_parents:
            # Forward ``mode`` (it used to be dropped here). Existence was
            # just checked, so skip the second remote lookup in makedirs.
            self.makedirs(path, exist_ok=True, mode=mode)
        else:
            self.ftp.mkdir(path, mode)

    def makedirs(self, path, exist_ok=False, mode=511):
        """Create ``path`` together with any missing parent directories.

        Parameters
        ----------
        path: str
            Remote directory to create; may be absolute or relative.
        exist_ok: bool
            If False, raise when ``path`` already exists.
        mode: int
            Permission bits for each directory created (511 == 0o777).

        Raises
        ------
        FileExistsError
            If ``path`` exists and ``exist_ok`` is False.
        """
        # Test ``exist_ok`` first so no remote stat is made when it is True.
        if not exist_ok and self.exists(path):
            raise FileExistsError(f"File exists: {path}")

        # Rebuild the path one component at a time, keeping a single leading
        # "/" for absolute paths. Empty components (from "//" or a trailing
        # "/") are skipped. Joining the components avoids the "//a" prefix
        # that string concatenation onto a "/" seed used to produce.
        root = "/" if path.startswith("/") else ""
        components = []
        for part in path.split("/"):
            if not part:
                continue
            components.append(part)
            current = root + "/".join(components)
            if not self.exists(current):
                self.ftp.mkdir(current, mode)

    def rmdir(self, path):
        """Remove the remote directory ``path``."""
        logger.debug("Removing folder %s", path)
        self.ftp.rmdir(path)

    def info(self, path, **kwargs):
        """Return the details dict for ``path`` (see ``_decode_stat``).

        ``name`` is set to ``path`` exactly as given. Extra keyword arguments
        are accepted for compatibility with callers that forward options, and
        are ignored.
        """
        stat = self._decode_stat(self.ftp.stat(path))
        stat["name"] = path
        return stat

    @staticmethod
    def _decode_stat(stat, parent_path=None):
        """Convert a paramiko stat result into an fsspec details dict.

        Keys: ``name`` ("" unless ``parent_path`` is given, in which case it is
        ``parent_path`` joined with ``stat.filename``), ``size``, ``type``
        ("directory", "link" or "file"), ``uid``, ``gid``, ``time`` (from
        ``st_atime``) and ``mtime`` (from ``st_mtime``). Both times are
        UTC-aware datetimes.
        """
        if S_ISDIR(stat.st_mode):
            t = "directory"
        elif S_ISLNK(stat.st_mode):
            t = "link"
        else:
            t = "file"
        out = {
            "name": "",
            "size": stat.st_size,
            "type": t,
            "uid": stat.st_uid,
            "gid": stat.st_gid,
            "time": datetime.datetime.fromtimestamp(
                stat.st_atime, tz=datetime.timezone.utc
            ),
            "mtime": datetime.datetime.fromtimestamp(
                stat.st_mtime, tz=datetime.timezone.utc
            ),
        }
        if parent_path:
            out["name"] = "/".join([parent_path.rstrip("/"), stat.filename])
        return out

    def ls(self, path, detail=False, **kwargs):
        """List the entries of directory ``path``.

        Returns a list of details dicts when ``detail`` is True; otherwise a
        sorted list of full entry paths. Extra keyword arguments (which
        base-class helpers may forward) are accepted and ignored.
        """
        logger.debug("Listing folder %s", path)
        stats = [self._decode_stat(stat, path) for stat in self.ftp.listdir_iter(path)]
        if detail:
            return stats
        return sorted(stat["name"] for stat in stats)

    def put(self, lpath, rpath, callback=None, **kwargs):
        """Upload local file ``lpath`` to remote ``rpath``.

        ``callback`` and extra keyword arguments are accepted but not used.
        """
        logger.debug("Put file %s into %s", lpath, rpath)
        self.ftp.put(lpath, rpath)

    def get_file(self, rpath, lpath, **kwargs):
        """Download remote ``rpath`` to local ``lpath``.

        If ``rpath`` is a directory, only the local directory ``lpath`` is
        created (nothing is copied into it).
        """
        if self.isdir(rpath):
            os.makedirs(lpath, exist_ok=True)
        else:
            self.ftp.get(self._strip_protocol(rpath), lpath)

    def _open(self, path, mode="rb", block_size=None, **kwargs):
        """Open a remote file through paramiko.

        Parameters
        ----------
        path: str
            Remote file path.
        mode: str
            File mode passed to paramiko.
        block_size: int or None
            If 0, no buffering, if 1, line buffering, if >1, buffer that many
            bytes, if None use default from paramiko.
        autocommit: bool, optional (in ``kwargs``)
            If exactly ``False``, write to a uniquely named file under
            ``self.temppath``. The returned file gets ``commit()`` (move to
            ``path``) and ``discard()`` (delete the temp file) methods.
        """
        logger.debug("Opening file %s", path)
        # Only ``None`` means "paramiko default" (-1). Previously any falsy
        # value, including the documented 0, was turned into -1.
        bufsize = -1 if block_size is None else block_size
        if kwargs.get("autocommit", True) is False:
            # writes to temporary file, move on commit
            # rstrip avoids a doubled slash when temppath ends with "/".
            path2 = "/".join([self.temppath.rstrip("/"), str(uuid.uuid4())])
            f = self.ftp.open(path2, mode, bufsize=bufsize)
            f.temppath = path2
            f.targetpath = path
            f.fs = self
            # Bind the module-level helpers as methods of this file object so
            # callers can invoke ``f.commit()`` / ``f.discard()``.
            f.commit = types.MethodType(commit_a_file, f)
            f.discard = types.MethodType(discard_a_file, f)
        else:
            f = self.ftp.open(path, mode, bufsize=bufsize)
        return f

    def _rm(self, path):
        """Remove ``path``: ``rmdir`` if it is a directory, else ``remove``."""
        if self.isdir(path):
            self.ftp.rmdir(path)
        else:
            self.ftp.remove(path)

    def mv(self, old, new, **kwargs):
        """Rename ``old`` to ``new`` on the server using ``posix_rename``.

        Extra keyword arguments (e.g. ``recursive`` from the base-class
        signature) are accepted and ignored; a single rename moves a
        directory's contents along with it. ``posix_rename`` presumably
        requires server support for the OpenSSH posix-rename extension.
        """
        logger.debug("Renaming %s into %s", old, new)
        self.ftp.posix_rename(old, new)
def commit_a_file(self):
    """Commit a deferred upload by renaming its temp file to the target path.

    Intended to be bound onto an open file object that has ``fs``,
    ``temppath`` and ``targetpath`` attributes.
    """
    move = self.fs.mv
    move(self.temppath, self.targetpath)
def discard_a_file(self):
    """Abandon a deferred upload by deleting its temp file on the server.

    Intended to be bound onto an open file object that has ``fs`` and
    ``temppath`` attributes.
    """
    remove = self.fs._rm
    remove(self.temppath)
|