about summary refs log tree commit diff
path: root/.venv/lib/python3.12/site-packages/fsspec/gui.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/gui.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/gui.py416
1 files changed, 416 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/gui.py b/.venv/lib/python3.12/site-packages/fsspec/gui.py
new file mode 100644
index 00000000..321379eb
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/gui.py
@@ -0,0 +1,416 @@
+import ast
+import contextlib
+import logging
+import os
+import re
+from typing import ClassVar, Sequence
+
+import panel as pn
+
+from .core import OpenFile, get_filesystem_class, split_protocol
+from .registry import known_implementations
+
+pn.extension()
+logger = logging.getLogger("fsspec.gui")
+
+
+class SigSlot:
+    """Signal-slot mixin, for Panel event passing
+
+    Include this class in a widget manager's superclasses to be able to
+    register events and callbacks on Panel widgets managed by that class.
+
+    The method ``_register`` should be called as widgets are added, and external
+    code should call ``connect`` to associate callbacks.
+
+    By default, all signals emit a DEBUG logging statement.
+    """
+
+    # names of signals that this class may emit each of which must be
+    # set by _register for any new instance
+    signals: ClassVar[Sequence[str]] = []
+    # names of actions that this class may respond to
+    slots: ClassVar[Sequence[str]] = []
+
+    # each of which must be a method name
+
+    def __init__(self):
+        self._ignoring_events = False
+        self._sigs = {}
+        self._map = {}
+        self._setup()
+
+    def _setup(self):
+        """Create GUI elements and register signals"""
+        self.panel = pn.pane.PaneBase()
+        # no signals to set up in the base class
+
+    def _register(
+        self, widget, name, thing="value", log_level=logging.DEBUG, auto=False
+    ):
+        """Watch the given attribute of a widget and assign it a named event
+
+        This is normally called at the time a widget is instantiated, in the
+        class which owns it.
+
+        Parameters
+        ----------
+        widget : pn.layout.Panel or None
+            Widget to watch. If None, an anonymous signal not associated with
+            any widget.
+        name : str
+            Name of this event
+        thing : str
+            Attribute of the given widget to watch
+        log_level : int
+            When the signal is triggered, a logging event of the given level
+            will be fired in the dfviz logger.
+        auto : bool
+            If True, automatically connects with a method in this class of the
+            same name.
+        """
+        if name not in self.signals:
+            raise ValueError(f"Attempt to assign an undeclared signal: {name}")
+        self._sigs[name] = {
+            "widget": widget,
+            "callbacks": [],
+            "thing": thing,
+            "log": log_level,
+        }
+        wn = "-".join(
+            [
+                getattr(widget, "name", str(widget)) if widget is not None else "none",
+                thing,
+            ]
+        )
+        self._map[wn] = name
+        if widget is not None:
+            widget.param.watch(self._signal, thing, onlychanged=True)
+        if auto and hasattr(self, name):
+            self.connect(name, getattr(self, name))
+
+    def _repr_mimebundle_(self, *args, **kwargs):
+        """Display in a notebook or a server"""
+        try:
+            return self.panel._repr_mimebundle_(*args, **kwargs)
+        except (ValueError, AttributeError) as exc:
+            raise NotImplementedError(
+                "Panel does not seem to be set up properly"
+            ) from exc
+
+    def connect(self, signal, slot):
+        """Associate call back with given event
+
+        The callback must be a function which takes the "new" value of the
+        watched attribute as the only parameter. If the callback return False,
+        this cancels any further processing of the given event.
+
+        Alternatively, the callback can be a string, in which case it means
+        emitting the correspondingly-named event (i.e., connect to self)
+        """
+        self._sigs[signal]["callbacks"].append(slot)
+
+    def _signal(self, event):
+        """This is called by a an action on a widget
+
+        Within an self.ignore_events context, nothing happens.
+
+        Tests can execute this method by directly changing the values of
+        widget components.
+        """
+        if not self._ignoring_events:
+            wn = "-".join([event.obj.name, event.name])
+            if wn in self._map and self._map[wn] in self._sigs:
+                self._emit(self._map[wn], event.new)
+
+    @contextlib.contextmanager
+    def ignore_events(self):
+        """Temporarily turn off events processing in this instance
+
+        (does not propagate to children)
+        """
+        self._ignoring_events = True
+        try:
+            yield
+        finally:
+            self._ignoring_events = False
+
+    def _emit(self, sig, value=None):
+        """An event happened, call its callbacks
+
+        This method can be used in tests to simulate message passing without
+        directly changing visual elements.
+
+        Calling of callbacks will halt whenever one returns False.
+        """
+        logger.log(self._sigs[sig]["log"], f"{sig}: {value}")
+        for callback in self._sigs[sig]["callbacks"]:
+            if isinstance(callback, str):
+                self._emit(callback)
+            else:
+                try:
+                    # running callbacks should not break the interface
+                    ret = callback(value)
+                    if ret is False:
+                        break
+                except Exception as e:
+                    logger.exception(
+                        "Exception (%s) while executing callback for signal: %s",
+                        e,
+                        sig,
+                    )
+
+    def show(self, threads=False):
+        """Open a new browser tab and display this instance's interface"""
+        self.panel.show(threads=threads, verbose=False)
+        return self
+
+
+class SingleSelect(SigSlot):
+    """A multiselect which only allows you to select one item for an event"""
+
+    signals = ["_selected", "selected"]  # the first is internal
+    slots = ["set_options", "set_selection", "add", "clear", "select"]
+
+    def __init__(self, **kwargs):
+        self.kwargs = kwargs
+        super().__init__()
+
+    def _setup(self):
+        self.panel = pn.widgets.MultiSelect(**self.kwargs)
+        self._register(self.panel, "_selected", "value")
+        self._register(None, "selected")
+        self.connect("_selected", self.select_one)
+
+    def _signal(self, *args, **kwargs):
+        super()._signal(*args, **kwargs)
+
+    def select_one(self, *_):
+        with self.ignore_events():
+            val = [self.panel.value[-1]] if self.panel.value else []
+            self.panel.value = val
+        self._emit("selected", self.panel.value)
+
+    def set_options(self, options):
+        self.panel.options = options
+
+    def clear(self):
+        self.panel.options = []
+
+    @property
+    def value(self):
+        return self.panel.value
+
+    def set_selection(self, selection):
+        self.panel.value = [selection]
+
+
+class FileSelector(SigSlot):
+    """Panel-based graphical file selector widget
+
+    Instances of this widget are interactive and can be displayed in jupyter by having
+    them as the output of a cell,  or in a separate browser tab using ``.show()``.
+    """
+
+    signals = [
+        "protocol_changed",
+        "selection_changed",
+        "directory_entered",
+        "home_clicked",
+        "up_clicked",
+        "go_clicked",
+        "filters_changed",
+    ]
+    slots = ["set_filters", "go_home"]
+
+    def __init__(self, url=None, filters=None, ignore=None, kwargs=None):
+        """
+
+        Parameters
+        ----------
+        url : str (optional)
+            Initial value of the URL to populate the dialog; should include protocol
+        filters : list(str) (optional)
+            File endings to include in the listings. If not included, all files are
+            allowed. Does not affect directories.
+            If given, the endings will appear as checkboxes in the interface
+        ignore : list(str) (optional)
+            Regex(s) of file basename patterns to ignore, e.g., "\\." for typical
+            hidden files on posix
+        kwargs : dict (optional)
+            To pass to file system instance
+        """
+        if url:
+            self.init_protocol, url = split_protocol(url)
+        else:
+            self.init_protocol, url = "file", os.getcwd()
+        self.init_url = url
+        self.init_kwargs = (kwargs if isinstance(kwargs, str) else str(kwargs)) or "{}"
+        self.filters = filters
+        self.ignore = [re.compile(i) for i in ignore or []]
+        self._fs = None
+        super().__init__()
+
+    def _setup(self):
+        self.url = pn.widgets.TextInput(
+            name="url",
+            value=self.init_url,
+            align="end",
+            sizing_mode="stretch_width",
+            width_policy="max",
+        )
+        self.protocol = pn.widgets.Select(
+            options=sorted(known_implementations),
+            value=self.init_protocol,
+            name="protocol",
+            align="center",
+        )
+        self.kwargs = pn.widgets.TextInput(
+            name="kwargs", value=self.init_kwargs, align="center"
+        )
+        self.go = pn.widgets.Button(name="⇨", align="end", width=45)
+        self.main = SingleSelect(size=10)
+        self.home = pn.widgets.Button(name="🏠", width=40, height=30, align="end")
+        self.up = pn.widgets.Button(name="‹", width=30, height=30, align="end")
+
+        self._register(self.protocol, "protocol_changed", auto=True)
+        self._register(self.go, "go_clicked", "clicks", auto=True)
+        self._register(self.up, "up_clicked", "clicks", auto=True)
+        self._register(self.home, "home_clicked", "clicks", auto=True)
+        self._register(None, "selection_changed")
+        self.main.connect("selected", self.selection_changed)
+        self._register(None, "directory_entered")
+        self.prev_protocol = self.protocol.value
+        self.prev_kwargs = self.storage_options
+
+        self.filter_sel = pn.widgets.CheckBoxGroup(
+            value=[], options=[], inline=False, align="end", width_policy="min"
+        )
+        self._register(self.filter_sel, "filters_changed", auto=True)
+
+        self.panel = pn.Column(
+            pn.Row(self.protocol, self.kwargs),
+            pn.Row(self.home, self.up, self.url, self.go, self.filter_sel),
+            self.main.panel,
+        )
+        self.set_filters(self.filters)
+        self.go_clicked()
+
+    def set_filters(self, filters=None):
+        self.filters = filters
+        if filters:
+            self.filter_sel.options = filters
+            self.filter_sel.value = filters
+        else:
+            self.filter_sel.options = []
+            self.filter_sel.value = []
+
+    @property
+    def storage_options(self):
+        """Value of the kwargs box as a dictionary"""
+        return ast.literal_eval(self.kwargs.value) or {}
+
+    @property
+    def fs(self):
+        """Current filesystem instance"""
+        if self._fs is None:
+            cls = get_filesystem_class(self.protocol.value)
+            self._fs = cls(**self.storage_options)
+        return self._fs
+
+    @property
+    def urlpath(self):
+        """URL of currently selected item"""
+        return (
+            (f"{self.protocol.value}://{self.main.value[0]}")
+            if self.main.value
+            else None
+        )
+
+    def open_file(self, mode="rb", compression=None, encoding=None):
+        """Create OpenFile instance for the currently selected item
+
+        For example, in a notebook you might do something like
+
+        .. code-block::
+
+            [ ]: sel = FileSelector(); sel
+
+            # user selects their file
+
+            [ ]: with sel.open_file('rb') as f:
+            ...      out = f.read()
+
+        Parameters
+        ----------
+        mode: str (optional)
+            Open mode for the file.
+        compression: str (optional)
+            The interact with the file as compressed. Set to 'infer' to guess
+            compression from the file ending
+        encoding: str (optional)
+            If using text mode, use this encoding; defaults to UTF8.
+        """
+        if self.urlpath is None:
+            raise ValueError("No file selected")
+        return OpenFile(self.fs, self.urlpath, mode, compression, encoding)
+
+    def filters_changed(self, values):
+        self.filters = values
+        self.go_clicked()
+
+    def selection_changed(self, *_):
+        if self.urlpath is None:
+            return
+        if self.fs.isdir(self.urlpath):
+            self.url.value = self.fs._strip_protocol(self.urlpath)
+        self.go_clicked()
+
+    def go_clicked(self, *_):
+        if (
+            self.prev_protocol != self.protocol.value
+            or self.prev_kwargs != self.storage_options
+        ):
+            self._fs = None  # causes fs to be recreated
+            self.prev_protocol = self.protocol.value
+            self.prev_kwargs = self.storage_options
+        listing = sorted(
+            self.fs.ls(self.url.value, detail=True), key=lambda x: x["name"]
+        )
+        listing = [
+            l
+            for l in listing
+            if not any(i.match(l["name"].rsplit("/", 1)[-1]) for i in self.ignore)
+        ]
+        folders = {
+            "📁 " + o["name"].rsplit("/", 1)[-1]: o["name"]
+            for o in listing
+            if o["type"] == "directory"
+        }
+        files = {
+            "📄 " + o["name"].rsplit("/", 1)[-1]: o["name"]
+            for o in listing
+            if o["type"] == "file"
+        }
+        if self.filters:
+            files = {
+                k: v
+                for k, v in files.items()
+                if any(v.endswith(ext) for ext in self.filters)
+            }
+        self.main.set_options(dict(**folders, **files))
+
+    def protocol_changed(self, *_):
+        self._fs = None
+        self.main.options = []
+        self.url.value = ""
+
+    def home_clicked(self, *_):
+        self.protocol.value = self.init_protocol
+        self.kwargs.value = self.init_kwargs
+        self.url.value = self.init_url
+        self.go_clicked()
+
+    def up_clicked(self, *_):
+        self.url.value = self.fs._parent(self.url.value)
+        self.go_clicked()