aboutsummaryrefslogtreecommitdiff
path: root/.venv/lib/python3.12/site-packages/fsspec/callbacks.py
diff options
context:
space:
mode:
Diffstat (limited to '.venv/lib/python3.12/site-packages/fsspec/callbacks.py')
-rw-r--r--.venv/lib/python3.12/site-packages/fsspec/callbacks.py324
1 files changed, 324 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/fsspec/callbacks.py b/.venv/lib/python3.12/site-packages/fsspec/callbacks.py
new file mode 100644
index 00000000..7ca99ca6
--- /dev/null
+++ b/.venv/lib/python3.12/site-packages/fsspec/callbacks.py
@@ -0,0 +1,324 @@
+from functools import wraps
+
+
+class Callback:
+ """
+ Base class and interface for callback mechanism
+
+ This class can be used directly for monitoring file transfers by
+ providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
+ below), or subclassed for more specialised behaviour.
+
+ Parameters
+ ----------
+ size: int (optional)
+ Nominal quantity for the value that corresponds to a complete
+ transfer, e.g., total number of tiles or total number of
+ bytes
+ value: int (0)
+ Starting internal counter value
+ hooks: dict or None
+ A dict of named functions to be called on each update. The signature
+ of these must be ``f(size, value, **kwargs)``
+ """
+
+ def __init__(self, size=None, value=0, hooks=None, **kwargs):
+ self.size = size
+ self.value = value
+ self.hooks = hooks or {}
+ self.kw = kwargs
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_args):
+ self.close()
+
+ def close(self):
+ """Close callback."""
+
+ def branched(self, path_1, path_2, **kwargs):
+ """
+ Return callback for child transfers
+
+ If this callback is operating at a higher level, e.g., put, which may
+ trigger transfers that can also be monitored. The function returns a callback
+ that has to be passed to the child method, e.g., put_file,
+ as `callback=` argument.
+
+ The implementation uses `callback.branch` for compatibility.
+ When implementing callbacks, it is recommended to override this function instead
+ of `branch` and avoid calling `super().branched(...)`.
+
+ Prefer using this function over `branch`.
+
+ Parameters
+ ----------
+ path_1: str
+ Child's source path
+ path_2: str
+ Child's destination path
+ **kwargs:
+ Arbitrary keyword arguments
+
+ Returns
+ -------
+ callback: Callback
+ A callback instance to be passed to the child method
+ """
+ self.branch(path_1, path_2, kwargs)
+ # mutate kwargs so that we can force the caller to pass "callback=" explicitly
+ return kwargs.pop("callback", DEFAULT_CALLBACK)
+
+ def branch_coro(self, fn):
+ """
+ Wraps a coroutine, and pass a new child callback to it.
+ """
+
+ @wraps(fn)
+ async def func(path1, path2: str, **kwargs):
+ with self.branched(path1, path2, **kwargs) as child:
+ return await fn(path1, path2, callback=child, **kwargs)
+
+ return func
+
+ def set_size(self, size):
+ """
+ Set the internal maximum size attribute
+
+ Usually called if not initially set at instantiation. Note that this
+ triggers a ``call()``.
+
+ Parameters
+ ----------
+ size: int
+ """
+ self.size = size
+ self.call()
+
+ def absolute_update(self, value):
+ """
+ Set the internal value state
+
+ Triggers ``call()``
+
+ Parameters
+ ----------
+ value: int
+ """
+ self.value = value
+ self.call()
+
+ def relative_update(self, inc=1):
+ """
+ Delta increment the internal counter
+
+ Triggers ``call()``
+
+ Parameters
+ ----------
+ inc: int
+ """
+ self.value += inc
+ self.call()
+
+ def call(self, hook_name=None, **kwargs):
+ """
+ Execute hook(s) with current state
+
+ Each function is passed the internal size and current value
+
+ Parameters
+ ----------
+ hook_name: str or None
+ If given, execute on this hook
+ kwargs: passed on to (all) hook(s)
+ """
+ if not self.hooks:
+ return
+ kw = self.kw.copy()
+ kw.update(kwargs)
+ if hook_name:
+ if hook_name not in self.hooks:
+ return
+ return self.hooks[hook_name](self.size, self.value, **kw)
+ for hook in self.hooks.values() or []:
+ hook(self.size, self.value, **kw)
+
+ def wrap(self, iterable):
+ """
+ Wrap an iterable to call ``relative_update`` on each iterations
+
+ Parameters
+ ----------
+ iterable: Iterable
+ The iterable that is being wrapped
+ """
+ for item in iterable:
+ self.relative_update()
+ yield item
+
+ def branch(self, path_1, path_2, kwargs):
+ """
+ Set callbacks for child transfers
+
+ If this callback is operating at a higher level, e.g., put, which may
+ trigger transfers that can also be monitored. The passed kwargs are
+ to be *mutated* to add ``callback=``, if this class supports branching
+ to children.
+
+ Parameters
+ ----------
+ path_1: str
+ Child's source path
+ path_2: str
+ Child's destination path
+ kwargs: dict
+ arguments passed to child method, e.g., put_file.
+
+ Returns
+ -------
+
+ """
+ return None
+
+ def no_op(self, *_, **__):
+ pass
+
+ def __getattr__(self, item):
+ """
+ If undefined methods are called on this class, nothing happens
+ """
+ return self.no_op
+
+ @classmethod
+ def as_callback(cls, maybe_callback=None):
+ """Transform callback=... into Callback instance
+
+ For the special value of ``None``, return the global instance of
+ ``NoOpCallback``. This is an alternative to including
+ ``callback=DEFAULT_CALLBACK`` directly in a method signature.
+ """
+ if maybe_callback is None:
+ return DEFAULT_CALLBACK
+ return maybe_callback
+
+
+class NoOpCallback(Callback):
+ """
+ This implementation of Callback does exactly nothing
+ """
+
+ def call(self, *args, **kwargs):
+ return None
+
+
+class DotPrinterCallback(Callback):
+ """
+ Simple example Callback implementation
+
+ Almost identical to Callback with a hook that prints a char; here we
+ demonstrate how the outer layer may print "#" and the inner layer "."
+ """
+
+ def __init__(self, chr_to_print="#", **kwargs):
+ self.chr = chr_to_print
+ super().__init__(**kwargs)
+
+ def branch(self, path_1, path_2, kwargs):
+ """Mutate kwargs to add new instance with different print char"""
+ kwargs["callback"] = DotPrinterCallback(".")
+
+ def call(self, **kwargs):
+ """Just outputs a character"""
+ print(self.chr, end="")
+
+
+class TqdmCallback(Callback):
+ """
+ A callback to display a progress bar using tqdm
+
+ Parameters
+ ----------
+ tqdm_kwargs : dict, (optional)
+ Any argument accepted by the tqdm constructor.
+ See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
+ Will be forwarded to `tqdm_cls`.
+ tqdm_cls: (optional)
+ subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.
+
+ Examples
+ --------
+ >>> import fsspec
+ >>> from fsspec.callbacks import TqdmCallback
+ >>> fs = fsspec.filesystem("memory")
+ >>> path2distant_data = "/your-path"
+ >>> fs.upload(
+ ".",
+ path2distant_data,
+ recursive=True,
+ callback=TqdmCallback(),
+ )
+
+ You can forward args to tqdm using the ``tqdm_kwargs`` parameter.
+
+ >>> fs.upload(
+ ".",
+ path2distant_data,
+ recursive=True,
+ callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
+ )
+
+ You can also customize the progress bar by passing a subclass of `tqdm`.
+
+ .. code-block:: python
+
+ class TqdmFormat(tqdm):
+ '''Provides a `total_time` format parameter'''
+ @property
+ def format_dict(self):
+ d = super().format_dict
+ total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
+ d.update(total_time=self.format_interval(total_time) + " in total")
+ return d
+
+ >>> with TqdmCallback(
+ tqdm_kwargs={
+ "desc": "desc",
+ "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
+ },
+ tqdm_cls=TqdmFormat,
+ ) as callback:
+ fs.upload(".", path2distant_data, recursive=True, callback=callback)
+ """
+
+ def __init__(self, tqdm_kwargs=None, *args, **kwargs):
+ try:
+ from tqdm import tqdm
+
+ except ImportError as exce:
+ raise ImportError(
+ "Using TqdmCallback requires tqdm to be installed"
+ ) from exce
+
+ self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
+ self._tqdm_kwargs = tqdm_kwargs or {}
+ self.tqdm = None
+ super().__init__(*args, **kwargs)
+
+ def call(self, *args, **kwargs):
+ if self.tqdm is None:
+ self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
+ self.tqdm.total = self.size
+ self.tqdm.update(self.value - self.tqdm.n)
+
+ def close(self):
+ if self.tqdm is not None:
+ self.tqdm.close()
+ self.tqdm = None
+
+ def __del__(self):
+ return self.close()
+
+
+DEFAULT_CALLBACK = _DEFAULT_CALLBACK = NoOpCallback()