about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py
diff options
context:
space:
mode:
authorS. Solomon Darnell2025-03-28 21:52:21 -0500
committerS. Solomon Darnell2025-03-28 21:52:21 -0500
commit4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch)
treeee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py
parentcc961e04ba734dd72309fb548a2f97d67d578813 (diff)
downloadgn-ai-4a52a71956a8d46fcb7294ac71734504bb09bcc2.tar.gz
two version of R2R are here HEAD master
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py152
1 files changed, 152 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py b/.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py
new file mode 100644
index 00000000..3e127646
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/implementations/dask.py
@@ -0,0 +1,152 @@
+import dask
+from distributed.client import Client, _get_global_client
+from distributed.worker import Worker
+
+from fsspec import filesystem
+from fsspec.spec import AbstractBufferedFile, AbstractFileSystem
+from fsspec.utils import infer_storage_options
+
+
+def _get_client(client):
+    if client is None:
+        return _get_global_client()
+    elif isinstance(client, Client):
+        return client
+    else:
+        # e.g., connection string
+        return Client(client)
+
+
+def _in_worker():
+    return bool(Worker._instances)
+
+
+class DaskWorkerFileSystem(AbstractFileSystem):
+    """View files accessible to a worker as any other remote file-system
+
+    When instances are run on the worker, uses the real filesystem. When
+    run on the client, they call the worker to provide information or data.
+
+    **Warning** this implementation is experimental, and read-only for now.
+    """
+
+    def __init__(
+        self, target_protocol=None, target_options=None, fs=None, client=None, **kwargs
+    ):
+        super().__init__(**kwargs)
+        if not (fs is None) ^ (target_protocol is None):
+            raise ValueError(
+                "Please provide one of filesystem instance (fs) or"
+                " target_protocol, not both"
+            )
+        self.target_protocol = target_protocol
+        self.target_options = target_options
+        self.worker = None
+        self.client = client
+        self.fs = fs
+        self._determine_worker()
+
+    @staticmethod
+    def _get_kwargs_from_urls(path):
+        so = infer_storage_options(path)
+        if "host" in so and "port" in so:
+            return {"client": f"{so['host']}:{so['port']}"}
+        else:
+            return {}
+
+    def _determine_worker(self):
+        if _in_worker():
+            self.worker = True
+            if self.fs is None:
+                self.fs = filesystem(
+                    self.target_protocol, **(self.target_options or {})
+                )
+        else:
+            self.worker = False
+            self.client = _get_client(self.client)
+            self.rfs = dask.delayed(self)
+
+    def mkdir(self, *args, **kwargs):
+        if self.worker:
+            self.fs.mkdir(*args, **kwargs)
+        else:
+            self.rfs.mkdir(*args, **kwargs).compute()
+
+    def rm(self, *args, **kwargs):
+        if self.worker:
+            self.fs.rm(*args, **kwargs)
+        else:
+            self.rfs.rm(*args, **kwargs).compute()
+
+    def copy(self, *args, **kwargs):
+        if self.worker:
+            self.fs.copy(*args, **kwargs)
+        else:
+            self.rfs.copy(*args, **kwargs).compute()
+
+    def mv(self, *args, **kwargs):
+        if self.worker:
+            self.fs.mv(*args, **kwargs)
+        else:
+            self.rfs.mv(*args, **kwargs).compute()
+
+    def ls(self, *args, **kwargs):
+        if self.worker:
+            return self.fs.ls(*args, **kwargs)
+        else:
+            return self.rfs.ls(*args, **kwargs).compute()
+
+    def _open(
+        self,
+        path,
+        mode="rb",
+        block_size=None,
+        autocommit=True,
+        cache_options=None,
+        **kwargs,
+    ):
+        if self.worker:
+            return self.fs._open(
+                path,
+                mode=mode,
+                block_size=block_size,
+                autocommit=autocommit,
+                cache_options=cache_options,
+                **kwargs,
+            )
+        else:
+            return DaskFile(
+                fs=self,
+                path=path,
+                mode=mode,
+                block_size=block_size,
+                autocommit=autocommit,
+                cache_options=cache_options,
+                **kwargs,
+            )
+
+    def fetch_range(self, path, mode, start, end):
+        if self.worker:
+            with self._open(path, mode) as f:
+                f.seek(start)
+                return f.read(end - start)
+        else:
+            return self.rfs.fetch_range(path, mode, start, end).compute()
+
+
+class DaskFile(AbstractBufferedFile):
+    def __init__(self, mode="rb", **kwargs):
+        if mode != "rb":
+            raise ValueError('Remote dask files can only be opened in "rb" mode')
+        super().__init__(**kwargs)
+
+    def _upload_chunk(self, final=False):
+        pass
+
+    def _initiate_upload(self):
+        """Create remote file/upload"""
+        pass
+
+    def _fetch_range(self, start, end):
+        """Get the specified set of bytes from remote"""
+        return self.fs.fetch_range(self.path, self.mode, start, end)