diff options
author | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
---|---|---|
committer | S. Solomon Darnell | 2025-03-28 21:52:21 -0500 |
commit | 4a52a71956a8d46fcb7294ac71734504bb09bcc2 (patch) | |
tree | ee3dc5af3b6313e921cd920906356f5d4febc4ed /.venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py | |
parent | cc961e04ba734dd72309fb548a2f97d67d578813 (diff) | |
download | gn-ai-master.tar.gz |
Diffstat (limited to '.venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py')
-rw-r--r-- | .venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/.venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py b/.venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py new file mode 100644 index 00000000..cd81d622 --- /dev/null +++ b/.venv/lib/python3.12/site-packages/tqdm/contrib/concurrent.py @@ -0,0 +1,105 @@ +""" +Thin wrappers around `concurrent.futures`. +""" +from contextlib import contextmanager +from operator import length_hint +from os import cpu_count + +from ..auto import tqdm as tqdm_auto +from ..std import TqdmWarning + +__author__ = {"github.com/": ["casperdcl"]} +__all__ = ['thread_map', 'process_map'] + + +@contextmanager +def ensure_lock(tqdm_class, lock_name=""): + """get (create if necessary) and then restore `tqdm_class`'s lock""" + old_lock = getattr(tqdm_class, '_lock', None) # don't create a new lock + lock = old_lock or tqdm_class.get_lock() # maybe create a new lock + lock = getattr(lock, lock_name, lock) # maybe subtype + tqdm_class.set_lock(lock) + yield lock + if old_lock is None: + del tqdm_class._lock + else: + tqdm_class.set_lock(old_lock) + + +def _executor_map(PoolExecutor, fn, *iterables, **tqdm_kwargs): + """ + Implementation of `thread_map` and `process_map`. + + Parameters + ---------- + tqdm_class : [default: tqdm.auto.tqdm]. + max_workers : [default: min(32, cpu_count() + 4)]. + chunksize : [default: 1]. + lock_name : [default: "":str]. + """ + kwargs = tqdm_kwargs.copy() + if "total" not in kwargs: + kwargs["total"] = length_hint(iterables[0]) + tqdm_class = kwargs.pop("tqdm_class", tqdm_auto) + max_workers = kwargs.pop("max_workers", min(32, cpu_count() + 4)) + chunksize = kwargs.pop("chunksize", 1) + lock_name = kwargs.pop("lock_name", "") + with ensure_lock(tqdm_class, lock_name=lock_name) as lk: + # share lock in case workers are already using `tqdm` + with PoolExecutor(max_workers=max_workers, initializer=tqdm_class.set_lock, + initargs=(lk,)) as ex: + return list(tqdm_class(ex.map(fn, *iterables, chunksize=chunksize), **kwargs)) + + +def thread_map(fn, *iterables, **tqdm_kwargs): + """ + Equivalent of `list(map(fn, *iterables))` + driven by `concurrent.futures.ThreadPoolExecutor`. + + Parameters + ---------- + tqdm_class : optional + `tqdm` class to use for bars [default: tqdm.auto.tqdm]. + max_workers : int, optional + Maximum number of workers to spawn; passed to + `concurrent.futures.ThreadPoolExecutor.__init__`. + [default: max(32, cpu_count() + 4)]. + """ + from concurrent.futures import ThreadPoolExecutor + return _executor_map(ThreadPoolExecutor, fn, *iterables, **tqdm_kwargs) + + +def process_map(fn, *iterables, **tqdm_kwargs): + """ + Equivalent of `list(map(fn, *iterables))` + driven by `concurrent.futures.ProcessPoolExecutor`. + + Parameters + ---------- + tqdm_class : optional + `tqdm` class to use for bars [default: tqdm.auto.tqdm]. + max_workers : int, optional + Maximum number of workers to spawn; passed to + `concurrent.futures.ProcessPoolExecutor.__init__`. + [default: min(32, cpu_count() + 4)]. + chunksize : int, optional + Size of chunks sent to worker processes; passed to + `concurrent.futures.ProcessPoolExecutor.map`. [default: 1]. + lock_name : str, optional + Member of `tqdm_class.get_lock()` to use [default: mp_lock]. + """ + from concurrent.futures import ProcessPoolExecutor + if iterables and "chunksize" not in tqdm_kwargs: + # default `chunksize=1` has poor performance for large iterables + # (most time spent dispatching items to workers). + longest_iterable_len = max(map(length_hint, iterables)) + if longest_iterable_len > 1000: + from warnings import warn + warn("Iterable length %d > 1000 but `chunksize` is not set." + " This may seriously degrade multiprocess performance." + " Set `chunksize=1` or more." % longest_iterable_len, + TqdmWarning, stacklevel=2) + if "lock_name" not in tqdm_kwargs: + tqdm_kwargs = tqdm_kwargs.copy() + tqdm_kwargs["lock_name"] = "mp_lock" + return _executor_map(ProcessPoolExecutor, fn, *iterables, **tqdm_kwargs) |